Skip to content

Commit b52b2b7

Browse files
author
Stanley Yao
committed
[SPARK-55978][SQL] Address review comments for TABLESAMPLE SYSTEM
Summary: CONTEXT: Review feedback on PR apache#54972 for TABLESAMPLE SYSTEM block sampling. WHAT: - Add DSv2 SampleMethod Java enum to replace boolean isSystemSampling parameter - Update SupportsPushDownTableSample Javadoc to clarify BERNOULLI vs SYSTEM - Replace _LEGACY_ERROR_TEMP_0035 with UNSUPPORTED_FEATURE.TABLESAMPLE_SYSTEM - Restrict SYSTEM sample pushdown to direct table scans only (no subqueries/views) - Remove findScanBuilderHolder that incorrectly traversed through Filter/Project Test Plan: Existing tests. Error class added to error-conditions.json.
1 parent c14e63f commit b52b2b7

6 files changed

Lines changed: 70 additions & 46 deletions

File tree

common/utils/src/main/resources/error/error-conditions.json

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8091,6 +8091,11 @@
80918091
"message" : [
80928092
"The target table is <table>."
80938093
]
8094+
},
8095+
"TABLESAMPLE_SYSTEM" : {
8096+
"message" : [
8097+
"TABLESAMPLE SYSTEM (block sampling) requires a DSv2 data source that supports sample pushdown (SupportsPushDownTableSample)."
8098+
]
80948099
}
80958100
},
80968101
"sqlState" : "42902"
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.connector.read;
19+
20+
import org.apache.spark.annotation.Evolving;
21+
22+
/**
23+
* The sampling method for TABLESAMPLE.
24+
*
25+
* @since 4.1.0
26+
*/
27+
@Evolving
28+
public enum SampleMethod {
29+
/** Row-level sampling (BERNOULLI). Each row is independently selected. */
30+
BERNOULLI,
31+
/** Block-level sampling (SYSTEM). Entire partitions/splits are included or skipped. */
32+
SYSTEM
33+
}

sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/SupportsPushDownTableSample.java

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
public interface SupportsPushDownTableSample extends ScanBuilder {
3030

3131
/**
32-
* Pushes down SAMPLE to the data source.
32+
* Pushes down BERNOULLI (row-level) SAMPLE to the data source.
3333
*/
3434
boolean pushTableSample(
3535
double lowerBound,
@@ -38,18 +38,15 @@ boolean pushTableSample(
3838
long seed);
3939

4040
/**
41-
* Pushes down SAMPLE to the data source with sample method awareness.
42-
* Data sources can override this to distinguish SYSTEM (block) from BERNOULLI (row) sampling.
43-
* By default, rejects SYSTEM sampling for backward compatibility and delegates BERNOULLI to
44-
* the 4-parameter version.
41+
* Pushes down SAMPLE to the data source with the specified sampling method.
4542
*/
4643
default boolean pushTableSample(
4744
double lowerBound,
4845
double upperBound,
4946
boolean withReplacement,
5047
long seed,
51-
boolean isSystemSampling) {
52-
if (isSystemSampling) {
48+
SampleMethod sampleMethod) {
49+
if (sampleMethod == SampleMethod.SYSTEM) {
5350
// If the data source hasn't overridden this method, it must not have added support
5451
// for SYSTEM sampling. Don't apply sample pushdown.
5552
return false;

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1046,10 +1046,8 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
10461046
// not able to support block/split sampling. It has now fallen through to the row-based
10471047
// sampling. Therefore error out.
10481048
throw new AnalysisException(
1049-
errorClass = "_LEGACY_ERROR_TEMP_0035",
1050-
messageParameters = Map("message" ->
1051-
("TABLESAMPLE SYSTEM (block sampling) must be pushed down to a DSv2 data source. " +
1052-
"It cannot be executed as row-level sampling.")))
1049+
errorClass = "UNSUPPORTED_FEATURE.TABLESAMPLE_SYSTEM",
1050+
messageParameters = Map.empty)
10531051
}
10541052
execution.SampleExec(lb, ub, withReplacement, seed, planLater(child)) :: Nil
10551053
case logical.LocalRelation(output, data, _, stream) =>

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownUtils.scala

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.types.DataTypeUtils.toAttributes
2828
import org.apache.spark.sql.catalyst.util.CharVarcharUtils
2929
import org.apache.spark.sql.connector.expressions.{IdentityTransform, SortOrder}
3030
import org.apache.spark.sql.connector.expressions.filter.Predicate
31-
import org.apache.spark.sql.connector.read.{Scan, ScanBuilder, SupportsPushDownFilters, SupportsPushDownLimit, SupportsPushDownOffset, SupportsPushDownRequiredColumns, SupportsPushDownTableSample, SupportsPushDownTopN, SupportsPushDownV2Filters}
31+
import org.apache.spark.sql.connector.read.{SampleMethod => SampleMethodV2, Scan, ScanBuilder, SupportsPushDownFilters, SupportsPushDownLimit, SupportsPushDownOffset, SupportsPushDownRequiredColumns, SupportsPushDownTableSample, SupportsPushDownTopN, SupportsPushDownV2Filters}
3232
import org.apache.spark.sql.execution.datasources.{DataSourceStrategy, DataSourceUtils}
3333
import org.apache.spark.sql.internal.SQLConf
3434
import org.apache.spark.sql.internal.connector.{PartitionPredicateField, PartitionPredicateImpl, SupportsPushDownCatalystFilters}
@@ -298,7 +298,10 @@ object PushDownUtils extends Logging {
298298
case s: SupportsPushDownTableSample =>
299299
s.pushTableSample(
300300
sample.lowerBound, sample.upperBound, sample.withReplacement, sample.seed,
301-
sample.sampleMethod == SampleMethod.System)
301+
sample.sampleMethod match {
302+
case SampleMethod.Bernoulli => SampleMethodV2.BERNOULLI
303+
case SampleMethod.System => SampleMethodV2.SYSTEM
304+
})
302305
case _ => false
303306
}
304307
}

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala

Lines changed: 21 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -812,42 +812,30 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper {
812812
}
813813
}
814814

815-
private def findScanBuilderHolder(plan: LogicalPlan): Option[ScanBuilderHolder] = {
816-
plan match {
817-
case s: ScanBuilderHolder => Some(s)
818-
case s: SubqueryAlias => findScanBuilderHolder(s.child)
819-
case p: Project => findScanBuilderHolder(p.child)
820-
case f: Filter => findScanBuilderHolder(f.child)
821-
case _ => None
822-
}
823-
}
824-
825815
def pushDownSample(plan: LogicalPlan): LogicalPlan = plan.transform {
826816
case sample: Sample if sample.sampleMethod == SampleMethod.System =>
827-
findScanBuilderHolder(sample.child) match {
828-
case Some(sHolder) =>
829-
val tableSample = TableSampleInfo(
830-
sample.lowerBound,
831-
sample.upperBound,
832-
sample.withReplacement,
833-
sample.seed,
834-
sampleMethod = sample.sampleMethod)
835-
val pushed = PushDownUtils.pushTableSample(sHolder.builder, tableSample)
836-
if (pushed) {
837-
sHolder.pushedSample = Some(tableSample)
838-
sample.child
839-
} else {
840-
throw new AnalysisException(
841-
errorClass = "_LEGACY_ERROR_TEMP_0035",
842-
messageParameters = Map("message" ->
843-
("TABLESAMPLE SYSTEM requires a data source that supports " +
844-
"table sample pushdown (SupportsPushDownTableSample).")))
845-
}
846-
case None =>
817+
val sHolder = sample.child match {
818+
case s: ScanBuilderHolder => s
819+
case SubqueryAlias(_, s: ScanBuilderHolder) => s
820+
case _ =>
847821
throw new AnalysisException(
848-
errorClass = "_LEGACY_ERROR_TEMP_0035",
849-
messageParameters = Map("message" ->
850-
"TABLESAMPLE SYSTEM is only supported for DSv2 data source scan relations."))
822+
errorClass = "UNSUPPORTED_FEATURE.TABLESAMPLE_SYSTEM",
823+
messageParameters = Map.empty)
824+
}
825+
val tableSample = TableSampleInfo(
826+
sample.lowerBound,
827+
sample.upperBound,
828+
sample.withReplacement,
829+
sample.seed,
830+
sampleMethod = sample.sampleMethod)
831+
val pushed = PushDownUtils.pushTableSample(sHolder.builder, tableSample)
832+
if (pushed) {
833+
sHolder.pushedSample = Some(tableSample)
834+
sample.child
835+
} else {
836+
throw new AnalysisException(
837+
errorClass = "UNSUPPORTED_FEATURE.TABLESAMPLE_SYSTEM",
838+
messageParameters = Map.empty)
851839
}
852840

853841
case sample: Sample => sample.child match {

0 commit comments

Comments
 (0)