Skip to content

Commit 4f5eaf0

Browse files
authored
feat: enable native_datafusion scan in auto mode (#3781)
1 parent fb180b0 commit 4f5eaf0

8 files changed

Lines changed: 87 additions & 157 deletions

File tree

dev/diffs/3.4.3.diff

Lines changed: 9 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -2332,34 +2332,18 @@ index 240bb4e6dcb..8287ffa03ca 100644
23322332

23332333
import testImplicits._
23342334
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowIndexSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowIndexSuite.scala
2335-
index 351c6d698fc..583d9225cca 100644
2335+
index 351c6d698fc..cef6bb08b8c 100644
23362336
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowIndexSuite.scala
23372337
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowIndexSuite.scala
2338-
@@ -20,12 +20,14 @@ import java.io.File
2339-
2340-
import scala.collection.JavaConverters._
2341-
2342-
+import org.apache.comet.CometConf
2343-
import org.apache.hadoop.fs.Path
2344-
import org.apache.parquet.column.ParquetProperties._
2345-
import org.apache.parquet.hadoop.{ParquetFileReader, ParquetOutputFormat}
2338+
@@ -26,6 +26,7 @@ import org.apache.parquet.hadoop.{ParquetFileReader, ParquetOutputFormat}
23462339
import org.apache.parquet.hadoop.ParquetWriter.DEFAULT_BLOCK_SIZE
23472340

23482341
import org.apache.spark.sql.QueryTest
2349-
+import org.apache.spark.sql.comet.{CometBatchScanExec, CometScanExec}
2342+
+import org.apache.spark.sql.comet.{CometBatchScanExec, CometNativeScanExec, CometScanExec}
23502343
import org.apache.spark.sql.execution.FileSourceScanExec
23512344
import org.apache.spark.sql.execution.datasources.FileFormat
23522345
import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
2353-
@@ -172,6 +174,8 @@ class ParquetRowIndexSuite extends QueryTest with SharedSparkSession {
2354-
2355-
private def testRowIndexGeneration(label: String, conf: RowIndexTestConf): Unit = {
2356-
test (s"$label - ${conf.desc}") {
2357-
+ // native_datafusion Parquet scan does not support row index generation.
2358-
+ assume(CometConf.COMET_NATIVE_SCAN_IMPL.get() != CometConf.SCAN_NATIVE_DATAFUSION)
2359-
withSQLConf(conf.sqlConfs: _*) {
2360-
withTempPath { path =>
2361-
val rowIndexColName = FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME
2362-
@@ -230,6 +234,12 @@ class ParquetRowIndexSuite extends QueryTest with SharedSparkSession {
2346+
@@ -230,6 +231,17 @@ class ParquetRowIndexSuite extends QueryTest with SharedSparkSession {
23632347
case f: FileSourceScanExec =>
23642348
numPartitions += f.inputRDD.partitions.length
23652349
numOutputRows += f.metrics("numOutputRows").value
@@ -2369,18 +2353,14 @@ index 351c6d698fc..583d9225cca 100644
23692353
+ case b: CometBatchScanExec =>
23702354
+ numPartitions += b.inputRDD.partitions.length
23712355
+ numOutputRows += b.metrics("numOutputRows").value
2356+
+ case b: CometNativeScanExec =>
2357+
+ numPartitions +=
2358+
+ b.originalPlan.inputRDD.partitions.length
2359+
+ numOutputRows +=
2360+
+ b.metrics("numOutputRows").value
23722361
case _ =>
23732362
}
23742363
assert(numPartitions > 0)
2375-
@@ -291,6 +301,8 @@ class ParquetRowIndexSuite extends QueryTest with SharedSparkSession {
2376-
val conf = RowIndexTestConf(useDataSourceV2 = useDataSourceV2)
2377-
2378-
test(s"invalid row index column type - ${conf.desc}") {
2379-
+ // native_datafusion Parquet scan does not support row index generation.
2380-
+ assume(CometConf.COMET_NATIVE_SCAN_IMPL.get() != CometConf.SCAN_NATIVE_DATAFUSION)
2381-
withSQLConf(conf.sqlConfs: _*) {
2382-
withTempPath{ path =>
2383-
val df = spark.range(0, 10, 1, 1).toDF("id")
23842364
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruningSuite.scala
23852365
index 5c0b7def039..151184bc98c 100644
23862366
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruningSuite.scala

dev/diffs/3.5.8.diff

Lines changed: 29 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -494,7 +494,7 @@ index a206e97c353..fea1149b67d 100644
494494

495495
test("SPARK-35884: Explain Formatted") {
496496
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
497-
index 93275487f29..77a27d1c40a 100644
497+
index 93275487f29..78150c9163e 100644
498498
--- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
499499
+++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
500500
@@ -23,6 +23,7 @@ import java.nio.file.{Files, StandardOpenOption}
@@ -513,16 +513,20 @@ index 93275487f29..77a27d1c40a 100644
513513
import org.apache.spark.sql.execution.{FileSourceScanLike, SimpleMode}
514514
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
515515
import org.apache.spark.sql.execution.datasources.FilePartition
516-
@@ -250,6 +252,8 @@ class FileBasedDataSourceSuite extends QueryTest
516+
@@ -250,6 +252,12 @@ class FileBasedDataSourceSuite extends QueryTest
517517
case "" => "_LEGACY_ERROR_TEMP_2062"
518518
case _ => "_LEGACY_ERROR_TEMP_2055"
519519
}
520-
+ // native_datafusion Parquet scan cannot throw a SparkFileNotFoundException
521-
+ assume(CometConf.COMET_NATIVE_SCAN_IMPL.get() != CometConf.SCAN_NATIVE_DATAFUSION)
520+
+ // native_datafusion Parquet scan cannot throw
521+
+ // a SparkFileNotFoundException
522+
+ assume(!Seq(
523+
+ CometConf.SCAN_NATIVE_DATAFUSION,
524+
+ CometConf.SCAN_AUTO
525+
+ ).contains(CometConf.COMET_NATIVE_SCAN_IMPL.get()))
522526
checkErrorMatchPVals(
523527
exception = intercept[SparkException] {
524528
testIgnoreMissingFiles(options)
525-
@@ -656,18 +660,25 @@ class FileBasedDataSourceSuite extends QueryTest
529+
@@ -656,18 +664,25 @@ class FileBasedDataSourceSuite extends QueryTest
526530
checkAnswer(sql(s"select A from $tableName"), data.select("A"))
527531

528532
// RuntimeException is triggered at executor side, which is then wrapped as
@@ -555,31 +559,31 @@ index 93275487f29..77a27d1c40a 100644
555559
errorClass = "_LEGACY_ERROR_TEMP_2093",
556560
parameters = Map("requiredFieldName" -> "b", "matchedOrcFields" -> "[b, B]")
557561
)
558-
@@ -955,6 +966,7 @@ class FileBasedDataSourceSuite extends QueryTest
562+
@@ -955,6 +970,7 @@ class FileBasedDataSourceSuite extends QueryTest
559563
assert(bJoinExec.isEmpty)
560564
val smJoinExec = collect(joinedDF.queryExecution.executedPlan) {
561565
case smJoin: SortMergeJoinExec => smJoin
562566
+ case smJoin: CometSortMergeJoinExec => smJoin
563567
}
564568
assert(smJoinExec.nonEmpty)
565569
}
566-
@@ -1015,6 +1027,7 @@ class FileBasedDataSourceSuite extends QueryTest
570+
@@ -1015,6 +1031,7 @@ class FileBasedDataSourceSuite extends QueryTest
567571

568572
val fileScan = df.queryExecution.executedPlan collectFirst {
569573
case BatchScanExec(_, f: FileScan, _, _, _, _) => f
570574
+ case CometBatchScanExec(BatchScanExec(_, f: FileScan, _, _, _, _), _, _) => f
571575
}
572576
assert(fileScan.nonEmpty)
573577
assert(fileScan.get.partitionFilters.nonEmpty)
574-
@@ -1056,6 +1069,7 @@ class FileBasedDataSourceSuite extends QueryTest
578+
@@ -1056,6 +1073,7 @@ class FileBasedDataSourceSuite extends QueryTest
575579

576580
val fileScan = df.queryExecution.executedPlan collectFirst {
577581
case BatchScanExec(_, f: FileScan, _, _, _, _) => f
578582
+ case CometBatchScanExec(BatchScanExec(_, f: FileScan, _, _, _, _), _, _) => f
579583
}
580584
assert(fileScan.nonEmpty)
581585
assert(fileScan.get.partitionFilters.isEmpty)
582-
@@ -1240,6 +1254,9 @@ class FileBasedDataSourceSuite extends QueryTest
586+
@@ -1240,6 +1258,9 @@ class FileBasedDataSourceSuite extends QueryTest
583587
val filters = df.queryExecution.executedPlan.collect {
584588
case f: FileSourceScanLike => f.dataFilters
585589
case b: BatchScanExec => b.scan.asInstanceOf[FileScan].dataFilters
@@ -1982,7 +1986,7 @@ index 07e2849ce6f..3e73645b638 100644
19821986
ParquetOutputFormat.WRITER_VERSION -> ParquetProperties.WriterVersion.PARQUET_2_0.toString
19831987
)
19841988
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
1985-
index 8e88049f51e..f9d515edee1 100644
1989+
index 8e88049f51e..20d7ef7b1bc 100644
19861990
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
19871991
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
19881992
@@ -1095,7 +1095,11 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
@@ -2053,9 +2057,9 @@ index 8e88049f51e..f9d515edee1 100644
20532057
val schema = StructType(Seq(
20542058
StructField("a", IntegerType, nullable = false)
20552059
))
2056-
@@ -1949,11 +1965,24 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
2060+
@@ -1949,11 +1963,24 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
20572061
""".stripMargin)
2058-
2062+
20592063
withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
20602064
- val e = intercept[SparkException] {
20612065
+ // Spark native readers wrap the error in SparkException(FAILED_READ_FILE).
@@ -2081,7 +2085,7 @@ index 8e88049f51e..f9d515edee1 100644
20812085
}
20822086

20832087
withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
2084-
@@ -1984,7 +2013,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
2088+
@@ -1984,7 +2011,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
20852089
}
20862090
}
20872091

@@ -2091,7 +2095,7 @@ index 8e88049f51e..f9d515edee1 100644
20912095
// block 1:
20922096
// null count min max
20932097
// page-0 0 0 99
2094-
@@ -2044,7 +2074,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
2098+
@@ -2044,7 +2072,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
20952099
}
20962100
}
20972101

@@ -2101,7 +2105,7 @@ index 8e88049f51e..f9d515edee1 100644
21012105
withTempPath { dir =>
21022106
val path = dir.getCanonicalPath
21032107
spark.range(100).selectExpr("id * 2 AS id")
2104-
@@ -2276,7 +2307,11 @@ class ParquetV1FilterSuite extends ParquetFilterSuite {
2108+
@@ -2276,7 +2305,11 @@ class ParquetV1FilterSuite extends ParquetFilterSuite {
21052109
assert(pushedParquetFilters.exists(_.getClass === filterClass),
21062110
s"${pushedParquetFilters.map(_.getClass).toList} did not contain ${filterClass}.")
21072111

@@ -2114,7 +2118,7 @@ index 8e88049f51e..f9d515edee1 100644
21142118
} else {
21152119
assert(selectedFilters.isEmpty, "There is filter pushed down")
21162120
}
2117-
@@ -2336,7 +2371,11 @@ class ParquetV2FilterSuite extends ParquetFilterSuite {
2121+
@@ -2336,7 +2369,11 @@ class ParquetV2FilterSuite extends ParquetFilterSuite {
21182122
assert(pushedParquetFilters.exists(_.getClass === filterClass),
21192123
s"${pushedParquetFilters.map(_.getClass).toList} did not contain ${filterClass}.")
21202124

@@ -2260,34 +2264,18 @@ index 4f906411345..6cc69f7e915 100644
22602264

22612265
import testImplicits._
22622266
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowIndexSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowIndexSuite.scala
2263-
index 27c2a2148fd..df04a15fb1f 100644
2267+
index 27c2a2148fd..808baf9e778 100644
22642268
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowIndexSuite.scala
22652269
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowIndexSuite.scala
2266-
@@ -20,12 +20,14 @@ import java.io.File
2267-
2268-
import scala.collection.JavaConverters._
2269-
2270-
+import org.apache.comet.CometConf
2271-
import org.apache.hadoop.fs.Path
2272-
import org.apache.parquet.column.ParquetProperties._
2273-
import org.apache.parquet.hadoop.{ParquetFileReader, ParquetOutputFormat}
2270+
@@ -26,6 +26,7 @@ import org.apache.parquet.hadoop.{ParquetFileReader, ParquetOutputFormat}
22742271
import org.apache.parquet.hadoop.ParquetWriter.DEFAULT_BLOCK_SIZE
22752272

22762273
import org.apache.spark.sql.QueryTest
2277-
+import org.apache.spark.sql.comet.{CometBatchScanExec, CometScanExec}
2274+
+import org.apache.spark.sql.comet.{CometBatchScanExec, CometNativeScanExec, CometScanExec}
22782275
import org.apache.spark.sql.execution.FileSourceScanExec
22792276
import org.apache.spark.sql.execution.datasources.FileFormat
22802277
import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
2281-
@@ -172,6 +174,8 @@ class ParquetRowIndexSuite extends QueryTest with SharedSparkSession {
2282-
2283-
private def testRowIndexGeneration(label: String, conf: RowIndexTestConf): Unit = {
2284-
test (s"$label - ${conf.desc}") {
2285-
+ // native_datafusion Parquet scan does not support row index generation.
2286-
+ assume(CometConf.COMET_NATIVE_SCAN_IMPL.get() != CometConf.SCAN_NATIVE_DATAFUSION)
2287-
withSQLConf(conf.sqlConfs: _*) {
2288-
withTempPath { path =>
2289-
// Read row index using _metadata.row_index if that is supported by the file format.
2290-
@@ -243,6 +247,12 @@ class ParquetRowIndexSuite extends QueryTest with SharedSparkSession {
2278+
@@ -243,6 +244,17 @@ class ParquetRowIndexSuite extends QueryTest with SharedSparkSession {
22912279
case f: FileSourceScanExec =>
22922280
numPartitions += f.inputRDD.partitions.length
22932281
numOutputRows += f.metrics("numOutputRows").value
@@ -2297,18 +2285,14 @@ index 27c2a2148fd..df04a15fb1f 100644
22972285
+ case b: CometBatchScanExec =>
22982286
+ numPartitions += b.inputRDD.partitions.length
22992287
+ numOutputRows += b.metrics("numOutputRows").value
2288+
+ case b: CometNativeScanExec =>
2289+
+ numPartitions +=
2290+
+ b.originalPlan.inputRDD.partitions.length
2291+
+ numOutputRows +=
2292+
+ b.metrics("numOutputRows").value
23002293
case _ =>
23012294
}
23022295
assert(numPartitions > 0)
2303-
@@ -301,6 +311,8 @@ class ParquetRowIndexSuite extends QueryTest with SharedSparkSession {
2304-
val conf = RowIndexTestConf(useDataSourceV2 = useDataSourceV2)
2305-
2306-
test(s"invalid row index column type - ${conf.desc}") {
2307-
+ // native_datafusion Parquet scan does not support row index generation.
2308-
+ assume(CometConf.COMET_NATIVE_SCAN_IMPL.get() != CometConf.SCAN_NATIVE_DATAFUSION)
2309-
withSQLConf(conf.sqlConfs: _*) {
2310-
withTempPath{ path =>
2311-
val df = spark.range(0, 10, 1, 1).toDF("id")
23122296
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruningSuite.scala
23132297
index 5c0b7def039..151184bc98c 100644
23142298
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruningSuite.scala

dev/diffs/4.0.1.diff

Lines changed: 18 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -246,12 +246,12 @@ index aa3d02dc2fb..c4f878d9908 100644
246246
WITH t(c1) AS (SELECT replace(listagg(DISTINCT col1 COLLATE unicode_rtrim) COLLATE utf8_binary, ' ', '') FROM (VALUES ('xbc '), ('xbc '), ('a'), ('xbc'))) SELECT len(c1), regexp_count(c1, 'a'), regexp_count(c1, 'xbc') FROM t;
247247
WITH t(c1) AS (SELECT listagg(col1) WITHIN GROUP (ORDER BY col1 COLLATE unicode_rtrim) FROM (VALUES ('abc '), ('abc\n'), ('abc'), ('x'))) SELECT replace(replace(c1, ' ', ''), '\n', '$') FROM t;
248248
diff --git a/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/aggregates_part3.sql b/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/aggregates_part3.sql
249-
index 0000000..0000000 100644
249+
index 41fd4de2a09..162d5a817b6 100644
250250
--- a/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/aggregates_part3.sql
251251
+++ b/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/aggregates_part3.sql
252252
@@ -6,6 +6,10 @@
253253
-- https://github.com/postgres/postgres/blob/REL_12_BETA2/src/test/regress/sql/aggregates.sql#L352-L605
254-
254+
255255
-- Test aggregate operator with codegen on and off.
256256
+
257257
+-- Floating-point precision difference between DataFusion and JVM for FILTER aggregates
@@ -3060,7 +3060,7 @@ index 30503af0fab..1491f4bc2d5 100644
30603060

30613061
import testImplicits._
30623062
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowIndexSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowIndexSuite.scala
3063-
index 08fd8a9ecb5..24baf360234 100644
3063+
index 08fd8a9ecb5..d25a2f75773 100644
30643064
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowIndexSuite.scala
30653065
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowIndexSuite.scala
30663066
@@ -20,6 +20,7 @@ import java.io.File
@@ -3075,11 +3075,11 @@ index 08fd8a9ecb5..24baf360234 100644
30753075

30763076
import org.apache.spark.SparkException
30773077
import org.apache.spark.sql.QueryTest
3078-
+import org.apache.spark.sql.comet.{CometBatchScanExec, CometScanExec}
3078+
+import org.apache.spark.sql.comet.{CometBatchScanExec, CometNativeScanExec, CometScanExec}
30793079
import org.apache.spark.sql.execution.FileSourceScanExec
30803080
import org.apache.spark.sql.execution.datasources.FileFormat
30813081
import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
3082-
@@ -172,8 +174,31 @@ class ParquetRowIndexSuite extends QueryTest with SharedSparkSession {
3082+
@@ -172,8 +174,29 @@ class ParquetRowIndexSuite extends QueryTest with SharedSparkSession {
30833083
testRowIndexGeneration("row index generation", conf)
30843084
}
30853085

@@ -3106,12 +3106,10 @@ index 08fd8a9ecb5..24baf360234 100644
31063106
+ assume(!shouldSkip(conf), s"TODO: https://github.com/apache/datafusion-comet/issues/1948 " +
31073107
+ s"Skipping failing config: ${conf.desc}")
31083108
+
3109-
+ // native_datafusion Parquet scan does not support row index generation.
3110-
+ assume(CometConf.COMET_NATIVE_SCAN_IMPL.get() != CometConf.SCAN_NATIVE_DATAFUSION)
31113109
withSQLConf(conf.sqlConfs: _*) {
31123110
withTempPath { path =>
31133111
// Read row index using _metadata.row_index if that is supported by the file format.
3114-
@@ -245,6 +270,12 @@ class ParquetRowIndexSuite extends QueryTest with SharedSparkSession {
3112+
@@ -245,6 +268,17 @@ class ParquetRowIndexSuite extends QueryTest with SharedSparkSession {
31153113
case f: FileSourceScanExec =>
31163114
numPartitions += f.inputRDD.partitions.length
31173115
numOutputRows += f.metrics("numOutputRows").value
@@ -3121,15 +3119,24 @@ index 08fd8a9ecb5..24baf360234 100644
31213119
+ case b: CometBatchScanExec =>
31223120
+ numPartitions += b.inputRDD.partitions.length
31233121
+ numOutputRows += b.metrics("numOutputRows").value
3122+
+ case b: CometNativeScanExec =>
3123+
+ numPartitions +=
3124+
+ b.originalPlan.inputRDD.partitions.length
3125+
+ numOutputRows +=
3126+
+ b.metrics("numOutputRows").value
31243127
case _ =>
31253128
}
31263129
assert(numPartitions > 0)
3127-
@@ -303,6 +334,8 @@ class ParquetRowIndexSuite extends QueryTest with SharedSparkSession {
3130+
@@ -303,6 +337,12 @@ class ParquetRowIndexSuite extends QueryTest with SharedSparkSession {
31283131
val conf = RowIndexTestConf(useDataSourceV2 = useDataSourceV2)
31293132

31303133
test(s"invalid row index column type - ${conf.desc}") {
3131-
+ // native_datafusion Parquet scan does not support row index generation.
3132-
+ assume(CometConf.COMET_NATIVE_SCAN_IMPL.get() != CometConf.SCAN_NATIVE_DATAFUSION)
3134+
+ // https://github.com/apache/datafusion-comet/issues/3886
3135+
+ // Comet throws RuntimeException instead of SparkException
3136+
+ assume(!Seq(
3137+
+ CometConf.SCAN_NATIVE_DATAFUSION,
3138+
+ CometConf.SCAN_AUTO
3139+
+ ).contains(CometConf.COMET_NATIVE_SCAN_IMPL.get()))
31333140
withSQLConf(conf.sqlConfs: _*) {
31343141
withTempPath{ path =>
31353142
val df = spark.range(0, 10, 1, 1).toDF("id")

0 commit comments

Comments (0)