Commit 7ccd67b

Merge remote-tracking branch 'upstream/main' into disable-atan2
2 parents: 1c0d56f + 7878f0d · commit 7ccd67b

50 files changed

Lines changed: 1741 additions & 464 deletions

Lines changed: 39 additions & 0 deletions
@@ -0,0 +1,39 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+name: Label new issues with requires-triage
+
+on:
+  issues:
+    types: [opened]
+
+permissions:
+  issues: write
+
+jobs:
+  add-triage-label:
+    runs-on: ubuntu-latest
+    steps:
+      - uses: actions/github-script@v7
+        with:
+          script: |
+            await github.rest.issues.addLabels({
+              owner: context.repo.owner,
+              repo: context.repo.repo,
+              issue_number: context.issue.number,
+              labels: ['requires-triage']
+            })

.github/workflows/pr_build_linux.yml

Lines changed: 1 addition & 1 deletion
@@ -384,7 +384,7 @@ jobs:
       - name: Java test steps
         uses: ./.github/actions/java-test
         with:
-          artifact_name: ${{ matrix.profile.name }}-${{ matrix.suite.name }}-${{ github.run_id }}-${{ github.run_number }}-${{ github.run_attempt }}
+          artifact_name: ${{ matrix.profile.name }}-${{ matrix.suite.name }}-${{ github.run_id }}-${{ github.run_number }}-${{ github.run_attempt }}-${{ matrix.profile.scan_impl }}
           suites: ${{ matrix.suite.name == 'sql' && matrix.profile.name == 'Spark 3.4, JDK 11, Scala 2.12' && '' || matrix.suite.value }}
           maven_opts: ${{ matrix.profile.maven_opts }}
           scan_impl: ${{ matrix.profile.scan_impl }}

common/src/main/scala/org/apache/comet/CometConf.scala

Lines changed: 14 additions & 0 deletions
@@ -798,6 +798,20 @@ object CometConf extends ShimCometConf {
       .longConf
       .createWithDefault(3000L)
 
+  val COMET_METRICS_ENABLED: ConfigEntry[Boolean] =
+    conf("spark.comet.metrics.enabled")
+      .category(CATEGORY_EXEC)
+      .doc(
+        "Whether to enable Comet metrics reporting through Spark's external monitoring system. " +
+          "When enabled, Comet exposes metrics such as native operators, Spark operators, " +
+          "queries planned, transitions, and acceleration ratio. These metrics can be " +
+          "visualized through tools like Grafana when a metrics sink (e.g., Prometheus) is " +
+          "configured. Disabled by default because Spark plan traversal adds overhead and " +
+          "metrics require a sink to be useful. " +
+          "This config must be set before the SparkSession is created to take effect.")
+      .booleanConf
+      .createWithDefault(false)
+
   val COMET_LIBHDFS_SCHEMES_KEY = "fs.comet.libhdfs.schemes"
 
   val COMET_LIBHDFS_SCHEMES: OptionalConfigEntry[String] =
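
As the doc string above notes, the new spark.comet.metrics.enabled flag only takes effect if it is set before the SparkSession is created. A minimal sketch of how a user might enable it at session start-up (the app name and master below are placeholders, not part of this commit):

    import org.apache.spark.sql.SparkSession

    // Illustrative only: pass the flag to the builder (or via --conf on spark-submit)
    // so it is already set when the session is created.
    val spark = SparkSession.builder()
      .appName("comet-metrics-example") // placeholder app name
      .master("local[*]")               // placeholder master
      .config("spark.comet.metrics.enabled", "true")
      .getOrCreate()

A metrics sink (for example Prometheus) still has to be configured separately for the reported values to be visible, as the config's documentation also notes.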

dev/diffs/3.4.3.diff

Lines changed: 72 additions & 28 deletions
@@ -523,7 +523,7 @@ index a6b295578d6..91acca4306f 100644
 
   test("SPARK-35884: Explain Formatted") {
 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
-index 2796b1cf154..d628f44e4ee 100644
+index 2796b1cf154..53dcfde932e 100644
 --- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
 +++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
 @@ -33,6 +33,7 @@ import org.apache.spark.sql.TestingUDT.{IntervalUDT, NullData, NullUDT}
@@ -534,41 +534,70 @@ index 2796b1cf154..d628f44e4ee 100644
  import org.apache.spark.sql.execution.{FileSourceScanLike, SimpleMode}
  import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
  import org.apache.spark.sql.execution.datasources.FilePartition
-@@ -499,7 +500,8 @@ class FileBasedDataSourceSuite extends QueryTest
-       }
+@@ -516,21 +517,24 @@ class FileBasedDataSourceSuite extends QueryTest
+       checkAnswer(sql(s"select A from $tableName"), data.select("A"))
+ 
+       // RuntimeException is triggered at executor side, which is then wrapped as
+-      // SparkException at driver side
+-      val e1 = intercept[SparkException] {
+-        sql(s"select b from $tableName").collect()
++      // SparkException at driver side. Comet native readers throw RuntimeException
++      // directly without the SparkException wrapper.
++      def getDuplicateFieldError(query: String): RuntimeException = {
++        try {
++          sql(query).collect()
++          fail("Expected an exception").asInstanceOf[RuntimeException]
++        } catch {
++          case e: SparkException =>
++            e.getCause.asInstanceOf[RuntimeException]
++          case e: RuntimeException => e
++        }
+       }
+-      assert(
+-        e1.getCause.isInstanceOf[RuntimeException] &&
+-          e1.getCause.getMessage.contains(
+-            """Found duplicate field(s) "b": [b, B] in case-insensitive mode"""))
+-      val e2 = intercept[SparkException] {
+-        sql(s"select B from $tableName").collect()
+-      }
+-      assert(
+-        e2.getCause.isInstanceOf[RuntimeException] &&
+-          e2.getCause.getMessage.contains(
+-            """Found duplicate field(s) "b": [b, B] in case-insensitive mode"""))
++      val e1 = getDuplicateFieldError(s"select b from $tableName")
++      assert(e1.getMessage.contains(
++        """Found duplicate field(s) "b": [b, B] in case-insensitive mode"""))
++      val e2 = getDuplicateFieldError(s"select B from $tableName")
++      assert(e2.getMessage.contains(
++        """Found duplicate field(s) "b": [b, B] in case-insensitive mode"""))
+     }
 
-   Seq("parquet", "orc").foreach { format =>
--    test(s"Spark native readers should respect spark.sql.caseSensitive - ${format}") {
-+    test(s"Spark native readers should respect spark.sql.caseSensitive - ${format}",
-+      IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3760")) {
-      withTempDir { dir =>
-        val tableName = s"spark_25132_${format}_native"
-        val tableDir = dir.getCanonicalPath + s"/$tableName"
-@@ -815,6 +817,7 @@ class FileBasedDataSourceSuite extends QueryTest
+     withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
+@@ -815,6 +819,7 @@ class FileBasedDataSourceSuite extends QueryTest
      assert(bJoinExec.isEmpty)
      val smJoinExec = collect(joinedDF.queryExecution.executedPlan) {
        case smJoin: SortMergeJoinExec => smJoin
 +      case smJoin: CometSortMergeJoinExec => smJoin
      }
      assert(smJoinExec.nonEmpty)
    }
-@@ -875,6 +878,7 @@ class FileBasedDataSourceSuite extends QueryTest
+@@ -875,6 +880,7 @@ class FileBasedDataSourceSuite extends QueryTest
 
      val fileScan = df.queryExecution.executedPlan collectFirst {
        case BatchScanExec(_, f: FileScan, _, _, _, _, _, _, _) => f
 +      case CometBatchScanExec(BatchScanExec(_, f: FileScan, _, _, _, _, _, _, _), _, _) => f
      }
      assert(fileScan.nonEmpty)
      assert(fileScan.get.partitionFilters.nonEmpty)
-@@ -916,6 +920,7 @@ class FileBasedDataSourceSuite extends QueryTest
+@@ -916,6 +922,7 @@ class FileBasedDataSourceSuite extends QueryTest
 
      val fileScan = df.queryExecution.executedPlan collectFirst {
        case BatchScanExec(_, f: FileScan, _, _, _, _, _, _, _) => f
 +      case CometBatchScanExec(BatchScanExec(_, f: FileScan, _, _, _, _, _, _, _), _, _) => f
      }
      assert(fileScan.nonEmpty)
      assert(fileScan.get.partitionFilters.isEmpty)
-@@ -1100,6 +1105,9 @@ class FileBasedDataSourceSuite extends QueryTest
+@@ -1100,6 +1107,9 @@ class FileBasedDataSourceSuite extends QueryTest
    val filters = df.queryExecution.executedPlan.collect {
      case f: FileSourceScanLike => f.dataFilters
      case b: BatchScanExec => b.scan.asInstanceOf[FileScan].dataFilters
@@ -2003,7 +2032,7 @@ index 07e2849ce6f..3e73645b638 100644
      ParquetOutputFormat.WRITER_VERSION -> ParquetProperties.WriterVersion.PARQUET_2_0.toString
    )
 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
-index 104b4e416cd..d865077684f 100644
+index 104b4e416cd..b8af360fa14 100644
 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
 +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
 @@ -38,6 +38,7 @@ import org.apache.parquet.schema.MessageType
@@ -2083,17 +2112,32 @@ index 104b4e416cd..d865077684f 100644
      val schema = StructType(Seq(
        StructField("a", IntegerType, nullable = false)
      ))
-@@ -1934,7 +1950,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
-      }
-    }
+@@ -1950,11 +1966,21 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
+           """.stripMargin)
+ 
+       withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
+-        val e = intercept[SparkException] {
++        // Spark native readers wrap the error in SparkException.
++        // Comet native readers throw RuntimeException directly.
++        val msg = try {
+           sql(s"select a from $tableName where b > 0").collect()
++          fail("Expected an exception")
++        } catch {
++          case e: SparkException =>
++            assert(e.getCause.isInstanceOf[RuntimeException])
++            e.getCause.getMessage
++          case e: RuntimeException =>
++            e.getMessage
+         }
+-        assert(e.getCause.isInstanceOf[RuntimeException] && e.getCause.getMessage.contains(
+-          """Found duplicate field(s) "B": [B, b] in case-insensitive mode"""))
++        assert(msg.contains(
++          """Found duplicate field(s) "B": [B, b] in case-insensitive mode"""),
++          s"Unexpected error message: $msg")
+       }
 
--  test("SPARK-25207: exception when duplicate fields in case-insensitive mode") {
-+  test("SPARK-25207: exception when duplicate fields in case-insensitive mode",
-+    IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3760")) {
-     withTempPath { dir =>
-       val count = 10
-       val tableName = "spark_25207"
-@@ -1985,7 +2002,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
+     withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
+@@ -1985,7 +2011,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
      }
    }
 
@@ -2103,7 +2147,7 @@ index 104b4e416cd..d865077684f 100644
      // block 1:
      //          null count  min  max
      // page-0   0           0    99
-@@ -2045,7 +2063,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
+@@ -2045,7 +2072,8 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
      }
    }
 
@@ -2113,7 +2157,7 @@ index 104b4e416cd..d865077684f 100644
      withTempPath { dir =>
        val path = dir.getCanonicalPath
        spark.range(100).selectExpr("id * 2 AS id")
-@@ -2277,7 +2296,11 @@ class ParquetV1FilterSuite extends ParquetFilterSuite {
+@@ -2277,7 +2305,11 @@ class ParquetV1FilterSuite extends ParquetFilterSuite {
        assert(pushedParquetFilters.exists(_.getClass === filterClass),
          s"${pushedParquetFilters.map(_.getClass).toList} did not contain ${filterClass}.")
 
@@ -2126,7 +2170,7 @@ index 104b4e416cd..d865077684f 100644
      } else {
        assert(selectedFilters.isEmpty, "There is filter pushed down")
      }
-@@ -2337,7 +2360,11 @@ class ParquetV2FilterSuite extends ParquetFilterSuite {
+@@ -2337,7 +2369,11 @@ class ParquetV2FilterSuite extends ParquetFilterSuite {
        assert(pushedParquetFilters.exists(_.getClass === filterClass),
          s"${pushedParquetFilters.map(_.getClass).toList} did not contain ${filterClass}.")
 
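
A pattern that recurs in the test changes above is worth calling out: depending on whether the Spark native reader or the Comet native reader executes the scan, the duplicate-field error arrives either wrapped in a SparkException or as a bare RuntimeException, so the patched tests unwrap both shapes before checking the message. A standalone sketch of that idiom, separate from the patched suites (the helper name and the commented usage are illustrative, not code from this commit):

    import org.apache.spark.SparkException

    // Illustrative helper: run a block that is expected to fail and return the
    // underlying RuntimeException whether or not Spark wrapped it.
    def expectRuntimeError(run: => Unit): RuntimeException = {
      try {
        run
        throw new AssertionError("Expected the query to fail")
      } catch {
        case e: SparkException => e.getCause.asInstanceOf[RuntimeException]
        case e: RuntimeException => e
      }
    }

    // Usage sketch: both error shapes reduce to the same message check.
    // val err = expectRuntimeError(sql("select b from t").collect())
    // assert(err.getMessage.contains("Found duplicate field(s)"))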