Skip to content

Commit 7422781

Browse files
committed
Add support for WATERMARK FOR table constraint syntax
1 parent 6834c8e commit 7422781

5 files changed

Lines changed: 141 additions & 0 deletions

File tree

src/ast/ddl.rs

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -988,6 +988,21 @@ pub enum TableConstraint {
988988
/// Referred column identifier list.
989989
columns: Vec<Ident>,
990990
},
991+
/// Arroyo-specific: watermark definition for streaming tables.
992+
/// Syntax:
993+
/// ```sql
994+
/// WATERMARK FOR timestamp AS timestamp - INTERVAL '5 seconds'
995+
/// ```
996+
/// or, without the optional `AS` expression:
997+
/// ```sql
998+
/// WATERMARK FOR timestamp
999+
/// ```
1000+
Watermark {
1001+
/// Column name to be used for the watermark
1002+
column_name: Ident,
1003+
/// Optional watermark expression
1004+
watermark_expr: Option<Expr>,
1005+
},
9911006
}
9921007

9931008
impl fmt::Display for TableConstraint {
@@ -1115,6 +1130,16 @@ impl fmt::Display for TableConstraint {
11151130

11161131
Ok(())
11171132
}
1133+
Self::Watermark {
1134+
column_name,
1135+
watermark_expr,
1136+
} => {
1137+
write!(f, "WATERMARK FOR {}", column_name)?;
1138+
if let Some(expr) = watermark_expr {
1139+
write!(f, " AS {}", expr)?;
1140+
}
1141+
Ok(())
1142+
}
11181143
}
11191144
}
11201145
}

src/ast/spans.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -695,6 +695,9 @@ impl Spanned for TableConstraint {
695695
.map(|i| i.span)
696696
.chain(columns.iter().map(|i| i.span)),
697697
),
698+
TableConstraint::Watermark { column_name, watermark_expr } => {
699+
union_spans(watermark_expr.iter().map(|e| e.span()).chain(core::iter::once(column_name.span)))
700+
}
698701
}
699702
}
700703
}

src/keywords.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -952,6 +952,7 @@ define_keywords!(
952952
VOLUME,
953953
WAREHOUSE,
954954
WAREHOUSES,
955+
WATERMARK,
955956
WEEK,
956957
WEEKS,
957958
WHEN,

src/parser/mod.rs

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7566,6 +7566,35 @@ impl<'a> Parser<'a> {
75667566
columns,
75677567
}))
75687568
}
7569+
Token::Word(w)
7570+
if w.keyword == Keyword::WATERMARK
7571+
&& dialect_of!(self is ArroyoDialect | GenericDialect) =>
7572+
{
7573+
if let Some(name) = name {
7574+
return self.expected(
7575+
"WATERMARK option without constraint name",
7576+
TokenWithSpan {
7577+
token: Token::make_keyword(&name.to_string()),
7578+
span: next_token.span,
7579+
},
7580+
);
7581+
}
7582+
7583+
self.expect_keyword(Keyword::FOR)?;
7584+
let column_name = self.parse_identifier()?;
7585+
7586+
// The AS keyword and expression are optional
7587+
let watermark_expr = if self.parse_keyword(Keyword::AS) {
7588+
Some(self.parse_expr()?)
7589+
} else {
7590+
None
7591+
};
7592+
7593+
Ok(Some(TableConstraint::Watermark {
7594+
column_name,
7595+
watermark_expr,
7596+
}))
7597+
}
75697598
_ => {
75707599
if name.is_some() {
75717600
self.expected("PRIMARY, UNIQUE, FOREIGN, or CHECK", next_token)

tests/sqlparser_arroyo.rs

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
// Licensed under the Apache License, Version 2.0 (the "License");
2+
// you may not use this file except in compliance with the License.
3+
// You may obtain a copy of the License at
4+
//
5+
// http://www.apache.org/licenses/LICENSE-2.0
6+
//
7+
// Unless required by applicable law or agreed to in writing, software
8+
// distributed under the License is distributed on an "AS IS" BASIS,
9+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
// See the License for the specific language governing permissions and
11+
// limitations under the License.
12+
13+
#![warn(clippy::all)]
14+
15+
use sqlparser::ast::{BinaryOperator, Expr, Ident, Statement, TableConstraint};
16+
use sqlparser::dialect::ArroyoDialect;
17+
use sqlparser::parser::Parser;
18+
use sqlparser::test_utils;
19+
use sqlparser::tokenizer::{Location, Span};
20+
21+
#[test]
22+
fn test_watermark_with_expr() {
23+
let sql = "CREATE TABLE orders (
24+
customer_id INT,
25+
order_id INT,
26+
date_string TEXT,
27+
timestamp TIMESTAMP GENERATED ALWAYS AS (CAST(date_string as TIMESTAMP)),
28+
WATERMARK FOR timestamp AS timestamp + 5
29+
) WITH (
30+
connector = 'kafka',
31+
format = 'json',
32+
type = 'source',
33+
bootstrap_servers = 'localhost:9092',
34+
topic = 'order_topic'
35+
)";
36+
37+
let parse = Parser::parse_sql(&ArroyoDialect {}, sql).unwrap();
38+
let Statement::CreateTable(ct) = parse.get(0).unwrap() else {
39+
panic!("not create table")
40+
};
41+
42+
assert_eq!(
43+
ct.constraints,
44+
vec![TableConstraint::Watermark {
45+
column_name: Ident::new("timestamp"),
46+
watermark_expr: Some(Expr::BinaryOp {
47+
left: Box::new(Expr::Identifier(Ident::new("timestamp"))),
48+
op: BinaryOperator::Plus,
49+
right: Box::new(Expr::Value(test_utils::number("5").with_span(Span::new(
50+
Location::new(5, 4), Location::new(5, 10)
51+
)))),
52+
}),
53+
}]
54+
);
55+
}
56+
57+
#[test]
58+
fn test_watermark_without_expr() {
59+
let sql = "CREATE TABLE users (
60+
customer_id INT,
61+
timestamp TIMESTAMP,
62+
WATERMARK FOR timestamp
63+
) WITH (
64+
connector = 'kafka',
65+
format = 'json',
66+
type = 'source',
67+
bootstrap_servers = 'localhost:9092',
68+
topic = 'order_topic'
69+
)";
70+
71+
let parse = Parser::parse_sql(&ArroyoDialect {}, sql).unwrap();
72+
let Statement::CreateTable(ct) = parse.get(0).unwrap() else {
73+
panic!("not create table")
74+
};
75+
76+
assert_eq!(
77+
ct.constraints,
78+
vec![TableConstraint::Watermark {
79+
column_name: Ident::new("timestamp"),
80+
watermark_expr: None,
81+
}]
82+
);
83+
}

0 commit comments

Comments
 (0)