Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
69 commits
Select commit Hold shift + click to select a range
4a3e553
Initial implementation
neilconway Feb 20, 2026
6b4f5c0
cargo fmt
neilconway Mar 29, 2026
1412ab1
Properly wait for subquery exec to complete before exec'ing main input
neilconway Mar 29, 2026
cedfa5c
Better fix for async exec issue
neilconway Mar 29, 2026
d80569f
Fix doc lint error
neilconway Mar 29, 2026
9f606fb
Implement logical plan serialization/deserialization for subqueries
neilconway Mar 30, 2026
b07491b
cargo fmt
neilconway Mar 30, 2026
27a1ac2
Refactor logical plan deserialization
neilconway Mar 30, 2026
bce0a6d
Merge remote-tracking branch 'origin/main' into neilc/scalar-subquery…
neilconway Mar 30, 2026
7071001
Increase large files size check
neilconway Mar 30, 2026
b9bce91
fix clippy
neilconway Mar 30, 2026
7c965aa
Update expected TPC-H plans
neilconway Mar 30, 2026
09f167a
Implement statistics
neilconway Mar 30, 2026
54a9f79
Tweak comments
neilconway Mar 30, 2026
b979e3d
Merge branch 'main' into neilc/scalar-subquery-expr
neilconway Mar 30, 2026
2c256e7
Ensure projection pushdown works inside uncorrelated subqueries
neilconway Mar 30, 2026
99d9bcf
Update expected plans
neilconway Mar 30, 2026
9a11d62
Fix overlooked cases for projection pushdown
neilconway Mar 31, 2026
9b217ca
Merge remote-tracking branch 'origin/main' into neilc/scalar-subquery…
neilconway Mar 31, 2026
5aef67e
Fix line numbers in expected EXPLAIN
neilconway Mar 31, 2026
3d0b99f
Evaluate subqueries in parallel
neilconway Mar 31, 2026
f99ded5
Merge remote-tracking branch 'origin/main' into neilc/scalar-subquery…
neilconway Apr 2, 2026
b02abf8
Don't try to use subquery filters for partition pruning
neilconway Apr 2, 2026
3971312
Raise an error if duplicate subquery eval is detected
neilconway Apr 2, 2026
64e9f34
cargo fmt
neilconway Apr 2, 2026
26d8acb
Update expected plan
neilconway Apr 2, 2026
d2af491
Merge remote-tracking branch 'origin/main' into neilc/scalar-subquery…
neilconway Apr 2, 2026
f9c9d5d
Remove unnecessary IN/EXISTS serialization code
neilconway Apr 3, 2026
92e6054
Code cleanup
neilconway Apr 3, 2026
6857966
Code cleanup
neilconway Apr 3, 2026
6a4f524
Code cleanup and refactoring
neilconway Apr 3, 2026
7adb788
Merge remote-tracking branch 'origin/main' into neilc/scalar-subquery…
neilconway Apr 3, 2026
670139c
Updates for plan API changes
neilconway Apr 3, 2026
1239e3a
Fix doc build
neilconway Apr 3, 2026
7bb6959
Add sanity check on subquery schema, per review
neilconway Apr 12, 2026
4c824a4
Improve comments per review
neilconway Apr 13, 2026
ee58247
Introduce struct wrapping scalar subquery results, per review
neilconway Apr 13, 2026
f582628
Fix reset_state bug
neilconway Apr 13, 2026
4e7442c
Simplify new test case
neilconway Apr 13, 2026
a2087d0
Remove benchmarks, not useful
neilconway Apr 14, 2026
dc4ca31
Merge remote-tracking branch 'origin/main' into neilc/scalar-subquery…
neilconway Apr 14, 2026
0416b0d
Refactor unit test code
neilconway Apr 14, 2026
b9d307b
Fix doc build
neilconway Apr 14, 2026
e787873
Doc fix
neilconway Apr 14, 2026
ce1e3c0
Tweak comments
neilconway Apr 14, 2026
c32edba
Merge remote-tracking branch 'origin/main' into neilc/scalar-subquery…
neilconway Apr 14, 2026
50d0ef8
Revert clippy benchmark fix
neilconway Apr 14, 2026
5935028
Update stale comment
neilconway Apr 14, 2026
4b60787
Use IndexSet instead of HashSet + Vec
neilconway Apr 14, 2026
177a190
Reduce unnecessary cloning of subqueries
neilconway Apr 14, 2026
cf582e3
Introduce type wrapper for subquery index
neilconway Apr 14, 2026
2245153
Minor cleanups
neilconway Apr 14, 2026
19f796c
Fix clippy
neilconway Apr 14, 2026
12b4b6b
Tweak metadata SLT query
neilconway Apr 14, 2026
59a47c8
New approach to proto serialization
neilconway Apr 15, 2026
9ed5cf7
Refactor new proto approach
neilconway Apr 15, 2026
010bc43
Update stale comments
neilconway Apr 15, 2026
7c7265b
Various cleanups
neilconway Apr 15, 2026
200f081
Further cleanups
neilconway Apr 15, 2026
58481f4
Add migration guide notes
neilconway Apr 16, 2026
47ae89c
Merge remote-tracking branch 'origin/main' into neilc/scalar-subquery…
neilconway Apr 16, 2026
6d8d056
Fix CI failures
neilconway Apr 16, 2026
0f3bb7b
Unbreak CI, hopefully
neilconway Apr 16, 2026
a7ea4c8
Fix prettier issues
neilconway Apr 16, 2026
f6ab615
Fix another prettier issue
neilconway Apr 16, 2026
975ba7c
Fix README
neilconway Apr 16, 2026
443e05f
Merge remote-tracking branch 'origin/main' into neilc/scalar-subquery…
neilconway Apr 17, 2026
5cc14ea
Sigh, fix prettier again
neilconway Apr 17, 2026
bd19609
Merge remote-tracking branch 'origin/main' into neilc/scalar-subquery…
neilconway Apr 17, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/large_files.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@ jobs:
fetch-depth: 0
- name: Check size of new Git objects
env:
# 1 MB ought to be enough for anybody.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we really need to up the limit? this repo gets checked out a lot

What is so large that required increasing to 2MB?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed this because pbjson.rs started to exceed the limit (this PR only increases its size slightly, but it is only a hair under 1MB in mainline).

We could certainly make the limit tighter (e.g., 1.2MB) -- or if there's a different approach you prefer, lmk.

# 2 MB ought to be enough for anybody.
# TODO in case we may want to consciously commit a bigger file to the repo without using Git LFS we may disable the check e.g. with a label
MAX_FILE_SIZE_BYTES: 1048576
MAX_FILE_SIZE_BYTES: 2097152
shell: bash
run: |
if [ "${{ github.event_name }}" = "merge_group" ]; then
Expand Down
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@

//! See `main.rs` for how to run it.
//!
//! This example demonstrates how to use the `PhysicalExtensionCodec` trait's
//! interception methods (`serialize_physical_plan` and `deserialize_physical_plan`)
//! to implement custom serialization logic.
//! This example demonstrates how to use the `PhysicalProtoConverterExtension`
//! trait's interception methods (`execution_plan_to_proto` and
//! `proto_to_execution_plan`) to implement custom serialization logic.
//!
//! The key insight is that `FileScanConfig::expr_adapter_factory` is NOT serialized by
//! default. This example shows how to:
Expand All @@ -28,9 +28,10 @@
//! 3. Store the inner DataSourceExec (without adapter) as a child in the extension's inputs field
//! 4. Unwrap and restore the adapter during deserialization
//!
//! This demonstrates nested serialization (protobuf outer, JSON inner) and the power
//! of the `PhysicalExtensionCodec` interception pattern. Both plan and expression
//! serialization route through the codec, enabling interception at every node in the tree.
//! This demonstrates nested serialization (protobuf outer, JSON inner) and the
//! power of `PhysicalProtoConverterExtension`. Both plan and expression
//! serialization route through converter hooks, enabling interception at every
//! node in the tree.

use std::fmt::Debug;
use std::sync::Arc;
Expand Down Expand Up @@ -61,7 +62,7 @@ use datafusion_proto::bytes::{
use datafusion_proto::physical_plan::from_proto::parse_physical_expr_with_converter;
use datafusion_proto::physical_plan::to_proto::serialize_physical_expr_with_converter;
use datafusion_proto::physical_plan::{
PhysicalExtensionCodec, PhysicalProtoConverterExtension,
PhysicalExtensionCodec, PhysicalPlanDecodeContext, PhysicalProtoConverterExtension,
};
use datafusion_proto::protobuf::physical_plan_node::PhysicalPlanType;
use datafusion_proto::protobuf::{
Expand Down Expand Up @@ -177,7 +178,7 @@ pub async fn adapter_serialization() -> Result<()> {
println!("\n=== Example Complete! ===");
println!("Key takeaways:");
println!(
" 1. PhysicalExtensionCodec provides serialize_physical_plan/deserialize_physical_plan hooks"
" 1. PhysicalProtoConverterExtension provides execution_plan_to_proto/proto_to_execution_plan hooks"
);
println!(" 2. Custom metadata can be wrapped as PhysicalExtensionNode");
println!(" 3. Nested serialization (protobuf + JSON) works seamlessly");
Expand Down Expand Up @@ -303,9 +304,10 @@ impl PhysicalExtensionCodec for AdapterPreservingCodec {
_node: Arc<dyn ExecutionPlan>,
_buf: &mut Vec<u8>,
) -> Result<()> {
// We don't need this for the example - we use serialize_physical_plan instead
// We don't need this for the example - adapter wrapping happens in
// `execution_plan_to_proto` instead.
not_impl_err!(
"try_encode not used - adapter wrapping happens in serialize_physical_plan"
"try_encode not used - adapter wrapping happens in execution_plan_to_proto"
)
}
}
Expand Down Expand Up @@ -371,9 +373,8 @@ impl PhysicalProtoConverterExtension for AdapterPreservingCodec {
// Interception point: override deserialization to unwrap adapters
fn proto_to_execution_plan(
&self,
ctx: &TaskContext,
extension_codec: &dyn PhysicalExtensionCodec,
proto: &PhysicalPlanNode,
ctx: &PhysicalPlanDecodeContext<'_>,
) -> Result<Arc<dyn ExecutionPlan>> {
// Check if this is our custom extension wrapper
if let Some(PhysicalPlanType::Extension(extension)) = &proto.physical_plan_type
Expand All @@ -395,11 +396,7 @@ impl PhysicalProtoConverterExtension for AdapterPreservingCodec {
let inner_proto = &extension.inputs[0];

// Deserialize the inner plan
let inner_plan = inner_proto.try_into_physical_plan_with_converter(
ctx,
extension_codec,
self,
)?;
let inner_plan = self.default_proto_to_execution_plan(inner_proto, ctx)?;

// Recreate the adapter factory
let adapter_factory = create_adapter_factory(&payload.adapter_metadata.tag);
Expand All @@ -409,17 +406,16 @@ impl PhysicalProtoConverterExtension for AdapterPreservingCodec {
}

// Not our extension - use default deserialization
proto.try_into_physical_plan_with_converter(ctx, extension_codec, self)
self.default_proto_to_execution_plan(proto, ctx)
}

fn proto_to_physical_expr(
&self,
proto: &PhysicalExprNode,
ctx: &TaskContext,
input_schema: &Schema,
codec: &dyn PhysicalExtensionCodec,
ctx: &PhysicalPlanDecodeContext<'_>,
) -> Result<Arc<dyn PhysicalExpr>> {
parse_physical_expr_with_converter(proto, ctx, input_schema, codec, self)
parse_physical_expr_with_converter(proto, input_schema, ctx, self)
}

fn physical_expr_to_proto(
Expand Down
23 changes: 11 additions & 12 deletions datafusion-examples/examples/proto/expression_deduplication.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@

//! See `main.rs` for how to run it.
//!
//! This example demonstrates how to use the `PhysicalExtensionCodec` trait's
//! interception methods to implement expression deduplication during deserialization.
//! This example demonstrates how to use the
//! `PhysicalProtoConverterExtension` trait's interception methods to
//! implement expression deduplication during deserialization.
//!
//! This pattern is inspired by PR #18192, which introduces expression caching
//! to reduce memory usage when deserializing plans with duplicate expressions.
Expand All @@ -29,8 +30,9 @@
//! 2. Reduce memory allocation during deserialization
//! 3. Enable downstream optimizations that rely on Arc pointer equality
//!
//! This demonstrates the decorator pattern enabled by the `PhysicalExtensionCodec` trait,
//! where all expression serialization/deserialization routes through the codec methods.
//! This demonstrates the decorator pattern enabled by
//! `PhysicalProtoConverterExtension`, where physical-expression
//! serialization and deserialization route through converter hooks.

use std::collections::HashMap;
use std::fmt::Debug;
Expand All @@ -49,7 +51,7 @@ use datafusion::prelude::SessionContext;
use datafusion_proto::physical_plan::from_proto::parse_physical_expr_with_converter;
use datafusion_proto::physical_plan::to_proto::serialize_physical_expr_with_converter;
use datafusion_proto::physical_plan::{
DefaultPhysicalExtensionCodec, PhysicalExtensionCodec,
DefaultPhysicalExtensionCodec, PhysicalExtensionCodec, PhysicalPlanDecodeContext,
PhysicalProtoConverterExtension,
};
use datafusion_proto::protobuf::{PhysicalExprNode, PhysicalPlanNode};
Expand Down Expand Up @@ -202,11 +204,10 @@ impl PhysicalExtensionCodec for CachingCodec {
impl PhysicalProtoConverterExtension for CachingCodec {
fn proto_to_execution_plan(
&self,
ctx: &TaskContext,
extension_codec: &dyn PhysicalExtensionCodec,
proto: &PhysicalPlanNode,
ctx: &PhysicalPlanDecodeContext<'_>,
) -> Result<Arc<dyn ExecutionPlan>> {
proto.try_into_physical_plan_with_converter(ctx, extension_codec, self)
self.default_proto_to_execution_plan(proto, ctx)
}

fn execution_plan_to_proto(
Expand All @@ -225,9 +226,8 @@ impl PhysicalProtoConverterExtension for CachingCodec {
fn proto_to_physical_expr(
&self,
proto: &PhysicalExprNode,
ctx: &TaskContext,
input_schema: &Schema,
codec: &dyn PhysicalExtensionCodec,
ctx: &PhysicalPlanDecodeContext<'_>,
) -> Result<Arc<dyn PhysicalExpr>> {
// Create cache key from protobuf bytes
let mut key = Vec::new();
Expand All @@ -249,8 +249,7 @@ impl PhysicalProtoConverterExtension for CachingCodec {
}

// Cache miss - deserialize and store
let expr =
parse_physical_expr_with_converter(proto, ctx, input_schema, codec, self)?;
let expr = parse_physical_expr_with_converter(proto, input_schema, ctx, self)?;

// Store in cache
{
Expand Down
1 change: 1 addition & 0 deletions datafusion/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ datafusion-session = { workspace = true }
datafusion-sql = { workspace = true, optional = true }
flate2 = { workspace = true, optional = true }
futures = { workspace = true }
indexmap = { workspace = true }
itertools = { workspace = true }
liblzma = { workspace = true, optional = true }
log = { workspace = true }
Expand Down
Loading
Loading