Skip to content

Commit a2170d4

Browse files
author
stanyao
committed
[SPARK-55978][SQL] Add TABLESAMPLE SYSTEM block sampling with DSv2 pushdown
This PR adds support for ANSI SQL `TABLESAMPLE SYSTEM` (block-level sampling) alongside the existing `TABLESAMPLE BERNOULLI` (row-level sampling). **SQL grammar**: Extended `TABLESAMPLE` to accept an optional `SYSTEM` or `BERNOULLI` qualifier before the sample method. Added both as non-reserved keywords. `TABLESAMPLE SYSTEM` only supports `PERCENT` sampling and does not support `REPEATABLE`. **Logical plan**: Introduced a `SampleMethod` sealed trait (`Bernoulli`/`System`) and added it to the `Sample` node. The default is `Bernoulli` for backward compatibility. **DSv2 pushdown**: Added a `SampleMethod` Java enum and extended `SupportsPushDownTableSample.pushTableSample()` with a new overload. Sources that don't override the new method reject SYSTEM sampling by default. SYSTEM pushdown is restricted to direct table scans via `PhysicalOperation`. **Physical planning**: SYSTEM samples that aren't pushed down to a DSv2 source raise an `AnalysisException` — there is no row-level fallback, since block sampling is data-source-dependent.
1 parent 6bba551 commit a2170d4

24 files changed

Lines changed: 336 additions & 24 deletions

File tree

common/utils/src/main/resources/error/error-conditions.json

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8097,6 +8097,11 @@
80978097
"message" : [
80988098
"The target table is <table>."
80998099
]
8100+
},
8101+
"TABLESAMPLE_SYSTEM" : {
8102+
"message" : [
8103+
"TABLESAMPLE SYSTEM is only supported by data sources that implement block-level sampling."
8104+
]
81008105
}
81018106
},
81028107
"sqlState" : "42902"

docs/sql-ref-ansi-compliance.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -429,6 +429,7 @@ Below is a list of all the keywords in Spark SQL.
429429
|ATOMIC|non-reserved|non-reserved|non-reserved|
430430
|AUTHORIZATION|reserved|non-reserved|reserved|
431431
|BEGIN|non-reserved|non-reserved|non-reserved|
432+
|BERNOULLI|non-reserved|non-reserved|non-reserved|
432433
|BETWEEN|non-reserved|non-reserved|reserved|
433434
|BIGINT|non-reserved|non-reserved|reserved|
434435
|BINARY|non-reserved|non-reserved|reserved|
@@ -754,6 +755,7 @@ Below is a list of all the keywords in Spark SQL.
754755
|SUBSTR|non-reserved|non-reserved|non-reserved|
755756
|SUBSTRING|non-reserved|non-reserved|non-reserved|
756757
|SYNC|non-reserved|non-reserved|non-reserved|
758+
|SYSTEM|non-reserved|non-reserved|reserved|
757759
|SYSTEM_TIME|non-reserved|non-reserved|non-reserved|
758760
|SYSTEM_VERSION|non-reserved|non-reserved|non-reserved|
759761
|TABLE|reserved|non-reserved|reserved|

sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,7 @@ AT: 'AT';
148148
ATOMIC: 'ATOMIC';
149149
AUTHORIZATION: 'AUTHORIZATION';
150150
BEGIN: 'BEGIN';
151+
BERNOULLI: 'BERNOULLI';
151152
BETWEEN: 'BETWEEN';
152153
BIGINT: 'BIGINT';
153154
BINARY: 'BINARY';
@@ -472,6 +473,7 @@ STRUCT: 'STRUCT' {incComplexTypeLevelCounter();};
472473
SUBSTR: 'SUBSTR';
473474
SUBSTRING: 'SUBSTRING';
474475
SYNC: 'SYNC';
476+
SYSTEM: 'SYSTEM';
475477
SYSTEM_TIME: 'SYSTEM_TIME';
476478
SYSTEM_VERSION: 'SYSTEM_VERSION';
477479
TABLE: 'TABLE';

sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1053,7 +1053,9 @@ joinCriteria
10531053
;
10541054

10551055
sample
1056-
: TABLESAMPLE LEFT_PAREN sampleMethod? RIGHT_PAREN (REPEATABLE LEFT_PAREN seed=integerValue RIGHT_PAREN)?
1056+
: TABLESAMPLE (sampleType=(SYSTEM | BERNOULLI))?
1057+
LEFT_PAREN sampleMethod? RIGHT_PAREN
1058+
(REPEATABLE LEFT_PAREN seed=integerValue RIGHT_PAREN)?
10571059
;
10581060

10591061
sampleMethod
@@ -1921,6 +1923,7 @@ ansiNonReserved
19211923
| AT
19221924
| ATOMIC
19231925
| BEGIN
1926+
| BERNOULLI
19241927
| BETWEEN
19251928
| BIGINT
19261929
| BINARY
@@ -2187,6 +2190,7 @@ ansiNonReserved
21872190
| SUBSTR
21882191
| SUBSTRING
21892192
| SYNC
2193+
| SYSTEM
21902194
| SYSTEM_TIME
21912195
| SYSTEM_VERSION
21922196
| TABLES
@@ -2291,6 +2295,7 @@ nonReserved
22912295
| ATOMIC
22922296
| AUTHORIZATION
22932297
| BEGIN
2298+
| BERNOULLI
22942299
| BETWEEN
22952300
| BIGINT
22962301
| BINARY
@@ -2604,6 +2609,7 @@ nonReserved
26042609
| SUBSTR
26052610
| SUBSTRING
26062611
| SYNC
2612+
| SYSTEM
26072613
| SYSTEM_TIME
26082614
| SYSTEM_VERSION
26092615
| TABLE
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.connector.read;
19+
20+
import org.apache.spark.annotation.Evolving;
21+
22+
/**
23+
* The sampling method for TABLESAMPLE.
24+
*
25+
* @since 4.1.0
26+
*/
27+
@Evolving
28+
public enum SampleMethod {
29+
/** Row-level sampling (BERNOULLI). Each row is independently selected. */
30+
BERNOULLI,
31+
/** Block-level sampling (SYSTEM). Entire partitions/splits are included or skipped. */
32+
SYSTEM
33+
}

sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/SupportsPushDownTableSample.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,11 +29,28 @@
2929
public interface SupportsPushDownTableSample extends ScanBuilder {
3030

3131
/**
32-
* Pushes down SAMPLE to the data source.
32+
* Pushes down BERNOULLI (row-level) SAMPLE to the data source.
3333
*/
3434
boolean pushTableSample(
3535
double lowerBound,
3636
double upperBound,
3737
boolean withReplacement,
3838
long seed);
39+
40+
/**
41+
* Pushes down SAMPLE to the data source with the specified sampling method.
42+
*/
43+
default boolean pushTableSample(
44+
double lowerBound,
45+
double upperBound,
46+
boolean withReplacement,
47+
long seed,
48+
SampleMethod sampleMethod) {
49+
if (sampleMethod == SampleMethod.SYSTEM) {
50+
// If the data source hasn't overridden this method, it must have not added support
51+
// for SYSTEM sampling. Don't apply sample pushdown.
52+
return false;
53+
}
54+
return pushTableSample(lowerBound, upperBound, withReplacement, seed);
55+
}
3956
}

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -530,7 +530,7 @@ object UnsupportedOperationChecker extends Logging {
530530
throwError("Sorting is not supported on streaming DataFrames/Datasets, unless it is on " +
531531
"aggregated DataFrame/Dataset in Complete output mode")
532532

533-
case Sample(_, _, _, _, child) if child.isStreaming =>
533+
case Sample(_, _, _, _, child, _) if child.isStreaming =>
534534
throwError("Sampling is not supported on streaming DataFrames/Datasets")
535535

536536
case Window(windowExpression, _, _, child, _) if child.isStreaming =>

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1292,7 +1292,7 @@ object CollapseProject extends Rule[LogicalPlan] with AliasHelper {
12921292
limit.copy(child = p2.copy(projectList = newProjectList))
12931293
case Project(l1, r @ Repartition(_, _, p @ Project(l2, _))) if isRenaming(l1, l2) =>
12941294
r.copy(child = p.copy(projectList = buildCleanedProjectList(l1, p.projectList)))
1295-
case Project(l1, s @ Sample(_, _, _, _, p2 @ Project(l2, _))) if isRenaming(l1, l2) =>
1295+
case Project(l1, s @ Sample(_, _, _, _, p2 @ Project(l2, _), _)) if isRenaming(l1, l2) =>
12961296
s.copy(child = p2.copy(projectList = buildCleanedProjectList(l1, p2.projectList)))
12971297
case o => o
12981298
}

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2381,10 +2381,14 @@ class AstBuilder extends DataTypeAstBuilder
23812381
* - TABLESAMPLE(x ROWS): Sample the table down to the given number of rows.
23822382
* - TABLESAMPLE(x PERCENT) [REPEATABLE (y)]: Sample the table down to the given percentage with
23832383
* seed 'y'. Note that percentages are defined as a number between 0 and 100.
2384+
* - TABLESAMPLE SYSTEM(x PERCENT): Sample by data source dependent blocks or file splits.
23842385
* - TABLESAMPLE(BUCKET x OUT OF y) [REPEATABLE (z)]: Sample the table down to a 'x' divided by
23852386
* 'y' fraction with seed 'z'.
23862387
*/
23872388
private def withSample(ctx: SampleContext, query: LogicalPlan): LogicalPlan = withOrigin(ctx) {
2389+
val isSystem = ctx.sampleType != null &&
2390+
ctx.sampleType.getType == SqlBaseParser.SYSTEM
2391+
23882392
// Create a sampled plan if we need one.
23892393
def sample(fraction: Double, seed: Option[Long]): Sample = {
23902394
// The range of fraction accepted by Sample is [0, 1]. Because Hive's block sampling
@@ -2394,17 +2398,25 @@ class AstBuilder extends DataTypeAstBuilder
23942398
validate(fraction >= 0.0 - eps && fraction <= 1.0 + eps,
23952399
s"Sampling fraction ($fraction) must be on interval [0, 1]",
23962400
ctx)
2397-
Sample(0.0, fraction, withReplacement = false, seed, query)
2401+
val method = if (isSystem) SampleMethod.System else SampleMethod.Bernoulli
2402+
Sample(0.0, fraction, withReplacement = false, seed, query, method)
23982403
}
23992404

24002405
if (ctx.sampleMethod() == null) {
24012406
throw QueryParsingErrors.emptyInputForTableSampleError(ctx)
24022407
}
24032408

2409+
if (isSystem && ctx.seed != null) {
2410+
operationNotAllowed("TABLESAMPLE SYSTEM does not support REPEATABLE", ctx)
2411+
}
2412+
24042413
val seed: Option[Long] = Option(ctx.seed).map(_.getText.toLong)
24052414

24062415
ctx.sampleMethod() match {
24072416
case ctx: SampleByRowsContext =>
2417+
if (isSystem) {
2418+
operationNotAllowed("TABLESAMPLE SYSTEM only supports PERCENT sampling", ctx)
2419+
}
24082420
Limit(expression(ctx.expression), query)
24092421

24102422
case ctx: SampleByPercentileContext =>
@@ -2416,6 +2428,9 @@ class AstBuilder extends DataTypeAstBuilder
24162428
sample(sign * fraction / 100.0d, seed)
24172429

24182430
case ctx: SampleByBytesContext =>
2431+
if (isSystem) {
2432+
operationNotAllowed("TABLESAMPLE SYSTEM only supports PERCENT sampling", ctx)
2433+
}
24192434
val bytesStr = ctx.bytes.getText
24202435
if (bytesStr.matches("[0-9]+[bBkKmMgG]")) {
24212436
throw QueryParsingErrors.tableSampleByBytesUnsupportedError("byteLengthLiteral", ctx)
@@ -2424,6 +2439,9 @@ class AstBuilder extends DataTypeAstBuilder
24242439
}
24252440

24262441
case ctx: SampleByBucketContext if ctx.ON() != null =>
2442+
if (isSystem) {
2443+
operationNotAllowed("TABLESAMPLE SYSTEM only supports PERCENT sampling", ctx)
2444+
}
24272445
if (ctx.identifier != null) {
24282446
throw QueryParsingErrors.tableSampleByBytesUnsupportedError(
24292447
"BUCKET x OUT OF y ON colname", ctx)
@@ -2433,6 +2451,9 @@ class AstBuilder extends DataTypeAstBuilder
24332451
}
24342452

24352453
case ctx: SampleByBucketContext =>
2454+
if (isSystem) {
2455+
operationNotAllowed("TABLESAMPLE SYSTEM only supports PERCENT sampling", ctx)
2456+
}
24362457
sample(ctx.numerator.getText.toDouble / ctx.denominator.getText.toDouble, seed)
24372458
}
24382459
}

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1912,6 +1912,14 @@ object SubqueryAlias {
19121912
}
19131913
}
19141914

1915+
/** The sampling strategy carried by a `Sample` logical plan node. */
sealed trait SampleMethod extends Serializable

object SampleMethod {

  /**
   * BERNOULLI: each row is kept or dropped independently. All data is still read,
   * so it yields no I/O savings.
   */
  case object Bernoulli extends SampleMethod

  /**
   * SYSTEM: entire partitions or file splits are kept or dropped, letting the data
   * source skip reading excluded blocks entirely.
   */
  case object System extends SampleMethod
}
1922+
19151923
object Sample {
19161924
/**
19171925
* Convenience constructor that wraps a concrete seed in [[Some]].
@@ -1926,6 +1934,16 @@ object Sample {
19261934
child: LogicalPlan): Sample = {
19271935
new Sample(lowerBound, upperBound, withReplacement, Some(seed), child)
19281936
}
1937+
1938+
def apply(
1939+
lowerBound: Double,
1940+
upperBound: Double,
1941+
withReplacement: Boolean,
1942+
seed: Long,
1943+
child: LogicalPlan,
1944+
sampleMethod: SampleMethod): Sample = {
1945+
new Sample(lowerBound, upperBound, withReplacement, Some(seed), child, sampleMethod)
1946+
}
19291947
}
19301948

19311949
/**
@@ -1939,13 +1957,15 @@ object Sample {
19391957
* (SQL `REPEATABLE` clause or programmatic API), `None` when no seed was
19401958
* specified and a random seed should be generated at execution time.
19411959
* @param child the LogicalPlan
1960+
* @param sampleMethod the sampling method (Bernoulli or System)
19421961
*/
19431962
case class Sample(
19441963
lowerBound: Double,
19451964
upperBound: Double,
19461965
withReplacement: Boolean,
19471966
seed: Option[Long],
1948-
child: LogicalPlan) extends UnaryNode {
1967+
child: LogicalPlan,
1968+
sampleMethod: SampleMethod = SampleMethod.Bernoulli) extends UnaryNode {
19491969

19501970
val eps = RandomSampler.roundingEpsilon
19511971
val fraction = upperBound - lowerBound

0 commit comments

Comments
 (0)