Skip to content

Commit f205004

Browse files
committed
feat: Add pcap replay command
And refactored remote exec code
1 parent 1f75015 commit f205004

44 files changed

Lines changed: 1896 additions & 517 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

agent/Cargo.lock

Lines changed: 74 additions & 6 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

agent/Cargo.toml

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,9 @@ packet_segmentation_reassembly = { path = "plugins/packet_segmentation_reassembl
9191
packet_sequence_block = { path = "plugins/packet_sequence_block" }
9292
page_size = "0.4.2"
9393
parking_lot = "0.11"
94+
pcap = "2.4"
9495
pcap_assembler = { path = "plugins/pcap_assembler" }
96+
pcap-parser = { version = "0.17", optional = true }
9597
pcap-sys = "0.1.3"
9698
pnet = "^0.29"
9799
prost.workspace = true
@@ -125,7 +127,6 @@ zstd = "0.13.2"
125127
[target.'cfg(any(target_os = "linux", target_os = "android"))'.dependencies]
126128
cgroups-rs = "0.2.9"
127129
nix = "0.23"
128-
pcap = "0.9.1"
129130
# As of procfs 0.16.0, Process::fd().iter() still not giving correct results on kernel 2.6.32
130131
# ref: https://github.com/eminence/procfs/pull/241
131132
procfs = { git = "https://github.com/deepflowio/procfs/" }
@@ -145,7 +146,6 @@ schemars = "0.8"
145146
trace-utils = { path = "crates/trace-utils" }
146147

147148
[target.'cfg(target_os = "windows")'.dependencies]
148-
pcap = "0.10.1"
149149
winapi = { version = "0.3.9", features = [
150150
"errhandlingapi",
151151
"libloaderapi",
@@ -164,8 +164,10 @@ windows = { version = "0.30", features = [
164164

165165
[dev-dependencies]
166166
criterion = "0.3.5"
167+
env_logger = "0.11"
167168
lru = "0.9.0"
168169
tempfile = "3.2.0"
170+
tokio-stream = "0.1"
169171
uluru = "3.0.0"
170172

171173
[build-dependencies]
@@ -177,7 +179,7 @@ walkdir = "2"
177179

178180
[features]
179181
default = ["libtrace"]
180-
enterprise = ["dep:enterprise-utils"]
182+
enterprise = ["dep:enterprise-utils", "dep:pcap-parser"]
181183
extended_observability = ["libtrace"]
182184
dylib_pcap = []
183185
libtrace = []

agent/benches/flow_generator/l7_log.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ pub(super) fn bench(c: &mut Criterion) {
8686
let rrt_cache = Rc::new(RefCell::new(L7PerfCache::new(8)));
8787
let req_param = ParseParam::new(
8888
&packets[0],
89-
rrt_cache.clone(),
89+
Some(rrt_cache.clone()),
9090
Default::default(),
9191
#[cfg(any(target_os = "linux", target_os = "android"))]
9292
Default::default(),
@@ -95,7 +95,7 @@ pub(super) fn bench(c: &mut Criterion) {
9595
);
9696
let resp_param = ParseParam::new(
9797
&packets[1],
98-
rrt_cache.clone(),
98+
Some(rrt_cache.clone()),
9999
Default::default(),
100100
#[cfg(any(target_os = "linux", target_os = "android"))]
101101
Default::default(),

agent/crates/enterprise-utils/src/lib.rs

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ pub mod l7 {
100100
_: public::l7_protocol::L7ProtocolEnum,
101101
_: u16,
102102
) -> Option<PolicySlice> {
103-
unimplemented!()
103+
None
104104
}
105105
pub fn counters(
106106
&self,
@@ -123,20 +123,17 @@ pub mod l7 {
123123
_: super::enums::TrafficDirection,
124124
_: enums::Source,
125125
) {
126-
unimplemented!()
127126
}
128127
}
129128

130129
#[derive(Default, Debug)]
131130
pub struct Store;
132131
impl Store {
133132
pub fn is_empty(&self) -> bool {
134-
unimplemented!()
133+
true
135134
}
136135

137-
pub fn clear(&mut self) {
138-
unimplemented!()
139-
}
136+
pub fn clear(&mut self) {}
140137

141138
pub fn into_iter_with<L: public::l7_protocol::L7Log>(
142139
self,

agent/crates/public/src/rpc.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,10 @@ pub mod remote_exec {
2424
SerializeError(#[from] serde_json::Error),
2525
#[error("transparent")]
2626
SyscallFailed(String),
27+
#[error("batch length too small, need at least {0} bytes")]
28+
BatchLengthTooSmall(usize),
29+
#[error("pcap parse error failed: {0}")]
30+
PcapParseFailed(String),
2731
}
2832

2933
type Result<T> = std::result::Result<T, Error>;

agent/examples/parse_http_v1.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ fn main() {
5757
&packets[0].get_l4_payload().unwrap(),
5858
&ParseParam::new(
5959
&packets[0],
60-
log_cache.clone(),
60+
Some(log_cache.clone()),
6161
Default::default(),
6262
#[cfg(any(target_os = "linux", target_os = "android"))]
6363
Default::default(),
@@ -69,7 +69,7 @@ fn main() {
6969
&packets[1].get_l4_payload().unwrap(),
7070
&ParseParam::new(
7171
&packets[1],
72-
log_cache.clone(),
72+
Some(log_cache.clone()),
7373
Default::default(),
7474
#[cfg(any(target_os = "linux", target_os = "android"))]
7575
Default::default(),

agent/examples/remote_executor.rs

Lines changed: 0 additions & 66 deletions
This file was deleted.

agent/src/common/l7_protocol_info.rs

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -202,7 +202,10 @@ where
202202
is_reversed: bool,
203203
) -> Option<String> {
204204
let key = LogCacheKey::new(param, self.session_id(), is_reversed);
205-
match param.l7_perf_cache.borrow_mut().rrt_cache.get(&key) {
205+
let Some(perf_cache) = param.l7_perf_cache.as_ref() else {
206+
return None;
207+
};
208+
match perf_cache.borrow_mut().rrt_cache.get(&key) {
206209
Some(cached) if cached.endpoint.is_some() => {
207210
let log = LogCache {
208211
time: param.time,
@@ -234,6 +237,9 @@ where
234237
error!("flow_id: {}, packet time 0", param.flow_id);
235238
return None;
236239
}
240+
let Some(perf_cache) = param.l7_perf_cache.as_ref() else {
241+
return None;
242+
};
237243

238244
let cur_info = LogCache {
239245
time: param.time,
@@ -261,12 +267,10 @@ where
261267
return Some(stats);
262268
}
263269

264-
let (mut rtt_cache, mut timeout_cache) = RefMut::map_split(
265-
param.l7_perf_cache.borrow_mut(),
266-
|perf_cache: &mut L7PerfCache| {
270+
let (mut rtt_cache, mut timeout_cache) =
271+
RefMut::map_split(perf_cache.borrow_mut(), |perf_cache: &mut L7PerfCache| {
267272
(&mut perf_cache.rrt_cache, &mut perf_cache.timeout_cache)
268-
},
269-
);
273+
});
270274
let key = LogCacheKey::new(param, self.session_id(), self.is_reversed());
271275
let prev_info = rtt_cache.get_mut(&key);
272276
let timeout_counter = timeout_cache.get_or_insert_mut(param.flow_id);

0 commit comments

Comments
 (0)