Skip to content

Commit 618a440

Browse files
authored
refactor executePlan to try to avoid constantly entering tokio runtime. (#2938)
1 parent 7ff3cbe commit 618a440

1 file changed

Lines changed: 57 additions & 48 deletions

File tree

native/core/src/execution/jni_api.rs

Lines changed: 57 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -503,60 +503,69 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_executePlan(
503503
pull_input_batches(exec_context)?;
504504
}
505505

506-
loop {
507-
// Polling the stream.
508-
let next_item = exec_context.stream.as_mut().unwrap().next();
509-
let poll_output = get_runtime().block_on(async { poll!(next_item) });
510-
511-
// update metrics at interval
512-
if let Some(interval) = exec_context.metrics_update_interval {
513-
let now = Instant::now();
514-
if now - exec_context.metrics_last_update_time >= interval {
515-
update_metrics(&mut env, exec_context)?;
516-
exec_context.metrics_last_update_time = now;
506+
// Enter the runtime once for the entire polling loop to avoid repeated
507+
// Runtime::enter() overhead
508+
get_runtime().block_on(async {
509+
loop {
510+
// Polling the stream.
511+
let next_item = exec_context.stream.as_mut().unwrap().next();
512+
let poll_output = poll!(next_item);
513+
514+
// update metrics at interval
515+
if let Some(interval) = exec_context.metrics_update_interval {
516+
let now = Instant::now();
517+
if now - exec_context.metrics_last_update_time >= interval {
518+
update_metrics(&mut env, exec_context)?;
519+
exec_context.metrics_last_update_time = now;
520+
}
517521
}
518-
}
519522

520-
match poll_output {
521-
Poll::Ready(Some(output)) => {
522-
// prepare output for FFI transfer
523-
return prepare_output(
524-
&mut env,
525-
array_addrs,
526-
schema_addrs,
527-
output?,
528-
exec_context.debug_native,
529-
);
530-
}
531-
Poll::Ready(None) => {
532-
// Reaches EOF of output.
533-
if exec_context.explain_native {
534-
if let Some(plan) = &exec_context.root_op {
535-
let formatted_plan_str = DisplayableExecutionPlan::with_metrics(
536-
plan.native_plan.as_ref(),
537-
)
538-
.indent(true);
539-
info!(
540-
"Comet native query plan with metrics (Plan #{} Stage {} Partition {}):\
541-
\n plan creation took {:?}:\
542-
\n{formatted_plan_str:}",
543-
plan.plan_id, stage_id, partition, exec_context.plan_creation_time
544-
);
523+
match poll_output {
524+
Poll::Ready(Some(output)) => {
525+
// prepare output for FFI transfer
526+
return prepare_output(
527+
&mut env,
528+
array_addrs,
529+
schema_addrs,
530+
output?,
531+
exec_context.debug_native,
532+
);
533+
}
534+
Poll::Ready(None) => {
535+
// Reaches EOF of output.
536+
if exec_context.explain_native {
537+
if let Some(plan) = &exec_context.root_op {
538+
let formatted_plan_str = DisplayableExecutionPlan::with_metrics(
539+
plan.native_plan.as_ref(),
540+
)
541+
.indent(true);
542+
info!(
543+
"Comet native query plan with metrics (Plan #{} Stage {} Partition {}):\
544+
\n plan creation took {:?}:\
545+
\n{formatted_plan_str:}",
546+
plan.plan_id, stage_id, partition, exec_context.plan_creation_time
547+
);
548+
}
545549
}
550+
return Ok(-1);
551+
}
552+
// A poll pending means there are more than one blocking operators,
553+
// we don't need go back-forth between JVM/Native. Just keeping polling.
554+
Poll::Pending => {
555+
// TODO: Investigate if JNI calls are safe without block_in_place.
556+
// block_in_place prevents Tokio from migrating this task to another thread,
557+
// which is necessary because JNI env is thread-local. If we can guarantee
558+
// thread safety another way, we could remove this wrapper for better perf.
559+
tokio::task::block_in_place(|| {
560+
pull_input_batches(exec_context)
561+
})?;
562+
563+
// Output not ready yet
564+
continue;
546565
}
547-
return Ok(-1);
548-
}
549-
// A poll pending means there are more than one blocking operators,
550-
// we don't need go back-forth between JVM/Native. Just keeping polling.
551-
Poll::Pending => {
552-
// Pull input batches
553-
pull_input_batches(exec_context)?;
554-
555-
// Output not ready yet
556-
continue;
557566
}
558567
}
559-
}
568+
})
560569
})
561570
})
562571
}

0 commit comments

Comments
 (0)