
[flink][spark] Fix partition pruning for non-string partition keys#3322

Open
fresh-borzoni wants to merge 3 commits into apache:main from fresh-borzoni:feat/fix-partition-pruning-types

Conversation


@fresh-borzoni fresh-borzoni commented May 15, 2026

Summary

Closes #3292

Partition predicate pushdown stringified literals and partition values before evaluation, so range comparisons fell back to string lexicographic order.
Example: An INT partition column with values 2 and 10 under WHERE pt > 2 lex-compared "10" < "2" and incorrectly dropped partition 10.
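The lexicographic failure is easy to reproduce in a few lines of plain Java (a standalone illustration of the bug, not Fluss code):

```java
// Standalone illustration, not Fluss code: comparing the stringified
// values reverses the numeric order of 10 and 2.
public class LexVsTyped {
    public static void main(String[] args) {
        // Lexicographic: '1' < '2', so "10" sorts before "2" ...
        System.out.println("10".compareTo("2") < 0);    // prints true
        // ... while the typed comparison keeps partition 10 for pt > 2.
        System.out.println(Integer.compare(10, 2) > 0); // prints true
    }
}
```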


Add PartitionUtils.toPartitionRow and PartitionUtils.partitionRowType in
fluss-common. Use them from SparkPartitionPredicate and
FlinkSourceEnumerator; drop the stringify step in FlinkTableSource and
delete StringifyPredicateVisitor.
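The helper names above come from the PR, but their signatures are not shown here; a minimal sketch of the idea, with hypothetical parameter and return types, might look like:

```java
import java.util.List;

// Hedged sketch only: the real PartitionUtils.toPartitionRow in
// fluss-common may have a different signature and type system.
public class PartitionRowSketch {
    // Parse each partition value string into the Java object matching its
    // declared type, so predicates compare typed values, not strings.
    static Object[] toPartitionRow(List<String> values, List<String> types) {
        Object[] row = new Object[values.size()];
        for (int i = 0; i < values.size(); i++) {
            String v = values.get(i);
            switch (types.get(i)) {
                case "INT":    row[i] = Integer.valueOf(v); break;
                case "BIGINT": row[i] = Long.valueOf(v);    break;
                case "STRING": row[i] = v;                  break;
                default: throw new UnsupportedOperationException(types.get(i));
            }
        }
        return row;
    }

    public static void main(String[] args) {
        Object[] row = toPartitionRow(List.of("10"), List.of("INT"));
        System.out.println(row[0]); // an Integer, so 10 > 2 holds numerically
    }
}
```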

The stringifier was also hiding two latent gaps in LeafPredicate.get:
BYTES had no case (UnsupportedOperationException), and
TIMESTAMP_WITH_LOCAL_TIME_ZONE used getTimestampNtz instead of
getTimestampLtz (ClassCastException). Both are exercised by
testStreamingReadAllPartitionTypePushDown; both are fixed in the same file.
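As a hedged illustration of those two gaps, a getter-dispatch method like the one LeafPredicate.get implements must pick the right accessor per type (the Row interface and method names below are hypothetical, not the actual Fluss API):

```java
// Hypothetical accessor dispatch, not the real LeafPredicate.get.
public class GetterDispatch {
    interface Row {
        byte[] getBytes(int pos);
        long getTimestampNtz(int pos); // naive (no-zone) timestamp
        long getTimestampLtz(int pos); // instant (local-zone) timestamp
    }

    static Object get(Row row, int pos, String type) {
        switch (type) {
            case "BYTES":
                // Previously missing: fell through to UnsupportedOperationException.
                return row.getBytes(pos);
            case "TIMESTAMP_WITH_LOCAL_TIME_ZONE":
                // Previously called getTimestampNtz: ClassCastException downstream.
                return row.getTimestampLtz(pos);
            default:
                throw new UnsupportedOperationException(type);
        }
    }
}
```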

A regression test for the partition-pruning bug, using an INT partition
column and a range predicate, is added in SparkLogTableReadTest and
FlinkTableSourceITCase.

fresh-borzoni force-pushed the feat/fix-partition-pruning-types branch from 5b6aee7 to e83a839 on May 15, 2026 at 08:00
@fresh-borzoni (Member Author)

@Yohahaha @YannByron @luoyuxia PTAL 🙏

@Yohahaha (Contributor) left a comment


parts of Spark LGTM! thank you for the fix!

@fresh-borzoni (Member Author)

@Yohahaha addressed 👍
@luoyuxia do you mind taking a quick look, please?

@luoyuxia (Contributor) left a comment


@fresh-borzoni Thanks for the pr. Only one comment. PTAL

Copilot AI (Contributor) left a comment


Pull request overview

Fixes partition pruning when a partition key is non-STRING (e.g., INT). Previously, both Flink and Spark connectors stringified literals and partition values before evaluating predicates, so range comparisons (e.g., pt > 2) used lexicographic order and could drop partitions like pt=10. The fix builds typed partition rows via new shared helpers and removes the stringification path. A latent LeafPredicate.get bug for TIMESTAMP_WITH_LOCAL_TIME_ZONE (and missing BYTES case) is also corrected, since these are now actually exercised.

Changes:

  • Add PartitionUtils.partitionRowType and PartitionUtils.toPartitionRow to build typed partition rows; reuse them from Flink and Spark.
  • Drop literal stringification in Flink (StringifyPredicateVisitor deleted) and Spark (stringifyLiterals removed); pass typed predicates straight through.
  • Fix LeafPredicate.get to use getTimestampLtz for LTZ and to handle BYTES; add Flink and Spark integration tests for INT-partition range pushdown.

Reviewed changes

Copilot reviewed 9 out of 9 changed files in this pull request and generated 4 comments.

Summary per file:

  • fluss-common/src/main/java/org/apache/fluss/utils/PartitionUtils.java — Adds shared partitionRowType and toPartitionRow helpers used by both connectors.
  • fluss-common/src/main/java/org/apache/fluss/predicate/LeafPredicate.java — Fixes LTZ extraction and adds a BYTES case so typed partition rows evaluate correctly.
  • fluss-flink/.../source/FlinkTableSource.java — Stops stringifying pushed-down partition predicate literals.
  • fluss-flink/.../source/enumerator/FlinkSourceEnumerator.java — Builds a typed partition row using the new helpers when applying the partition filter.
  • fluss-flink/.../utils/StringifyPredicateVisitor.java — Removed; no longer needed after typed-row evaluation.
  • fluss-flink/.../source/FlinkTableSourceITCase.java — Adds an INT-partition range-predicate pushdown integration test.
  • fluss-spark/.../utils/SparkPartitionPredicate.scala — Drops local helpers/stringifier and delegates to PartitionUtils; threads tableInfo through filterPartitions.
  • fluss-spark/.../read/FlussBatch.scala — Passes tableInfo into the new filterPartitions signature.
  • fluss-spark/.../SparkLogTableReadTest.scala — Adds a Spark INT-partition range-predicate pushdown test.


fresh-borzoni force-pushed the feat/fix-partition-pruning-types branch from b379de7 to b646912 on May 16, 2026 at 02:03
fresh-borzoni force-pushed the feat/fix-partition-pruning-types branch from b646912 to abeeff4 on May 16, 2026 at 02:31
@fresh-borzoni
Copy link
Copy Markdown
Member Author

@luoyuxia addressed comments, PTAL 🙏



Development

Successfully merging this pull request may close these issues.

[flink][spark] Partition pruning gives wrong results for non-STRING partition keys with range predicates

4 participants