Commit 71f22f5

fix: enable more Spark SQL tests for native_datafusion (DynamicPartitionPruningSuite / ExplainSuite) (#3694)
1 parent f6d84b1

2 files changed: 51 additions & 30 deletions


dev/diffs/3.5.8.diff

Lines changed: 12 additions & 29 deletions
```diff
@@ -93,22 +93,23 @@ index 27ae10b3d59..78e69902dfd 100644
 +  }
  }
 diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala
-index db587dd9868..aac7295a53d 100644
+index db587dd9868..33802f29253 100644
 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala
 +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala
 @@ -18,6 +18,7 @@
  package org.apache.spark.sql.execution
 
  import org.apache.spark.annotation.DeveloperApi
-+import org.apache.spark.sql.comet.CometScanExec
++import org.apache.spark.sql.comet.{CometNativeScanExec, CometScanExec}
  import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, QueryStageExec}
  import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
  import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
-@@ -67,6 +68,7 @@ private[execution] object SparkPlanInfo {
+@@ -67,6 +68,8 @@ private[execution] object SparkPlanInfo {
      // dump the file scan metadata (e.g file path) to event log
      val metadata = plan match {
        case fileScan: FileSourceScanExec => fileScan.metadata
 +      case cometScan: CometScanExec => cometScan.metadata
++      case nativeScan: CometNativeScanExec => nativeScan.metadata
        case _ => Map[String, String]()
      }
      new SparkPlanInfo(
```
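
The `SparkPlanInfo` change mirrors the existing `FileSourceScanExec` case so that Comet's scan nodes also publish their scan metadata (file paths, pushed filters, read schema) to the event log and UI. A minimal, self-contained sketch of the dispatch pattern, using hypothetical stand-in types rather than Spark's real classes:

```scala
// Hypothetical stand-ins for SparkPlan node types; SparkPlanInfo only copies
// scan metadata for node types it explicitly knows about, so each Comet scan
// operator needs its own case.
sealed trait PlanNode { def metadata: Map[String, String] }
case class FileSourceScan(metadata: Map[String, String]) extends PlanNode
case class CometScan(metadata: Map[String, String]) extends PlanNode
case class CometNativeScan(metadata: Map[String, String]) extends PlanNode
case class Project(child: PlanNode) extends PlanNode {
  def metadata: Map[String, String] = Map.empty
}

def scanMetadata(plan: PlanNode): Map[String, String] = plan match {
  case s: FileSourceScan  => s.metadata
  case s: CometScan       => s.metadata
  case s: CometNativeScan => s.metadata
  case _                  => Map.empty // non-scan nodes contribute nothing
}
```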
```diff
@@ -396,14 +397,14 @@ index c4fb4fa943c..a04b23870a8 100644
    assert(exchanges.size == 2)
  }
 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
-index f33432ddb6f..42eb9fd1cb7 100644
+index f33432ddb6f..4acdf7e9cfb 100644
 --- a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
 +++ b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
 @@ -22,6 +22,7 @@ import org.scalatest.GivenWhenThen
  import org.apache.spark.sql.catalyst.expressions.{DynamicPruningExpression, Expression}
  import org.apache.spark.sql.catalyst.expressions.CodegenObjectFactoryMode._
  import org.apache.spark.sql.catalyst.plans.ExistenceJoin
-+import org.apache.spark.sql.comet.CometScanExec
++import org.apache.spark.sql.comet.{CometNativeScanExec, CometScanExec}
  import org.apache.spark.sql.connector.catalog.{InMemoryTableCatalog, InMemoryTableWithV2FilterCatalog}
  import org.apache.spark.sql.execution._
  import org.apache.spark.sql.execution.adaptive._
@@ -447,40 +448,22 @@ index f33432ddb6f..42eb9fd1cb7 100644
      withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") {
        val df = sql(
          """ WITH v as (
-@@ -1698,7 +1705,8 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat
-    * Check the static scan metrics with and without DPP
-    */
-   test("static scan metrics",
--    DisableAdaptiveExecution("DPP in AQE must reuse broadcast")) {
-+    DisableAdaptiveExecution("DPP in AQE must reuse broadcast"),
-+    IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3313")) {
-     withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true",
-       SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false",
-       SQLConf.EXCHANGE_REUSE_ENABLED.key -> "false") {
-@@ -1729,6 +1737,8 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat
+@@ -1729,6 +1736,10 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat
            case s: BatchScanExec =>
              // we use f1 col for v2 tables due to schema pruning
              s.output.exists(_.exists(_.argString(maxFields = 100).contains("f1")))
 +          case s: CometScanExec =>
++            s.output.exists(_.exists(_.argString(maxFields = 100).contains("fid")))
++          case s: CometNativeScanExec =>
 +            s.output.exists(_.exists(_.argString(maxFields = 100).contains("fid")))
            case _ => false
          }
          assert(scanOption.isDefined)
 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala
-index a206e97c353..79813d8e259 100644
+index a206e97c353..fea1149b67d 100644
 --- a/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala
 +++ b/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala
-@@ -280,7 +280,8 @@ class ExplainSuite extends ExplainSuiteHelper with DisableAdaptiveExecutionSuite
-   }
- }
-
--  test("explain formatted - check presence of subquery in case of DPP") {
-+  test("explain formatted - check presence of subquery in case of DPP",
-+    IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3313")) {
-     withTable("df1", "df2") {
-       withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true",
-         SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false",
-@@ -467,7 +468,8 @@ class ExplainSuite extends ExplainSuiteHelper with DisableAdaptiveExecutionSuite
+@@ -467,7 +467,8 @@ class ExplainSuite extends ExplainSuiteHelper with DisableAdaptiveExecutionSuite
    }
  }
 
```

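Both suites lean on the `IgnoreCometNativeDataFusion` test tag to skip tests tracked by issue #3313 when the native_datafusion scan is active. As a rough idea of the mechanism, a ScalaTest tag is just a named marker attached at the test site; the definition below is a hedged sketch (the real one is added by Comet's dev/diffs patches, and the fully qualified tag name here is an assumption):

```scala
import org.scalatest.Tag

// Hedged sketch: a tag carrying the tracking-issue URL so the skip reason is
// visible right where the test is declared.
case class IgnoreCometNativeDataFusion(reason: String)
    extends Tag("org.apache.spark.sql.IgnoreCometNativeDataFusion")

// Usage mirrors the patched tests:
//   test("static scan metrics",
//     IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3313")) { ... }
// Tagged tests can then be excluded from a run with ScalaTest's
//   -l org.apache.spark.sql.IgnoreCometNativeDataFusion
```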
```diff
@@ -490,7 +473,7 @@ index a206e97c353..79813d8e259 100644
      withTempDir { dir =>
        Seq("parquet", "orc", "csv", "json").foreach { fmt =>
          val basePath = dir.getCanonicalPath + "/" + fmt
-@@ -545,7 +547,9 @@ class ExplainSuite extends ExplainSuiteHelper with DisableAdaptiveExecutionSuite
+@@ -545,7 +546,9 @@ class ExplainSuite extends ExplainSuiteHelper with DisableAdaptiveExecutionSuite
    }
  }
 
```
spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala

Lines changed: 39 additions & 1 deletion
```diff
@@ -78,6 +78,33 @@ case class CometNativeScanExec(
   override val nodeName: String =
     s"CometNativeScan $relation ${tableIdentifier.map(_.unquotedString).getOrElse("")}"
 
+  override def verboseStringWithOperatorId(): String = {
+    val metadataStr = metadata.toSeq.sorted
+      .filterNot {
+        case (_, value) if (value.isEmpty || value.equals("[]")) => true
+        case (key, _) if (key.equals("DataFilters") || key.equals("Format")) => true
+        case (_, _) => false
+      }
+      .map {
+        case (key, _) if (key.equals("Location")) =>
+          val location = relation.location
+          val numPaths = location.rootPaths.length
+          val abbreviatedLocation = if (numPaths <= 1) {
+            location.rootPaths.mkString("[", ", ", "]")
+          } else {
+            "[" + location.rootPaths.head + s", ... ${numPaths - 1} entries]"
+          }
+          s"$key: ${location.getClass.getSimpleName} ${redact(abbreviatedLocation)}"
+        case (key, value) => s"$key: ${redact(value)}"
+      }
+
+    s"""
+       |$formattedNodeName
+       |${ExplainUtils.generateFieldString("Output", output)}
+       |${metadataStr.mkString("\n")}
+       |""".stripMargin
+  }
+
   // exposed for testing
   lazy val bucketedScan: Boolean = originalPlan.bucketedScan && !disableBucketedScan
 
```
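The override gives `EXPLAIN FORMATTED` output for `CometNativeScanExec` the same shape as Spark's file scans: the node header, the output attributes, and the sorted, redacted metadata, with empty entries and the noisy `DataFilters`/`Format` keys dropped and long location lists abbreviated. A hedged usage sketch; the commented output shape is illustrative, not captured from a real run:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[1]").getOrCreate()

// Write a small Parquet table, then ask for the formatted plan.
spark.range(10).selectExpr("id AS fid").write.mode("overwrite").parquet("/tmp/t")
spark.read.parquet("/tmp/t").where("fid > 0").explain("formatted")

// With Comet's native_datafusion scan enabled, the scan node should render
// roughly like (illustrative shape only):
//   (1) CometNativeScan parquet
//       Output [1]: [fid#0L]
//       Location: InMemoryFileIndex [file:/tmp/t]
//       ReadSchema: struct<fid:bigint>
```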
```diff
@@ -202,13 +229,24 @@ case class CometNativeScanExec(
 
   override def hashCode(): Int = Objects.hashCode(originalPlan, serializedPlanOpt)
 
+  private val driverMetricKeys =
+    Set(
+      "numFiles",
+      "filesSize",
+      "numPartitions",
+      "metadataTime",
+      "staticFilesNum",
+      "staticFilesSize",
+      "pruningTime")
+
   override lazy val metrics: Map[String, SQLMetric] = {
     val nativeMetrics = CometMetricNode.nativeScanMetrics(session.sparkContext)
     // Map native metric names to Spark metric names
-    nativeMetrics.get("output_rows") match {
+    val withAlias = nativeMetrics.get("output_rows") match {
       case Some(metric) => nativeMetrics + ("numOutputRows" -> metric)
       case None => nativeMetrics
     }
+    withAlias ++ scan.metrics.filterKeys(driverMetricKeys)
   }
 
   /**
```

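The metrics change keeps the native `output_rows` counter (re-exposed under Spark's `numOutputRows` name) and additionally surfaces the whitelisted driver-side scan metrics that the DPP tests assert on (`numPartitions`, `pruningTime`, and so on). A minimal sketch of the merge pattern with plain maps, standing in for Comet's actual `SQLMetric` values:

```scala
// Plain Long values stand in for Spark's SQLMetric accumulators.
val nativeMetrics = Map("output_rows" -> 42L, "time_elapsed_scanning" -> 7L)
val driverMetricKeys = Set("numFiles", "filesSize", "numPartitions",
  "metadataTime", "staticFilesNum", "staticFilesSize", "pruningTime")
val scanMetrics = Map("numFiles" -> 3L, "filesSize" -> 1024L, "scanTime" -> 5L)

// Alias the native row count to the name Spark's UI and tests expect.
val withAlias = nativeMetrics.get("output_rows") match {
  case Some(v) => nativeMetrics + ("numOutputRows" -> v)
  case None    => nativeMetrics
}

// Keep only whitelisted driver-side metrics, then layer them on top
// (equivalent to the patch's `filterKeys(driverMetricKeys)`).
val merged = withAlias ++ scanMetrics.filter { case (k, _) => driverMetricKeys(k) }
// merged now holds output_rows, time_elapsed_scanning, numOutputRows,
// numFiles, and filesSize; scanTime was filtered out.
```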