perf: Implement physical execution of uncorrelated scalar subqueries #21240

neilconway wants to merge 17 commits into apache:main from neilc/scalar-subquery-expr
Conversation
```rust
// Before:
pub struct DefaultPhysicalProtoConverter;

// After:
#[derive(Default)]
pub struct DefaultPhysicalProtoConverter {
    scalar_subquery_results: RefCell<Option<ScalarSubqueryResults>>,
}
```
I don't know the serialization/deserialization code well; would love feedback on whether this is the right way to do this.
```rust
/// TODO: Consider overlapping computation of the subqueries with evaluating the
/// main query.
///
/// TODO: Subqueries are evaluated sequentially. Consider parallel evaluation in
/// the future.
```
Happy to address these TODOs now or in a followup PR, if folks have opinions on the best way to do this.
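For the parallel-evaluation TODO, one possible shape (a sketch only, not what the PR implements; the real code uses DataFusion's `ScalarValue` and async streams rather than threads and closures) is to run each subquery on its own thread and publish results through the shared `OnceLock` slots:

```rust
use std::sync::{Arc, OnceLock};
use std::thread;

// Stand-in for DataFusion's ScalarValue.
type ScalarValue = i64;

// Evaluate each subquery concurrently, writing its result into the
// corresponding slot of the shared results container.
fn evaluate_subqueries_in_parallel(
    subqueries: Vec<Box<dyn Fn() -> ScalarValue + Send>>,
    results: Arc<Vec<OnceLock<ScalarValue>>>,
) {
    let handles: Vec<_> = subqueries
        .into_iter()
        .enumerate()
        .map(|(i, subquery)| {
            let results = Arc::clone(&results);
            thread::spawn(move || {
                let value = subquery();
                // Each slot is written exactly once.
                results[i].set(value).expect("subquery result set twice");
            })
        })
        .collect();
    for h in handles {
        h.join().expect("subquery thread panicked");
    }
}
```

A real implementation would likely spawn tokio tasks instead of OS threads, but the synchronization story (one `OnceLock` per subquery) is the same.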
```rust
// Create the shared results container and register it (along with
// the index map) in ExecutionProps so that `create_physical_expr`
// can resolve `Expr::ScalarSubquery` into `ScalarSubqueryExpr`
// nodes. We clone the SessionState so these are available
// throughout physical planning without mutating the caller's state.
//
// Ideally, the subquery state would live in a dedicated planning
// context rather than on ExecutionProps (which is meant for
// session-level configuration). It's here because
// `create_physical_expr` only receives `&ExecutionProps`, and
// changing that signature would be a breaking public API change.
let results: Arc<Vec<OnceLock<ScalarValue>>> =
    Arc::new((0..links.len()).map(|_| OnceLock::new()).collect());
let session_state = if links.is_empty() {
    Cow::Borrowed(session_state)
} else {
    let mut owned = session_state.clone();
    owned.execution_props_mut().subquery_indexes = index_map;
    owned.execution_props_mut().subquery_results = Arc::clone(&results);
    Cow::Owned(owned)
};
```
This seemed a bit kludgy but I couldn't think of a better way to do it; feedback/suggestions welcome.
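The clone-on-write shape used here can be illustrated in isolation (a minimal sketch with a hypothetical `State` type standing in for `SessionState`): borrow the caller's state when there are no subqueries, and clone-and-mutate only when there are.

```rust
use std::borrow::Cow;

// Illustrative stand-in for SessionState.
#[derive(Clone)]
struct State {
    subquery_count: usize,
}

// Avoids cloning the (potentially large) state in the common case
// where the plan contains no scalar subqueries.
fn with_subqueries<'a>(state: &'a State, links: &[u32]) -> Cow<'a, State> {
    if links.is_empty() {
        Cow::Borrowed(state)
    } else {
        let mut owned = state.clone();
        owned.subquery_count = links.len();
        Cow::Owned(owned)
    }
}
```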
run benchmarks

🤖 Benchmark running (GKE): comparing `neilc/scalar-subquery-expr` (b9bce91) to 0be5982 (merge-base) using `clickbench_partitioned`

🤖 Benchmark running (GKE): comparing `neilc/scalar-subquery-expr` (b9bce91) to 0be5982 (merge-base) using `tpcds`

🤖 Benchmark running (GKE): comparing `neilc/scalar-subquery-expr` (b9bce91) to 0be5982 (merge-base) using `tpch`
🤖 Benchmark completed (GKE): `tpch` (base vs branch)

run benchmark tpch10

🤖 Benchmark running (GKE): comparing `neilc/scalar-subquery-expr` (b9bce91) to 0be5982 (merge-base) using `tpch10`

🤖 Benchmark completed (GKE): `clickbench_partitioned` (base vs branch)

🤖 Benchmark completed (GKE): `tpcds` (base vs branch)

🤖 Benchmark completed (GKE): `tpch10` (base vs branch)
Some slowdowns, due to missing statistics I wonder?

run benchmark tpch tpch10 tpcds

🤖 Benchmark running (GKE): comparing `neilc/scalar-subquery-expr` (09f167a) to 0be5982 (merge-base) using `tpch`

🤖 Benchmark running (GKE): comparing `neilc/scalar-subquery-expr` (09f167a) to 0be5982 (merge-base) using `tpch10`

🤖 Benchmark running (GKE): comparing `neilc/scalar-subquery-expr` (09f167a) to 0be5982 (merge-base) using `tpcds`
```rust
fn partition_statistics(&self, partition: Option<usize>) -> Result<Arc<Statistics>> {
    self.input.partition_statistics(partition)
}
```
I think this can be improved to return 1 row if the subquery has a larger num_rows statistic (or no statistics at all).
Same for bytes estimation
Sorry, I want to make sure I understand. `partition_statistics` is about the statistics for the output of the plan node. The output of `ScalarSubqueryExec` is identical to the output of its main input child node; the N other children are just subqueries whose result values get used somewhere by the main input plan. The statistics of the child subqueries don't directly influence the properties of the output of the `ScalarSubqueryExec` itself. Lmk if I'm misunderstanding you though!
My thought was: is this the physical node for a scalar subquery (which returns 1 row maximum and returns an error if it exceeds 1 row)? If so, we could update the statistics (based on child stats) to be min(1, child).
Will try to look at the PR more in depth tomorrow!
The idea is mostly that the subquery will have either unknown or high estimated stats; returning updated stats for a scalar subquery should generally be much more effective for planning the correct join order, as it can be loaded in the build side.
Thanks for the comments! I will admit I still don't entirely follow what you mean, sorry to be slow 🙃
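The statistics clamp suggested above could look roughly like this (a hypothetical sketch, using a plain `Option<usize>` in place of DataFusion's `Precision`-based `Statistics`):

```rust
// Hypothetical sketch: a scalar subquery produces at most one row, so its
// reported row-count estimate can be clamped to min(1, child estimate).
// `Option<usize>` stands in for DataFusion's Precision-based statistics.
fn scalar_subquery_num_rows(child_num_rows: Option<usize>) -> usize {
    match child_num_rows {
        // Clamp a known child estimate to at most 1 row.
        Some(n) => n.min(1),
        // Even with unknown child stats, the output is at most 1 row.
        None => 1,
    }
}
```

The same clamping idea would apply to the total-bytes estimate, giving the optimizer a strong hint that the subquery result is cheap to put on the build side of a join.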
🤖 Benchmark completed (GKE): `tpch` (base vs branch)

🤖 Benchmark completed (GKE): `tpch10` (base vs branch)

🤖 Benchmark completed (GKE): `tpcds` (base vs branch)

From doing some digging on TPC-H query 11, I think the problem is that we were able to push down projections in the original plan but we weren't doing so for subqueries. That is fixed; I will need to check whether there are any further performance regressions.
run benchmark tpch tpch10 tpcds

🤖 Benchmark running (GKE): comparing `neilc/scalar-subquery-expr` (2c256e7) to f830ee3 (merge-base) using `tpch10`

🤖 Benchmark running (GKE): comparing `neilc/scalar-subquery-expr` (2c256e7) to f830ee3 (merge-base) using `tpch`

🤖 Benchmark running (GKE): comparing `neilc/scalar-subquery-expr` (2c256e7) to f830ee3 (merge-base) using `tpcds`

🤖 Benchmark completed (GKE): `tpch` (base vs branch)

🤖 Benchmark completed (GKE): `tpch10` (base vs branch)

🤖 Benchmark completed (GKE): `tpcds` (base vs branch)
Which issue does this PR close?

- Closes #18181 (`array_has`).

Rationale for this change

Previously, DataFusion evaluated uncorrelated scalar subqueries by transforming them into joins. This has two shortcomings: the join-based plan cannot take advantage of scalar fast-paths (as in #18181), and a subquery that returns more than one row produces incorrect query results rather than an error.

This PR introduces physical execution of uncorrelated scalar subqueries:

- Adds a `ScalarSubqueryExec` plan node to the top of any physical plan with uncorrelated subqueries. It has N+1 children: the N subqueries and its "main" input, which is the rest of the query plan. The subquery expression in the parent plan is replaced with a `ScalarSubqueryExpr`.
- `ScalarSubqueryExec` manages the execution of the subqueries and stores the results in a shared "results container", which is an `Arc<Vec<OnceLock<ScalarValue>>>`. At present, subquery evaluation is done sequentially and not overlapped with evaluation of the parent query.
- When a `ScalarSubqueryExpr` is evaluated, it fetches the result of the subquery from the results container.

This architecture makes it easy to avoid the two shortcomings described above. Performance seems roughly unchanged (benchmarks added in this PR), but in situations like #18181 we can now leverage scalar fast-paths; in the case of #18181 specifically, this improves performance from ~800 ms to ~30 ms.
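The results container at the heart of this design can be sketched in isolation (a simplified, hypothetical shape; the real code uses DataFusion's `ScalarValue`): the executor fills each `OnceLock` slot exactly once, and each `ScalarSubqueryExpr` reads its slot by index.

```rust
use std::sync::{Arc, OnceLock};

// Stand-in for DataFusion's ScalarValue.
type ScalarValue = i64;

// Simplified sketch of the shared results container: one write-once slot
// per uncorrelated scalar subquery.
struct ScalarSubqueryResults {
    slots: Arc<Vec<OnceLock<ScalarValue>>>,
}

impl ScalarSubqueryResults {
    fn new(num_subqueries: usize) -> Self {
        Self {
            slots: Arc::new((0..num_subqueries).map(|_| OnceLock::new()).collect()),
        }
    }

    // Called by the executor after running subquery `idx`.
    fn set(&self, idx: usize, value: ScalarValue) {
        self.slots[idx].set(value).expect("subquery result set twice");
    }

    // Called when a ScalarSubqueryExpr for `idx` is evaluated; returns None
    // if the subquery has not run yet.
    fn get(&self, idx: usize) -> Option<&ScalarValue> {
        self.slots[idx].get()
    }
}
```

Because the slots are `OnceLock`s behind an `Arc`, the container can be shared freely between the executor and the expressions without locking on the read path once a result is published.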
What changes are included in this PR?

- `ScalarSubqueryExpr`
- `PhysicalProtoConverterExtension` to wire up `ScalarSubqueryExpr` correctly

Are these changes tested?

Yes.
Are there any user-facing changes?

At the SQL level, scalar subqueries that return more than 1 row will now be rejected instead of producing incorrect query results.

At the API level, this PR adds several new public APIs (e.g., `ScalarSubqueryExpr`, `ScalarSubqueryExec`) and makes breaking changes to several public APIs (e.g., `parse_expr`). It also introduces a new physical plan node (and allows `Subquery` to remain in logical plans); third-party query optimization code will encounter these nodes where it wouldn't have before.
ScalarSubqueryExpr,ScalarSubqueryExec) and makes breaking changes to several public APIs (e.g.,parse_expr). It also introduces a new physical plan node (and allowsSubqueryto remain in logical plans); third-party query optimization code will encounter these nodes when they wouldn't have before.