Skip to content

Commit a5a9f44

Browse files
authored
perf: Enable native c2r for more queries (#3764)
* remove legacy restriction
* format
* update golden files
1 parent e23d946 commit a5a9f44

403 files changed

Lines changed: 828 additions & 851 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

spark/src/main/scala/org/apache/comet/rules/EliminateRedundantTransitions.scala

Lines changed: 2 additions & 25 deletions
Original file line number | Diff line number | Diff line change
@@ -22,7 +22,7 @@ package org.apache.comet.rules
2222
import org.apache.spark.sql.SparkSession
2323
import org.apache.spark.sql.catalyst.rules.Rule
2424
import org.apache.spark.sql.catalyst.util.sideBySide
25-
import org.apache.spark.sql.comet.{CometCollectLimitExec, CometColumnarToRowExec, CometNativeColumnarToRowExec, CometNativeWriteExec, CometPlan, CometScanExec, CometSparkToColumnarExec}
25+
import org.apache.spark.sql.comet.{CometCollectLimitExec, CometColumnarToRowExec, CometNativeColumnarToRowExec, CometNativeWriteExec, CometPlan, CometSparkToColumnarExec}
2626
import org.apache.spark.sql.comet.execution.shuffle.{CometColumnarShuffle, CometShuffleExchangeExec}
2727
import org.apache.spark.sql.execution.{ColumnarToRowExec, RowToColumnarExec, SparkPlan}
2828
import org.apache.spark.sql.execution.adaptive.QueryStageExec
@@ -139,35 +139,12 @@ case class EliminateRedundantTransitions(session: SparkSession) extends Rule[Spa
139139
private def createColumnarToRowExec(child: SparkPlan): SparkPlan = {
140140
val schema = child.schema
141141
val useNative = CometConf.COMET_NATIVE_COLUMNAR_TO_ROW_ENABLED.get() &&
142-
CometNativeColumnarToRowExec.supportsSchema(schema) &&
143-
!hasScanUsingMutableBuffers(child)
142+
CometNativeColumnarToRowExec.supportsSchema(schema)
144143

145144
if (useNative) {
146145
CometNativeColumnarToRowExec(child)
147146
} else {
148147
CometColumnarToRowExec(child)
149148
}
150149
}
151-
152-
/**
153-
* Checks if the plan contains a scan that uses mutable buffers. Native C2R is not compatible
154-
* with such scans because the buffers may be modified after C2R reads them.
155-
*
156-
* This includes:
157-
* - CometScanExec with native_iceberg_compat and partition columns - uses
158-
* ConstantColumnReader
159-
*/
160-
private def hasScanUsingMutableBuffers(op: SparkPlan): Boolean = {
161-
op match {
162-
case c: QueryStageExec => hasScanUsingMutableBuffers(c.plan)
163-
case c: ReusedExchangeExec => hasScanUsingMutableBuffers(c.child)
164-
case _ =>
165-
op.exists {
166-
case scan: CometScanExec =>
167-
scan.scanImpl == CometConf.SCAN_NATIVE_ICEBERG_COMPAT &&
168-
scan.relation.partitionSchema.nonEmpty
169-
case _ => false
170-
}
171-
}
172-
}
173150
}

spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q1.native_iceberg_compat/extended.txt

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -1,4 +1,4 @@
1-
CometColumnarToRow
1+
CometNativeColumnarToRow
22
+- CometTakeOrderedAndProject
33
+- CometProject
44
+- CometBroadcastHashJoin

spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q10.native_iceberg_compat/extended.txt

Lines changed: 4 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -1,6 +1,6 @@
11
TakeOrderedAndProject
22
+- HashAggregate
3-
+- CometColumnarToRow
3+
+- CometNativeColumnarToRow
44
+- CometColumnarExchange
55
+- HashAggregate
66
+- Project
@@ -11,7 +11,7 @@ TakeOrderedAndProject
1111
: : +- Filter
1212
: : +- BroadcastHashJoin
1313
: : :- BroadcastHashJoin [COMET: Unsupported join type ExistenceJoin(exists#1)]
14-
: : : :- CometColumnarToRow
14+
: : : :- CometNativeColumnarToRow
1515
: : : : +- CometBroadcastHashJoin
1616
: : : : :- CometFilter
1717
: : : : : +- CometScan [native_iceberg_compat] parquet spark_catalog.default.customer
@@ -30,7 +30,7 @@ TakeOrderedAndProject
3030
: : : : +- CometFilter
3131
: : : : +- CometScan [native_iceberg_compat] parquet spark_catalog.default.date_dim
3232
: : : +- BroadcastExchange
33-
: : : +- CometColumnarToRow
33+
: : : +- CometNativeColumnarToRow
3434
: : : +- CometProject
3535
: : : +- CometBroadcastHashJoin
3636
: : : :- CometScan [native_iceberg_compat] parquet spark_catalog.default.web_sales
@@ -40,7 +40,7 @@ TakeOrderedAndProject
4040
: : : +- CometFilter
4141
: : : +- CometScan [native_iceberg_compat] parquet spark_catalog.default.date_dim
4242
: : +- BroadcastExchange
43-
: : +- CometColumnarToRow
43+
: : +- CometNativeColumnarToRow
4444
: : +- CometProject
4545
: : +- CometBroadcastHashJoin
4646
: : :- CometScan [native_iceberg_compat] parquet spark_catalog.default.catalog_sales

spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q11.native_iceberg_compat/extended.txt

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -1,4 +1,4 @@
1-
CometColumnarToRow
1+
CometNativeColumnarToRow
22
+- CometTakeOrderedAndProject
33
+- CometProject
44
+- CometBroadcastHashJoin

spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q12.native_iceberg_compat/extended.txt

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -1,7 +1,7 @@
11
TakeOrderedAndProject
22
+- Project
33
+- Window [COMET: WindowExec is not fully compatible with Spark (Native WindowExec has known correctness issues). To enable it anyway, set spark.comet.operator.WindowExec.allowIncompatible=true. For more information, refer to the Comet Compatibility Guide (https://datafusion.apache.org/comet/user-guide/compatibility.html).]
4-
+- CometColumnarToRow
4+
+- CometNativeColumnarToRow
55
+- CometSort
66
+- CometExchange
77
+- CometHashAggregate

spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q13.native_iceberg_compat/extended.txt

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -1,4 +1,4 @@
1-
CometColumnarToRow
1+
CometNativeColumnarToRow
22
+- CometHashAggregate
33
+- CometExchange
44
+- CometHashAggregate

spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q14a.native_iceberg_compat/extended.txt

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -1,4 +1,4 @@
1-
CometColumnarToRow
1+
CometNativeColumnarToRow
22
+- CometTakeOrderedAndProject
33
+- CometHashAggregate
44
+- CometExchange
@@ -8,7 +8,7 @@ CometColumnarToRow
88
:- CometProject
99
: +- CometFilter
1010
: : +- Subquery
11-
: : +- CometColumnarToRow
11+
: : +- CometNativeColumnarToRow
1212
: : +- CometHashAggregate
1313
: : +- CometExchange
1414
: : +- CometHashAggregate

spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q14b.native_iceberg_compat/extended.txt

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -1,9 +1,9 @@
1-
CometColumnarToRow
1+
CometNativeColumnarToRow
22
+- CometTakeOrderedAndProject
33
+- CometBroadcastHashJoin
44
:- CometFilter
55
: : +- Subquery
6-
: : +- CometColumnarToRow
6+
: : +- CometNativeColumnarToRow
77
: : +- CometHashAggregate
88
: : +- CometExchange
99
: : +- CometHashAggregate

spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q15.native_iceberg_compat/extended.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
CometColumnarToRow
1+
CometNativeColumnarToRow
22
+- CometTakeOrderedAndProject
33
+- CometHashAggregate
44
+- CometExchange

spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q16.native_iceberg_compat/extended.txt

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -1,9 +1,9 @@
1-
CometColumnarToRow
1+
CometNativeColumnarToRow
22
+- CometHashAggregate
33
+- CometColumnarExchange
44
+- HashAggregate
55
+- HashAggregate [COMET: Unsupported aggregation mode PartialMerge]
6-
+- CometColumnarToRow
6+
+- CometNativeColumnarToRow
77
+- CometHashAggregate
88
+- CometProject
99
+- CometBroadcastHashJoin

0 commit comments

Comments
 (0)