Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions common/utils/src/main/resources/error/error-conditions.json
Original file line number Diff line number Diff line change
Expand Up @@ -8103,6 +8103,16 @@
"message" : [
"The target table is <table>."
]
},
"TABLESAMPLE_SYSTEM" : {
"message" : [
"TABLESAMPLE SYSTEM is only supported by data sources that implement block-level sampling."
]
},
"TABLESAMPLE_SYSTEM_NO_SCAN" : {
"message" : [
"TABLESAMPLE SYSTEM requires a direct reference to a data source table that supports block-level sampling. It cannot be applied to subqueries, views, or tables with intervening operations."
]
}
},
"sqlState" : "42902"
Expand Down
2 changes: 2 additions & 0 deletions docs/sql-ref-ansi-compliance.md
Original file line number Diff line number Diff line change
Expand Up @@ -429,6 +429,7 @@ Below is a list of all the keywords in Spark SQL.
|ATOMIC|non-reserved|non-reserved|non-reserved|
|AUTHORIZATION|reserved|non-reserved|reserved|
|BEGIN|non-reserved|non-reserved|non-reserved|
|BERNOULLI|non-reserved|non-reserved|non-reserved|
|BETWEEN|non-reserved|non-reserved|reserved|
|BIGINT|non-reserved|non-reserved|reserved|
|BINARY|non-reserved|non-reserved|reserved|
Expand Down Expand Up @@ -759,6 +760,7 @@ Below is a list of all the keywords in Spark SQL.
|SUBSTR|non-reserved|non-reserved|non-reserved|
|SUBSTRING|non-reserved|non-reserved|non-reserved|
|SYNC|non-reserved|non-reserved|non-reserved|
|SYSTEM|non-reserved|non-reserved|reserved|
|SYSTEM_PATH|non-reserved|non-reserved|not a keyword|
|SYSTEM_TIME|non-reserved|non-reserved|non-reserved|
|SYSTEM_VERSION|non-reserved|non-reserved|non-reserved|
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ AT: 'AT';
ATOMIC: 'ATOMIC';
AUTHORIZATION: 'AUTHORIZATION';
BEGIN: 'BEGIN';
BERNOULLI: 'BERNOULLI';
BETWEEN: 'BETWEEN';
BIGINT: 'BIGINT';
BINARY: 'BINARY';
Expand Down Expand Up @@ -477,6 +478,7 @@ STRUCT: 'STRUCT' {incComplexTypeLevelCounter();};
SUBSTR: 'SUBSTR';
SUBSTRING: 'SUBSTRING';
SYNC: 'SYNC';
SYSTEM: 'SYSTEM';
SYSTEM_TIME: 'SYSTEM_TIME';
SYSTEM_VERSION: 'SYSTEM_VERSION';
SYSTEM_PATH: 'SYSTEM_PATH';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1053,7 +1053,9 @@ joinCriteria
;

sample
: TABLESAMPLE LEFT_PAREN sampleMethod? RIGHT_PAREN (REPEATABLE LEFT_PAREN seed=integerValue RIGHT_PAREN)?
: TABLESAMPLE (sampleType=(SYSTEM | BERNOULLI))?
LEFT_PAREN sampleMethod? RIGHT_PAREN
(REPEATABLE LEFT_PAREN seed=integerValue RIGHT_PAREN)?
;

sampleMethod
Expand Down Expand Up @@ -1921,6 +1923,7 @@ ansiNonReserved
| AT
| ATOMIC
| BEGIN
| BERNOULLI
| BETWEEN
| BIGINT
| BINARY
Expand Down Expand Up @@ -2190,6 +2193,7 @@ ansiNonReserved
| SUBSTR
| SUBSTRING
| SYNC
| SYSTEM
Comment thread
stanyao marked this conversation as resolved.
| SYSTEM_PATH
| SYSTEM_TIME
| SYSTEM_VERSION
Expand Down Expand Up @@ -2295,6 +2299,7 @@ nonReserved
| ATOMIC
| AUTHORIZATION
| BEGIN
| BERNOULLI
| BETWEEN
| BIGINT
| BINARY
Expand Down Expand Up @@ -2613,6 +2618,7 @@ nonReserved
| SUBSTR
| SUBSTRING
| SYNC
| SYSTEM
| SYSTEM_PATH
| SYSTEM_TIME
| SYSTEM_VERSION
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.connector.read;

import org.apache.spark.annotation.Evolving;

/**
 * The sampling method requested by a SQL {@code TABLESAMPLE} clause.
 *
 * <p>Passed to data sources (via {@code SupportsPushDownTableSample}) so they know
 * which sampling semantics to apply when a table sample is pushed down.
 *
 * @since 4.2.0
 */
@Evolving
public enum SampleMethod {
  /**
   * Row-level sampling (BERNOULLI): each row is selected independently with the
   * sampling probability. The entire input is still scanned, so this yields no
   * I/O savings.
   */
  BERNOULLI,
  /**
   * Block-level sampling (SYSTEM): entire blocks (partitions / file splits) are
   * either included or skipped as a whole, which lets a source avoid reading the
   * skipped data entirely.
   */
  SYSTEM
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,28 @@
public interface SupportsPushDownTableSample extends ScanBuilder {

/**
* Pushes down SAMPLE to the data source.
* Pushes down BERNOULLI (row-level) SAMPLE to the data source.
*/
boolean pushTableSample(
double lowerBound,
double upperBound,
boolean withReplacement,
long seed);

/**
* Pushes down SAMPLE to the data source with the specified sampling method.
*/
default boolean pushTableSample(
Comment thread
stanyao marked this conversation as resolved.
double lowerBound,
double upperBound,
boolean withReplacement,
long seed,
SampleMethod sampleMethod) {
if (sampleMethod == SampleMethod.SYSTEM) {
// If the data source hasn't overridden this method, it must have not added support
Comment thread
stanyao marked this conversation as resolved.
// for SYSTEM sampling. Don't apply sample pushdown.
return false;
}
return pushTableSample(lowerBound, upperBound, withReplacement, seed);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -530,7 +530,7 @@ object UnsupportedOperationChecker extends Logging {
throwError("Sorting is not supported on streaming DataFrames/Datasets, unless it is on " +
"aggregated DataFrame/Dataset in Complete output mode")

case Sample(_, _, _, _, child) if child.isStreaming =>
case Sample(_, _, _, _, child, _) if child.isStreaming =>
throwError("Sampling is not supported on streaming DataFrames/Datasets")

case Window(windowExpression, _, _, child, _) if child.isStreaming =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1294,7 +1294,7 @@ object CollapseProject extends Rule[LogicalPlan] with AliasHelper {
limit.copy(child = p2.copy(projectList = newProjectList))
case Project(l1, r @ Repartition(_, _, p @ Project(l2, _))) if isRenaming(l1, l2) =>
r.copy(child = p.copy(projectList = buildCleanedProjectList(l1, p.projectList)))
case Project(l1, s @ Sample(_, _, _, _, p2 @ Project(l2, _))) if isRenaming(l1, l2) =>
case Project(l1, s @ Sample(_, _, _, _, p2 @ Project(l2, _), _)) if isRenaming(l1, l2) =>
s.copy(child = p2.copy(projectList = buildCleanedProjectList(l1, p2.projectList)))
case o => o
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2381,10 +2381,14 @@ class AstBuilder extends DataTypeAstBuilder
* - TABLESAMPLE(x ROWS): Sample the table down to the given number of rows.
* - TABLESAMPLE(x PERCENT) [REPEATABLE (y)]: Sample the table down to the given percentage with
* seed 'y'. Note that percentages are defined as a number between 0 and 100.
* - TABLESAMPLE SYSTEM(x PERCENT): Sample by data source dependent blocks or file splits.
* - TABLESAMPLE(BUCKET x OUT OF y) [REPEATABLE (z)]: Sample the table down to a 'x' divided by
* 'y' fraction with seed 'z'.
*/
private def withSample(ctx: SampleContext, query: LogicalPlan): LogicalPlan = withOrigin(ctx) {
val isSystem = ctx.sampleType != null &&
ctx.sampleType.getType == SqlBaseParser.SYSTEM

// Create a sampled plan if we need one.
def sample(fraction: Double, seed: Option[Long]): Sample = {
// The range of fraction accepted by Sample is [0, 1]. Because Hive's block sampling
Expand All @@ -2394,17 +2398,25 @@ class AstBuilder extends DataTypeAstBuilder
validate(fraction >= 0.0 - eps && fraction <= 1.0 + eps,
s"Sampling fraction ($fraction) must be on interval [0, 1]",
ctx)
Sample(0.0, fraction, withReplacement = false, seed, query)
val method = if (isSystem) SampleMethod.System else SampleMethod.Bernoulli
Sample(0.0, fraction, withReplacement = false, seed, query, method)
}

if (ctx.sampleMethod() == null) {
throw QueryParsingErrors.emptyInputForTableSampleError(ctx)
}

if (isSystem && ctx.seed != null) {
operationNotAllowed("TABLESAMPLE SYSTEM does not support REPEATABLE", ctx)
}

val seed: Option[Long] = Option(ctx.seed).map(_.getText.toLong)

ctx.sampleMethod() match {
case ctx: SampleByRowsContext =>
if (isSystem) {
operationNotAllowed("TABLESAMPLE SYSTEM only supports PERCENT sampling", ctx)
}
Limit(expression(ctx.expression), query)

case ctx: SampleByPercentileContext =>
Expand All @@ -2416,6 +2428,9 @@ class AstBuilder extends DataTypeAstBuilder
sample(sign * fraction / 100.0d, seed)

case ctx: SampleByBytesContext =>
if (isSystem) {
operationNotAllowed("TABLESAMPLE SYSTEM only supports PERCENT sampling", ctx)
}
val bytesStr = ctx.bytes.getText
if (bytesStr.matches("[0-9]+[bBkKmMgG]")) {
throw QueryParsingErrors.tableSampleByBytesUnsupportedError("byteLengthLiteral", ctx)
Expand All @@ -2424,6 +2439,9 @@ class AstBuilder extends DataTypeAstBuilder
}

case ctx: SampleByBucketContext if ctx.ON() != null =>
if (isSystem) {
operationNotAllowed("TABLESAMPLE SYSTEM only supports PERCENT sampling", ctx)
}
if (ctx.identifier != null) {
throw QueryParsingErrors.tableSampleByBytesUnsupportedError(
"BUCKET x OUT OF y ON colname", ctx)
Expand All @@ -2433,6 +2451,9 @@ class AstBuilder extends DataTypeAstBuilder
}

case ctx: SampleByBucketContext =>
if (isSystem) {
operationNotAllowed("TABLESAMPLE SYSTEM only supports PERCENT sampling", ctx)
}
sample(ctx.numerator.getText.toDouble / ctx.denominator.getText.toDouble, seed)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1912,6 +1912,14 @@ object SubqueryAlias {
}
}

/** How rows are chosen by a TABLESAMPLE: see the concrete members of [[SampleMethod]]. */
sealed trait SampleMethod extends Serializable

object SampleMethod {

  /**
   * SYSTEM sampling: whole partitions/splits are kept or dropped as a unit,
   * allowing a data source to skip reading the dropped data.
   */
  case object System extends SampleMethod

  /**
   * BERNOULLI sampling: every row is selected independently, so the full input
   * must still be read (no I/O savings).
   */
  case object Bernoulli extends SampleMethod
}

object Sample {
/**
* Convenience constructor that wraps a concrete seed in [[Some]].
Expand All @@ -1926,6 +1934,16 @@ object Sample {
child: LogicalPlan): Sample = {
new Sample(lowerBound, upperBound, withReplacement, Some(seed), child)
}

def apply(
lowerBound: Double,
upperBound: Double,
withReplacement: Boolean,
seed: Long,
child: LogicalPlan,
sampleMethod: SampleMethod): Sample = {
new Sample(lowerBound, upperBound, withReplacement, Some(seed), child, sampleMethod)
}
}

/**
Expand All @@ -1939,13 +1957,15 @@ object Sample {
* (SQL `REPEATABLE` clause or programmatic API), `None` when no seed was
* specified and a random seed should be generated at execution time.
* @param child the LogicalPlan
* @param sampleMethod the sampling method (Bernoulli or System)
*/
case class Sample(
lowerBound: Double,
upperBound: Double,
withReplacement: Boolean,
seed: Option[Long],
child: LogicalPlan) extends UnaryNode {
child: LogicalPlan,
sampleMethod: SampleMethod = SampleMethod.Bernoulli) extends UnaryNode {

val eps = RandomSampler.roundingEpsilon
val fraction = upperBound - lowerBound
Expand Down
Loading