
Commit 797a76d

fix: skip Comet columnar shuffle for stages with DPP scans (#3879) (#3934)
1 parent 7a4f42d commit 797a76d

2 files changed

Lines changed: 59 additions & 2 deletions


spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometShuffleExchangeExec.scala

Lines changed: 22 additions & 1 deletion
@@ -30,7 +30,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.serializer.Serializer
 import org.apache.spark.shuffle.sort.SortShuffleManager
 import org.apache.spark.sql.catalyst.{InternalRow, SQLConfHelper}
-import org.apache.spark.sql.catalyst.expressions.{Attribute, BoundReference, UnsafeProjection, UnsafeRow}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, BoundReference, Expression, PlanExpression, UnsafeProjection, UnsafeRow}
 import org.apache.spark.sql.catalyst.expressions.codegen.LazilyGeneratedOrdering
 import org.apache.spark.sql.catalyst.plans.logical.Statistics
 import org.apache.spark.sql.catalyst.plans.physical._
@@ -454,6 +454,11 @@ object CometShuffleExchangeExec
       return false
     }

+    if (CometConf.COMET_DPP_FALLBACK_ENABLED.get() && stageContainsDPPScan(s)) {
+      withInfo(s, "Stage contains a scan with Dynamic Partition Pruning")
+      return false
+    }
+
     if (!isCometJVMShuffleMode(s.conf)) {
       withInfo(s, "Comet columnar shuffle not enabled")
       return false
@@ -546,6 +551,22 @@ object CometShuffleExchangeExec
     }
   }

+  /**
+   * Returns true if the stage (the subtree rooted at this shuffle) contains a scan with Dynamic
+   * Partition Pruning (DPP). When DPP is present, the scan falls back to Spark, and wrapping the
+   * stage with Comet shuffle creates inefficient row-to-columnar transitions.
+   */
+  private def stageContainsDPPScan(s: ShuffleExchangeExec): Boolean = {
+    def isDynamicPruningFilter(e: Expression): Boolean =
+      e.exists(_.isInstanceOf[PlanExpression[_]])
+
+    s.child.exists {
+      case scan: FileSourceScanExec =>
+        scan.partitionFilters.exists(isDynamicPruningFilter)
+      case _ => false
+    }
+  }
+
   def isCometShuffleEnabledWithInfo(op: SparkPlan): Boolean = {
     if (!COMET_EXEC_SHUFFLE_ENABLED.get(op.conf)) {
       withInfo(
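Editor's note, not part of the commit: Spark's dynamic partition pruning rule injects a subquery into a scan's partition filters, so a DPP scan is recognizable by a PlanExpression nested inside FileSourceScanExec.partitionFilters, which is exactly the condition stageContainsDPPScan checks for every scan under the shuffle. Below is a minimal inspection sketch of that condition, assuming a SparkSession in scope and the dpp_fact2/dpp_dim2 temp views registered as in the test further down; it is illustrative only.

import org.apache.spark.sql.catalyst.expressions.PlanExpression
import org.apache.spark.sql.execution.FileSourceScanExec

// Query over a partitioned fact table; with a selective filter on the
// dimension side, Spark may inject a DPP subquery into the fact scan.
val plan = spark
  .sql("select * from dpp_fact2 join dpp_dim2 on fact_date = dim_date where dim_id > 7")
  .queryExecution
  .sparkPlan

// A scan is a "DPP scan" if any partition filter embeds a PlanExpression,
// the same check the new guard applies per shuffle stage.
val dppScans = plan.collect {
  case scan: FileSourceScanExec
      if scan.partitionFilters.exists(_.exists(_.isInstanceOf[PlanExpression[_]])) =>
    scan
}
println(s"file scans with DPP filters: ${dppScans.size}")

The guard added in this commit evaluates this same condition before choosing Comet columnar shuffle for a stage, and records the reason via withInfo when it declines.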

spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala

Lines changed: 37 additions & 1 deletion
@@ -139,8 +139,44 @@ class CometExecSuite extends CometTestBase {
           val (_, cometPlan) = checkSparkAnswer(df)
           val infos = new ExtendedExplainInfo().generateExtendedInfo(cometPlan)
           assert(infos.contains("Dynamic Partition Pruning is not supported"))
+        }
+      }
+    }
+  }
+
+  test("DPP fallback avoids inefficient Comet shuffle (#3874)") {
+    withTempDir { path =>
+      val factPath = s"${path.getAbsolutePath}/fact.parquet"
+      val dimPath = s"${path.getAbsolutePath}/dim.parquet"
+      withSQLConf(CometConf.COMET_EXEC_ENABLED.key -> "false") {
+        val one_day = 24 * 60 * 60000
+        val fact = Range(0, 100)
+          .map(i => (i, new java.sql.Date(System.currentTimeMillis() + i * one_day), i.toString))
+          .toDF("fact_id", "fact_date", "fact_str")
+        fact.write.partitionBy("fact_date").parquet(factPath)
+        val dim = Range(0, 10)
+          .map(i => (i, new java.sql.Date(System.currentTimeMillis() + i * one_day), i.toString))
+          .toDF("dim_id", "dim_date", "dim_str")
+        dim.write.parquet(dimPath)
+      }
+
+      // Force sort-merge join to get a shuffle exchange above the DPP scan
+      Seq("parquet").foreach { v1List =>
+        withSQLConf(
+          SQLConf.USE_V1_SOURCE_LIST.key -> v1List,
+          SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
+          CometConf.COMET_DPP_FALLBACK_ENABLED.key -> "true") {
+          spark.read.parquet(factPath).createOrReplaceTempView("dpp_fact2")
+          spark.read.parquet(dimPath).createOrReplaceTempView("dpp_dim2")
+          val df =
+            spark.sql(
+              "select * from dpp_fact2 join dpp_dim2 on fact_date = dim_date where dim_id > 7")
+          val (_, cometPlan) = checkSparkAnswer(df)

-          assert(infos.contains("Comet accelerated"))
+          // Verify no CometShuffleExchangeExec wraps the DPP stage
+          assert(
+            !cometPlan.toString().contains("CometColumnarShuffle"),
+            "Should not use Comet columnar shuffle for stages with DPP scans")
         }
       }
     }
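Usage note, an editor's sketch rather than part of the commit: it assumes CometConf is importable as org.apache.comet.CometConf and that a Comet-enabled SparkSession plus the dpp_fact2/dpp_dim2 views from the test above are available. With the fallback enabled, the shuffle above a DPP-pruned partitioned scan should remain Spark's row-based shuffle rather than CometColumnarShuffle.

import org.apache.comet.CometConf

// Toggle the fallback at runtime and check which shuffle the planner picked
// for a DPP-eligible join (sketch only).
spark.conf.set(CometConf.COMET_DPP_FALLBACK_ENABLED.key, "true")
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1") // force a shuffle join

val df = spark.sql(
  "select * from dpp_fact2 join dpp_dim2 on fact_date = dim_date where dim_id > 7")

// With the fallback on, the plan string should not mention CometColumnarShuffle
// for this stage, matching the assertion in the new test.
assert(!df.queryExecution.executedPlan.toString.contains("CometColumnarShuffle"))

If the fallback flag is off, the new guard is skipped entirely and the shuffle decision is unchanged from before this commit.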
