Skip to content

Commit 34478c7

Browse files
committed
feat: implement PartitionSpec-based non-identity transform detection for delete file safety
1 parent c0d3839 commit 34478c7

3 files changed

Lines changed: 89 additions & 24 deletions

File tree

spark/src/main/scala/org/apache/comet/iceberg/IcebergReflection.scala

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -528,6 +528,44 @@ object IcebergReflection extends Logging {
528528
* List of unsupported partition types (empty if all supported). Each entry is (fieldName,
529529
* typeStr, reason)
530530
*/
531+
// NOTE(review): this method was inserted between validatePartitionTypes' Scaladoc
// and its `def`, orphaning that doc comment. Consider moving this method below
// validatePartitionTypes so the existing Scaladoc re-attaches to its method.
/**
 * Checks whether a partition spec contains any non-identity transforms.
 *
 * Non-identity transforms include bucket, truncate, year, month, day, hour. These transforms
 * mean that Iceberg's partition pruning cannot fully resolve equality filters on the source
 * column, producing residual expressions that require post-scan filtering.
 *
 * All access to the Iceberg PartitionSpec goes through reflection so this module has no
 * compile-time dependency on Iceberg classes.
 *
 * @param partitionSpec
 *   The Iceberg PartitionSpec object
 * @return
 *   Some(transformStr) for the first non-identity transform found, or None if all are identity
 *   (or if reflection fails — failures are logged and treated as "no non-identity transform")
 */
def findNonIdentityTransform(partitionSpec: Any): Option[String] = {
  import scala.jdk.CollectionConverters._

  try {
    // PartitionSpec.fields(): java.util.List<PartitionField>
    val fieldsMethod = partitionSpec.getClass.getMethod("fields")
    val fields = fieldsMethod.invoke(partitionSpec).asInstanceOf[java.util.List[_]]

    // PartitionField.transform(): Transform — its toString names the transform kind.
    val partitionFieldClass = loadClass(ClassNames.PARTITION_FIELD)
    val transformMethod = partitionFieldClass.getMethod("transform")

    // Use map + find instead of foreach + nonlocal `return`: `return` inside a lambda
    // is implemented by throwing NonLocalReturnControl (deprecated in Scala 3) and is
    // fragile when wrapped in a try/catch.
    fields.asScala
      .map(field => transformMethod.invoke(field).toString)
      .find(_ != Transforms.IDENTITY)
  } catch {
    case e: Exception =>
      // Reflection failure is non-fatal: log and report "none found" so callers
      // fall through to their default handling.
      logError(
        s"Iceberg reflection failure: Failed to inspect partition transforms: ${e.getMessage}")
      None
  }
}
568+
531569
def validatePartitionTypes(partitionSpec: Any, schema: Any): List[(String, String, String)] = {
532570
import scala.jdk.CollectionConverters._
533571

spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala

Lines changed: 28 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -531,12 +531,33 @@ case class CometScanRule(session: SparkSession)
531531
false
532532
}
533533

534-
// TODO: A safety guard for non-identity transforms (truncate, bucket, year, etc.)
535-
// with delete files should be re-implemented using PartitionSpec inspection.
536-
// The previous residual-based detection was ineffective because Iceberg residuals
537-
// use NamedReference terms without transform metadata. Correctness is currently
538-
// maintained by Iceberg/Spark's MOR delete processing and CometFilter post-scan
539-
// filtering. See: https://github.com/apache/datafusion-comet/issues/XXXX
534+
// Safety guard: non-identity transforms with delete files must fall back.
// Non-identity transforms (truncate, bucket, year, etc.) produce residual
// expressions that require post-scan filtering. When delete files are also
// present, the native scan cannot correctly apply both the residual filter
// and delete file processing together, so we fall back to Spark.
// Detection uses PartitionSpec (table-level) rather than residual expressions,
// since Iceberg residuals use NamedReference terms without transform metadata.
val transformFunctionsSupported = {
  // Resolve the first non-identity transform, if the spec is inspectable at all.
  val nonIdentityTransform = IcebergReflection
    .getPartitionSpec(metadata.table)
    .flatMap(IcebergReflection.findNonIdentityTransform)

  nonIdentityTransform match {
    case Some(transformType) if !taskValidation.deleteFiles.isEmpty =>
      // Transform + MOR delete files: unsafe for the native scan, record and fall back.
      fallbackReasons +=
        s"Iceberg transform '$transformType' with delete files present. " +
          "Falling back to ensure correct delete operation."
      false
    case Some(transformType) =>
      // Transform without delete files: native scan is fine, residuals are
      // handled by post-scan filtering.
      logInfo(
        s"Iceberg partition uses transform '$transformType' - " +
          "post-scan filtering will apply via CometFilter.")
      true
    case None =>
      // Identity-only spec, or spec could not be inspected: allow through.
      true
  }
}
540561

541562
// Check for unsupported struct types in delete files
542563
val deleteFileTypesSupported = {
@@ -619,7 +640,7 @@ case class CometScanRule(session: SparkSession)
619640

620641
if (schemaSupported && fileIOCompatible && formatVersionSupported &&
621642
taskValidation.allParquet && allSupportedFilesystems && partitionTypesSupported &&
622-
complexTypePredicatesSupported &&
643+
complexTypePredicatesSupported && transformFunctionsSupported &&
623644
deleteFileTypesSupported && dppSubqueriesSupported) {
624645
CometBatchScanExec(
625646
scanExec.clone().asInstanceOf[BatchScanExec],

spark/src/test/scala/org/apache/comet/CometIcebergNativeSuite.scala

Lines changed: 23 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -2860,12 +2860,13 @@ class CometIcebergNativeSuite extends CometTestBase with RESTCatalogHelper {
28602860
// Additional integration tests for non-identity transform residuals
28612861
// =========================================================================
28622862

2863-
// Test A: Non-identity transform with delete files - verify correctness.
2864-
// When delete files and non-identity transforms coexist, query results must
2865-
// be correct regardless of whether the native scan or Spark handles it.
2863+
// Test A: Non-identity transform with delete files must fall back to Spark.
2864+
// When the table has a non-identity partition transform (truncate) AND MOR delete
2865+
// files are present, the native Iceberg scan must fall back to Spark to ensure
2866+
// correct delete processing. Detection uses PartitionSpec inspection.
28662867
// Uses truncate(3, name) so the query and deleted row share the same partition
28672868
// (truncate(3, 'alpha') = truncate(3, 'alpine') = 'alp').
2868-
test("non-identity transform residual - correct results with delete files present") {
2869+
test("non-identity transform residual - falls back with delete files present") {
28692870
assume(icebergAvailable, "Iceberg not available in classpath")
28702871

28712872
withTempIcebergDir { warehouseDir =>
@@ -2878,7 +2879,7 @@ class CometIcebergNativeSuite extends CometTestBase with RESTCatalogHelper {
28782879
CometConf.COMET_ICEBERG_NATIVE_ENABLED.key -> "true") {
28792880

28802881
spark.sql("""
2881-
CREATE TABLE test_cat.db.truncate_delete_test (
2882+
CREATE TABLE test_cat.db.truncate_delete_fallback (
28822883
id INT,
28832884
name STRING,
28842885
value DOUBLE
@@ -2892,38 +2893,43 @@ class CometIcebergNativeSuite extends CometTestBase with RESTCatalogHelper {
28922893
""")
28932894

28942895
spark.sql("""
2895-
INSERT INTO test_cat.db.truncate_delete_test VALUES
2896+
INSERT INTO test_cat.db.truncate_delete_fallback VALUES
28962897
(1, 'alpha', 10.0), (2, 'alpine', 20.0), (3, 'bravo', 30.0),
28972898
(4, 'bridge', 40.0), (5, 'charlie', 50.0), (6, 'cherry', 60.0)
28982899
""")
28992900

29002901
// Delete 'alpine' which shares truncate(3)='alp' partition with 'alpha'.
2901-
spark.sql("DELETE FROM test_cat.db.truncate_delete_test WHERE name = 'alpine'")
2902+
spark.sql("DELETE FROM test_cat.db.truncate_delete_fallback WHERE name = 'alpine'")
29022903

2903-
// Query for 'alpha' creates a residual on the truncate transform.
2904-
// The deleted row 'alpine' must not appear, and 'alpha' must be returned.
2904+
// Query with filter. Because the table has truncate transform AND delete files,
2905+
// native scan must fall back to Spark.
29052906
val query =
2906-
"SELECT * FROM test_cat.db.truncate_delete_test WHERE name = 'alpha' ORDER BY id"
2907+
"SELECT * FROM test_cat.db.truncate_delete_fallback WHERE name = 'alpha' ORDER BY id"
29072908
val (_, cometPlan) = checkSparkAnswer(query)
29082909

2910+
// Assert fallback: no CometIcebergNativeScanExec in plan
2911+
val icebergScans = collectIcebergNativeScans(cometPlan)
2912+
assert(
2913+
icebergScans.isEmpty,
2914+
"Expected fallback to Spark (no CometIcebergNativeScanExec) when " +
2915+
s"non-identity transform has delete files. Plan:\n$cometPlan")
2916+
29092917
// Verify correct results: only 'alpha' returned, 'alpine' is deleted
29102918
val result = spark.sql(query).collect()
29112919
assert(result.length == 1, s"Expected 1 row, got ${result.length}")
29122920
assert(result(0).getInt(0) == 1, s"Expected id=1, got ${result(0).getInt(0)}")
29132921
assert(result(0).getString(1) == "alpha")
29142922

29152923
// Verify 'alpine' is truly gone from broader query
2916-
val allAlpResult = spark
2917-
.sql("SELECT * FROM test_cat.db.truncate_delete_test ORDER BY id")
2924+
val allResult = spark
2925+
.sql("SELECT * FROM test_cat.db.truncate_delete_fallback ORDER BY id")
29182926
.collect()
2927+
assert(allResult.length == 5, s"Expected 5 rows after delete, got ${allResult.length}")
29192928
assert(
2920-
allAlpResult.length == 5,
2921-
s"Expected 5 rows after delete, got ${allAlpResult.length}")
2922-
assert(
2923-
!allAlpResult.exists(_.getString(1) == "alpine"),
2929+
!allResult.exists(_.getString(1) == "alpine"),
29242930
"Deleted row 'alpine' should not appear in results")
29252931

2926-
spark.sql("DROP TABLE test_cat.db.truncate_delete_test")
2932+
spark.sql("DROP TABLE test_cat.db.truncate_delete_fallback")
29272933
}
29282934
}
29292935
}

0 commit comments

Comments
 (0)