@@ -9,6 +9,9 @@ use linux_perf_event_reader::{
99use std:: collections:: { HashMap , VecDeque } ;
1010use std:: io:: { Cursor , Read , Seek , SeekFrom } ;
1111
12+ #[ cfg( feature = "zstd" ) ]
13+ use crate :: decompression:: ZstdDecompressor ;
14+
1215use super :: error:: { Error , ReadError } ;
1316use super :: feature_sections:: AttributeDescription ;
1417use super :: features:: Feature ;
@@ -204,6 +207,8 @@ impl<C: Read + Seek> PerfFileReader<C> {
204207 buffers_for_recycling : VecDeque :: new ( ) ,
205208 current_event_body : Vec :: new ( ) ,
206209 pending_first_record : None ,
210+ #[ cfg( feature = "zstd" ) ]
211+ zstd_decompressor : ZstdDecompressor :: new ( ) ,
207212 } ;
208213
209214 Ok ( Self {
@@ -374,6 +379,8 @@ impl<R: Read> PerfFileReader<R> {
374379 buffers_for_recycling : VecDeque :: new ( ) ,
375380 current_event_body : Vec :: new ( ) ,
376381 pending_first_record,
382+ #[ cfg( feature = "zstd" ) ]
383+ zstd_decompressor : ZstdDecompressor :: new ( ) ,
377384 } ;
378385
379386 Ok ( Self {
@@ -399,6 +406,9 @@ pub struct PerfRecordIter<R: Read> {
399406 buffers_for_recycling : VecDeque < Vec < u8 > > ,
400407 /// For pipe mode: the first non-metadata record that was read during initialization
401408 pending_first_record : Option < ( PerfEventHeader , Vec < u8 > ) > ,
409+ /// Zstd decompressor for handling COMPRESSED records
410+ #[ cfg( feature = "zstd" ) ]
411+ zstd_decompressor : ZstdDecompressor ,
402412}
403413
404414impl < R : Read > PerfRecordIter < R > {
@@ -467,9 +477,9 @@ impl<R: Read> PerfRecordIter<R> {
467477 }
468478 self . read_offset += u64:: from ( header. size ) ;
469479
470- if UserRecordType :: try_from ( RecordType ( header. type_ ) )
471- == Some ( UserRecordType :: PERF_FINISHED_ROUND )
472- {
480+ let user_record_type = UserRecordType :: try_from ( RecordType ( header. type_ ) ) ;
481+
482+ if user_record_type == Some ( UserRecordType :: PERF_FINISHED_ROUND ) {
473483 self . sorter . finish_round ( ) ;
474484 if self . sorter . has_more ( ) {
475485 // The sorter is non-empty. We're done.
@@ -484,7 +494,6 @@ impl<R: Read> PerfRecordIter<R> {
484494 let event_body_len = size - PerfEventHeader :: STRUCT_SIZE ;
485495 let mut buffer = self . buffers_for_recycling . pop_front ( ) . unwrap_or_default ( ) ;
486496 buffer. resize ( event_body_len, 0 ) ;
487-
488497 // Try to read the event body. For pipe mode, EOF here also means end of stream.
489498 match self . reader . read_exact ( & mut buffer) {
490499 Ok ( ( ) ) => { }
@@ -499,6 +508,28 @@ impl<R: Read> PerfRecordIter<R> {
499508 }
500509 }
501510
511+ if user_record_type == Some ( UserRecordType :: PERF_COMPRESSED ) {
512+ // PERF_COMPRESSED is the old format, not yet implemented
513+ return Err ( Error :: IoError ( std:: io:: Error :: new (
514+ std:: io:: ErrorKind :: Unsupported ,
515+ "PERF_COMPRESSED (type 81) is not supported yet, only PERF_COMPRESSED2 (type 83)" ,
516+ ) ) ) ;
517+ }
518+
519+ if user_record_type == Some ( UserRecordType :: PERF_COMPRESSED2 ) {
520+ #[ cfg( not( feature = "zstd" ) ) ]
521+ {
522+ return Err ( Error :: IoError ( std:: io:: Error :: new ( std:: io:: ErrorKind :: Unsupported ,
523+ "Compression support is not enabled. Please rebuild with the 'zstd' feature flag." ,
524+ ) ) ) ;
525+ }
526+ #[ cfg( feature = "zstd" ) ]
527+ {
528+ self . decompress_and_process_compressed2 :: < T > ( & buffer) ?;
529+ continue ;
530+ }
531+ }
532+
502533 self . process_record :: < T > ( header, buffer, offset) ?;
503534 }
504535
@@ -550,7 +581,64 @@ impl<R: Read> PerfRecordIter<R> {
550581 attr_index,
551582 } ;
552583 self . sorter . insert_unordered ( sort_key, pending_record) ;
584+ Ok ( ( ) )
585+ }
553586
587+ /// Decompresses a PERF_RECORD_COMPRESSED2 record and processes its sub-records.
588+ #[ cfg( feature = "zstd" ) ]
589+ fn decompress_and_process_compressed2 < T : ByteOrder > (
590+ & mut self ,
591+ buffer : & [ u8 ] ,
592+ ) -> Result < ( ) , Error > {
593+ if buffer. len ( ) < 8 {
594+ return Err ( ReadError :: PerfEventData . into ( ) ) ;
595+ }
596+ let data_size = T :: read_u64 ( & buffer[ 0 ..8 ] ) as usize ;
597+ if data_size > buffer. len ( ) - 8 {
598+ return Err ( ReadError :: PerfEventData . into ( ) ) ;
599+ }
600+ let compressed_data = & buffer[ 8 ..8 + data_size] ;
601+
602+ let decompressed = self . zstd_decompressor . decompress ( compressed_data) ?;
603+
604+ // Parse the decompressed data as a sequence of perf records
605+ let mut cursor = Cursor :: new ( & decompressed[ ..] ) ;
606+ let mut offset = 0u64 ;
607+
608+ while ( cursor. position ( ) as usize ) < decompressed. len ( ) {
609+ let header_start = cursor. position ( ) as usize ;
610+ // Check if we have enough bytes for a header
611+ let remaining = decompressed. len ( ) - header_start;
612+ if remaining < PerfEventHeader :: STRUCT_SIZE {
613+ self . zstd_decompressor
614+ . save_partial_record ( & decompressed[ header_start..] ) ;
615+ break ;
616+ }
617+
618+ let sub_header = PerfEventHeader :: parse :: < _ , T > ( & mut cursor) ?;
619+ let sub_size = sub_header. size as usize ;
620+ if sub_size < PerfEventHeader :: STRUCT_SIZE {
621+ return Err ( Error :: InvalidPerfEventSize ) ;
622+ }
623+
624+ let sub_event_body_len = sub_size - PerfEventHeader :: STRUCT_SIZE ;
625+ // Check if we have enough bytes for the sub-record body
626+ let remaining_after_header = decompressed. len ( ) - cursor. position ( ) as usize ;
627+ if sub_event_body_len > remaining_after_header {
628+ self . zstd_decompressor
629+ . save_partial_record ( & decompressed[ header_start..] ) ;
630+ break ;
631+ }
632+
633+ let mut sub_buffer = self . buffers_for_recycling . pop_front ( ) . unwrap_or_default ( ) ;
634+ sub_buffer. resize ( sub_event_body_len, 0 ) ;
635+ cursor
636+ . read_exact ( & mut sub_buffer)
637+ . map_err ( |_| ReadError :: PerfEventData ) ?;
638+
639+ self . process_record :: < T > ( sub_header, sub_buffer, offset) ?;
640+ offset += sub_size as u64 ;
641+ }
554642 Ok ( ( ) )
555643 }
556644
0 commit comments