@@ -9,6 +9,9 @@ use linux_perf_event_reader::{
99use std:: collections:: { HashMap , VecDeque } ;
1010use std:: io:: { Cursor , Read , Seek , SeekFrom } ;
1111
12+ #[ cfg( feature = "zstd" ) ]
13+ use crate :: decompression:: ZstdDecompressor ;
14+
1215use super :: error:: { Error , ReadError } ;
1316use super :: feature_sections:: AttributeDescription ;
1417use super :: features:: Feature ;
@@ -196,6 +199,8 @@ impl<C: Read + Seek> PerfFileReader<C> {
196199 buffers_for_recycling : VecDeque :: new ( ) ,
197200 current_event_body : Vec :: new ( ) ,
198201 pending_first_record : None ,
202+ #[ cfg( feature = "zstd" ) ]
203+ zstd_decompressor : ZstdDecompressor :: new ( ) ,
199204 } ;
200205
201206 Ok ( Self {
@@ -366,6 +371,8 @@ impl<R: Read> PerfFileReader<R> {
366371 buffers_for_recycling : VecDeque :: new ( ) ,
367372 current_event_body : Vec :: new ( ) ,
368373 pending_first_record,
374+ #[ cfg( feature = "zstd" ) ]
375+ zstd_decompressor : ZstdDecompressor :: new ( ) ,
369376 } ;
370377
371378 Ok ( Self {
@@ -391,6 +398,9 @@ pub struct PerfRecordIter<R: Read> {
391398 buffers_for_recycling : VecDeque < Vec < u8 > > ,
392399 /// For pipe mode: the first non-metadata record that was read during initialization
393400 pending_first_record : Option < ( PerfEventHeader , Vec < u8 > ) > ,
401+ /// Zstd decompressor for handling COMPRESSED records
402+ #[ cfg( feature = "zstd" ) ]
403+ zstd_decompressor : ZstdDecompressor ,
394404}
395405
396406impl < R : Read > PerfRecordIter < R > {
@@ -459,9 +469,9 @@ impl<R: Read> PerfRecordIter<R> {
459469 }
460470 self . read_offset += u64:: from ( header. size ) ;
461471
462- if UserRecordType :: try_from ( RecordType ( header. type_ ) )
463- == Some ( UserRecordType :: PERF_FINISHED_ROUND )
464- {
472+ let user_record_type = UserRecordType :: try_from ( RecordType ( header. type_ ) ) ;
473+
474+ if user_record_type == Some ( UserRecordType :: PERF_FINISHED_ROUND ) {
465475 self . sorter . finish_round ( ) ;
466476 if self . sorter . has_more ( ) {
467477 // The sorter is non-empty. We're done.
@@ -476,7 +486,6 @@ impl<R: Read> PerfRecordIter<R> {
476486 let event_body_len = size - PerfEventHeader :: STRUCT_SIZE ;
477487 let mut buffer = self . buffers_for_recycling . pop_front ( ) . unwrap_or_default ( ) ;
478488 buffer. resize ( event_body_len, 0 ) ;
479-
480489 // Try to read the event body. For pipe mode, EOF here also means end of stream.
481490 match self . reader . read_exact ( & mut buffer) {
482491 Ok ( ( ) ) => { }
@@ -491,6 +500,34 @@ impl<R: Read> PerfRecordIter<R> {
491500 }
492501 }
493502
503+ if user_record_type == Some ( UserRecordType :: PERF_COMPRESSED ) {
504+ #[ cfg( not( feature = "zstd" ) ) ]
505+ {
506+ return Err ( Error :: IoError ( std:: io:: Error :: new ( std:: io:: ErrorKind :: Unsupported ,
507+ "Compression support is not enabled. Please rebuild with the 'zstd' feature flag." ,
508+ ) ) ) ;
509+ }
510+ #[ cfg( feature = "zstd" ) ]
511+ {
512+ self . decompress_and_process_compressed :: < T > ( & buffer) ?;
513+ continue ;
514+ }
515+ }
516+
517+ if user_record_type == Some ( UserRecordType :: PERF_COMPRESSED2 ) {
518+ #[ cfg( not( feature = "zstd" ) ) ]
519+ {
520+ return Err ( Error :: IoError ( std:: io:: Error :: new ( std:: io:: ErrorKind :: Unsupported ,
521+ "Compression support is not enabled. Please rebuild with the 'zstd' feature flag." ,
522+ ) ) ) ;
523+ }
524+ #[ cfg( feature = "zstd" ) ]
525+ {
526+ self . decompress_and_process_compressed2 :: < T > ( & buffer) ?;
527+ continue ;
528+ }
529+ }
530+
494531 self . process_record :: < T > ( header, buffer, offset) ?;
495532 }
496533
@@ -542,7 +579,95 @@ impl<R: Read> PerfRecordIter<R> {
542579 attr_index,
543580 } ;
544581 self . sorter . insert_unordered ( sort_key, pending_record) ;
582+ Ok ( ( ) )
583+ }
584+
585+ /// Decompresses a PERF_RECORD_COMPRESSED record and processes its sub-records.
586+ ///
587+ /// PERF_RECORD_COMPRESSED (type 81) was introduced in Linux 5.2 (2019).
588+ /// Format: header (8 bytes) + compressed data (header.size - 8 bytes)
589+ /// The compressed data size is implicit from the header size.
590+ #[ cfg( feature = "zstd" ) ]
591+ fn decompress_and_process_compressed < T : ByteOrder > (
592+ & mut self ,
593+ buffer : & [ u8 ] ,
594+ ) -> Result < ( ) , Error > {
595+ // For COMPRESSED, the entire buffer is compressed data
596+ // (no data_size field - size is implicit from header.size)
597+ let compressed_data = buffer;
598+
599+ let decompressed = self . zstd_decompressor . decompress ( compressed_data) ?;
600+ self . process_decompressed_records :: < T > ( & decompressed)
601+ }
545602
603+ /// Decompresses a PERF_RECORD_COMPRESSED2 record and processes its sub-records.
604+ ///
605+ /// PERF_RECORD_COMPRESSED2 (type 83) was introduced in Linux 6.x (May 2025)
606+ /// to fix 8-byte alignment issues with the original format.
607+ /// Format: header (8 bytes) + data_size (8 bytes) + compressed data + padding
608+ /// The header.size includes padding for 8-byte alignment; data_size has the actual size.
609+ #[ cfg( feature = "zstd" ) ]
610+ fn decompress_and_process_compressed2 < T : ByteOrder > (
611+ & mut self ,
612+ buffer : & [ u8 ] ,
613+ ) -> Result < ( ) , Error > {
614+ if buffer. len ( ) < 8 {
615+ return Err ( ReadError :: PerfEventData . into ( ) ) ;
616+ }
617+ let data_size = T :: read_u64 ( & buffer[ 0 ..8 ] ) as usize ;
618+ if data_size > buffer. len ( ) - 8 {
619+ return Err ( ReadError :: PerfEventData . into ( ) ) ;
620+ }
621+ let compressed_data = & buffer[ 8 ..8 + data_size] ;
622+
623+ let decompressed = self . zstd_decompressor . decompress ( compressed_data) ?;
624+ self . process_decompressed_records :: < T > ( & decompressed)
625+ }
626+
627+ /// Processes decompressed data as a sequence of perf records.
628+ /// Shared by both COMPRESSED and COMPRESSED2 handlers.
629+ #[ cfg( feature = "zstd" ) ]
630+ fn process_decompressed_records < T : ByteOrder > (
631+ & mut self ,
632+ decompressed : & [ u8 ] ,
633+ ) -> Result < ( ) , Error > {
634+ let mut cursor = Cursor :: new ( decompressed) ;
635+ let mut offset = 0u64 ;
636+
637+ while ( cursor. position ( ) as usize ) < decompressed. len ( ) {
638+ let header_start = cursor. position ( ) as usize ;
639+ // Check if we have enough bytes for a header
640+ let remaining = decompressed. len ( ) - header_start;
641+ if remaining < PerfEventHeader :: STRUCT_SIZE {
642+ self . zstd_decompressor
643+ . save_partial_record ( & decompressed[ header_start..] ) ;
644+ break ;
645+ }
646+
647+ let sub_header = PerfEventHeader :: parse :: < _ , T > ( & mut cursor) ?;
648+ let sub_size = sub_header. size as usize ;
649+ if sub_size < PerfEventHeader :: STRUCT_SIZE {
650+ return Err ( Error :: InvalidPerfEventSize ) ;
651+ }
652+
653+ let sub_event_body_len = sub_size - PerfEventHeader :: STRUCT_SIZE ;
654+ // Check if we have enough bytes for the sub-record body
655+ let remaining_after_header = decompressed. len ( ) - cursor. position ( ) as usize ;
656+ if sub_event_body_len > remaining_after_header {
657+ self . zstd_decompressor
658+ . save_partial_record ( & decompressed[ header_start..] ) ;
659+ break ;
660+ }
661+
662+ let mut sub_buffer = self . buffers_for_recycling . pop_front ( ) . unwrap_or_default ( ) ;
663+ sub_buffer. resize ( sub_event_body_len, 0 ) ;
664+ cursor
665+ . read_exact ( & mut sub_buffer)
666+ . map_err ( |_| ReadError :: PerfEventData ) ?;
667+
668+ self . process_record :: < T > ( sub_header, sub_buffer, offset) ?;
669+ offset += sub_size as u64 ;
670+ }
546671 Ok ( ( ) )
547672 }
548673
0 commit comments