[flink][spark] Fix partition pruning for non-string partition keys #3322
fresh-borzoni wants to merge 3 commits into
Conversation
Partition predicate pushdown stringified literals and partition values before evaluation, so range comparisons fell back to lexicographic string order. An INT partition column with values 2 and 10 under `WHERE pt > 2` compared `"10" < "2"` lexicographically and incorrectly dropped partition 10.

This PR adds `PartitionUtils.toPartitionRow` and `PartitionUtils.partitionRowType` in fluss-common, uses them from `SparkPartitionPredicate` and `FlinkSourceEnumerator`, drops the stringify step in `FlinkTableSource`, and deletes `StringifyPredicateVisitor`.

The stringifier was also hiding two latent gaps in `LeafPredicate.get`: `BYTES` had no case (`UnsupportedOperationException`) and `TIMESTAMP_WITH_LOCAL_TIME_ZONE` used `getTimestampNtz` instead of `getTimestampLtz` (`ClassCastException`). Both are exercised by `testStreamingReadAllPartitionTypePushDown` and are fixed in the same file.

A regression test for the partition pruning bug, using an INT partition column and a range predicate, is added in `SparkLogTableReadTest` and `FlinkTableSourceITCase`.

Closes apache#3292.
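The faulty comparison is easy to reproduce in isolation. This standalone sketch (not Fluss code) contrasts lexicographic ordering of stringified partition values with typed comparison:

```java
public class PartitionCompareDemo {
    public static void main(String[] args) {
        // Stringified evaluation: "10" sorts before "2" lexicographically
        // ('1' < '2'), so the pruner concludes 10 <= 2 and drops pt=10.
        boolean stringifiedKept = "10".compareTo("2") > 0;   // false
        // Typed evaluation: the INT values compare numerically.
        boolean typedKept = Integer.compare(10, 2) > 0;      // true
        System.out.println(stringifiedKept + " " + typedKept); // prints "false true"
    }
}
```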
force-pushed from 5b6aee7 to e83a839
@Yohahaha @YannByron @luoyuxia PTAL 🙏
Yohahaha
left a comment
parts of Spark LGTM! thank you for the fix!
luoyuxia
left a comment
@fresh-borzoni Thanks for the PR. Only one comment. PTAL
Pull request overview
Fixes partition pruning when a partition key is non-STRING (e.g., INT). Previously, both Flink and Spark connectors stringified literals and partition values before evaluating predicates, so range comparisons (e.g., pt > 2) used lexicographic order and could drop partitions like pt=10. The fix builds typed partition rows via new shared helpers and removes the stringification path. A latent LeafPredicate.get bug for TIMESTAMP_WITH_LOCAL_TIME_ZONE (and missing BYTES case) is also corrected, since these are now actually exercised.
Changes:
- Add `PartitionUtils.partitionRowType` and `PartitionUtils.toPartitionRow` to build typed partition rows; reuse them from Flink and Spark.
- Drop literal stringification in Flink (`StringifyPredicateVisitor` deleted) and Spark (`stringifyLiterals` removed); pass typed predicates straight through.
- Fix `LeafPredicate.get` to use `getTimestampLtz` for LTZ and to handle `BYTES`; add Flink and Spark integration tests for INT-partition range pushdown.
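The real helpers operate on Fluss's internal row and type abstractions; as a rough, self-contained illustration of the idea (all names below are hypothetical stand-ins, not the actual Fluss API), converting raw partition-value strings into typed fields before predicate evaluation might look like:

```java
import java.util.ArrayList;
import java.util.List;

public class TypedPartitionRowSketch {
    enum SimpleType { INT, BIGINT, STRING }

    // Hypothetical stand-in for PartitionUtils.toPartitionRow: parse each raw
    // partition value according to its column type instead of leaving
    // everything as a String.
    static List<Object> toPartitionRow(List<String> rawValues, List<SimpleType> types) {
        List<Object> row = new ArrayList<>();
        for (int i = 0; i < rawValues.size(); i++) {
            String raw = rawValues.get(i);
            switch (types.get(i)) {
                case INT:    row.add(Integer.parseInt(raw)); break;
                case BIGINT: row.add(Long.parseLong(raw));   break;
                default:     row.add(raw);
            }
        }
        return row;
    }

    public static void main(String[] args) {
        List<Object> row = toPartitionRow(List.of("10"), List.of(SimpleType.INT));
        // A predicate like pt > 2 now sees an Integer, so 10 > 2 holds.
        System.out.println(((Integer) row.get(0)) > 2); // prints "true"
    }
}
```

With typed rows in place, the existing `LeafPredicate` evaluation logic applies its native per-type comparisons, so no stringification step is needed on either connector.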
Reviewed changes
Copilot reviewed 9 out of 9 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| fluss-common/src/main/java/org/apache/fluss/utils/PartitionUtils.java | Adds shared partitionRowType and toPartitionRow helpers used by both connectors. |
| fluss-common/src/main/java/org/apache/fluss/predicate/LeafPredicate.java | Fixes LTZ extraction and adds BYTES case so typed partition rows evaluate correctly. |
| fluss-flink/.../source/FlinkTableSource.java | Stops stringifying pushed-down partition predicate literals. |
| fluss-flink/.../source/enumerator/FlinkSourceEnumerator.java | Builds typed partition row using new helpers when applying the partition filter. |
| fluss-flink/.../utils/StringifyPredicateVisitor.java | Removed; no longer needed after typed-row evaluation. |
| fluss-flink/.../source/FlinkTableSourceITCase.java | Adds INT partition range-predicate pushdown integration test. |
| fluss-spark/.../utils/SparkPartitionPredicate.scala | Drops local helpers/stringifier and delegates to PartitionUtils; threads tableInfo through filterPartitions. |
| fluss-spark/.../read/FlussBatch.scala | Passes tableInfo into the new filterPartitions signature. |
| fluss-spark/.../SparkLogTableReadTest.scala | Adds Spark INT partition range-predicate pushdown test. |
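The `ClassCastException` that the stringifier was masking in `LeafPredicate.get` follows a common pattern: an internal row stores field values as objects, and calling the NTZ getter on an LTZ column casts to the wrong class. Fluss uses its own internal timestamp classes; this sketch uses `java.time` types purely to illustrate the failure mode:

```java
import java.time.Instant;
import java.time.LocalDateTime;

public class GetterMismatchDemo {
    // Minimal stand-in for an internal row storing field values as Object.
    static Object[] fields = { Instant.parse("2024-01-01T00:00:00Z") };

    // Analogous to calling getTimestampNtz on an LTZ column: the stored
    // value is an Instant, not a LocalDateTime, so the cast fails at runtime.
    static LocalDateTime getTimestampNtz(int pos) {
        return (LocalDateTime) fields[pos];
    }

    public static void main(String[] args) {
        try {
            getTimestampNtz(0);
        } catch (ClassCastException e) {
            System.out.println("ClassCastException: wrong getter for LTZ value");
        }
    }
}
```

Switching to the LTZ getter (the `getTimestampLtz` fix in this PR) makes the extraction match the stored representation, and adding a `BYTES` case removes the remaining `UnsupportedOperationException` path.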
force-pushed from b379de7 to b646912
force-pushed from b646912 to abeeff4
@luoyuxia addressed comments, PTAL 🙏
Summary
Closes #3292