Merged
25 changes: 23 additions & 2 deletions doc/user/content/serve-results/sink/iceberg.md
@@ -200,12 +200,17 @@ In Materialize, you can sink from a materialized view, table, or source. Use
- `<my_iceberg_table>` with the name of your Iceberg table. If the Iceberg table
does not exist, Materialize creates the table. For details, see [`CREATE SINK`
reference page](/sql/create-sink/iceberg/#iceberg-table-creation).
- `<key>` with the column(s) that uniquely identify rows.
- `<commit_interval>` with your commit interval (e.g., `60s`). The commit
interval specifies how frequently Materialize commits snapshots to Iceberg.
The minimum commit interval is `1s`. See [Commit interval
tradeoffs](#commit-interval-tradeoffs) below.

For the full list of syntax options, see the [`CREATE SINK` reference](/sql/create-sink/iceberg).

### Upsert mode

In upsert mode, replace `<key>` with the column(s) that uniquely identify rows.

```mzsql
CREATE SINK <sink_name>
IN CLUSTER <sink_cluster>
@@ -220,7 +225,23 @@ CREATE SINK <sink_name>
WITH (COMMIT INTERVAL = '<commit_interval>');
```

For the full list of syntax options, see the [`CREATE SINK` reference](/sql/create-sink/iceberg).
### Append mode

In append mode, no `KEY` clause is used. The Iceberg table includes all source
columns plus `_mz_diff` (`int`) and `_mz_timestamp` (`long`).

```mzsql
CREATE SINK <sink_name>
IN CLUSTER <sink_cluster>
FROM <my_materialize_object>
INTO ICEBERG CATALOG CONNECTION iceberg_catalog_connection (
NAMESPACE = '<my_s3_table_bucket_namespace>',
TABLE = '<my_iceberg_table>'
)
USING AWS CONNECTION aws_connection
MODE APPEND
WITH (COMMIT INTERVAL = '<commit_interval>');
```
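Because every change lands as a signed row, downstream consumers can read the table incrementally. As a sketch only — this is a hypothetical query for a downstream engine such as Trino or Spark SQL, not mzsql, and `my_iceberg_table` and `last_seen_ts` are placeholders for your table and bookmark:

```sql
-- Hypothetical downstream query (e.g., Trino or Spark SQL), not mzsql.
-- Fetches all changes committed after a bookmark timestamp; advance the
-- bookmark to the max _mz_timestamp you have processed.
SELECT *
FROM my_iceberg_table
WHERE _mz_timestamp > last_seen_ts
ORDER BY _mz_timestamp;
```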

## Considerations

63 changes: 55 additions & 8 deletions doc/user/content/sql/create-sink/iceberg.md
@@ -24,7 +24,21 @@ To create an Iceberg sink, you need:

## Syntax

{{% include-syntax file="examples/create_sink_iceberg" example="syntax" %}}
{{< tabs level=3 >}}

{{< tab "MODE UPSERT" >}}

{{% include-syntax file="examples/create_sink_iceberg" example="syntax-upsert" %}}

{{< /tab >}}

{{< tab "MODE APPEND" >}}

{{% include-syntax file="examples/create_sink_iceberg" example="syntax-append" %}}

{{< /tab >}}

{{< /tabs >}}

## Details

@@ -36,8 +50,9 @@ At each `COMMIT INTERVAL`:

1. All pending writes are flushed to Parquet data files. See [Type
mapping](#type-mapping).
2. Delete files are written for any updates or deletes. See [Delete
handling](#delete-handling).
2. In **upsert** mode, delete files are written for any updates or deletes. See
[Delete handling](#delete-handling). In **append** mode, no delete files are
written; all changes are data rows. See [Append mode](#append-mode).
3. A new Iceberg snapshot is committed atomically.

When the snapshot is committed, the data is available to downstream query
@@ -86,14 +101,32 @@ queries.

### Unique keys

The Iceberg sink uses upsert semantics based on the `KEY`. The columns you
In upsert mode, the Iceberg sink uses upsert semantics based on the `KEY`. The columns you
specify as the `KEY` must uniquely identify rows. Materialize validates that the
key is unique; if it cannot prove uniqueness, you'll receive an error.

If you have outside knowledge that the key is unique, you can bypass validation
using `NOT ENFORCED`. However, if the key is not actually unique, downstream
consumers may see incorrect results.
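As a sketch of bypassing validation — the exact position of the `KEY` clause is shown in the upsert syntax tab above, and all object names here are placeholders:

```mzsql
-- Sketch only: assumes the KEY placement shown in the MODE UPSERT syntax
-- tab; every name below is a placeholder.
CREATE SINK my_sink
  IN CLUSTER my_cluster
  FROM my_view
  INTO ICEBERG CATALOG CONNECTION iceberg_catalog_connection (
    NAMESPACE = 'my_namespace',
    TABLE = 'my_table'
  )
  KEY (user_id) NOT ENFORCED
  USING AWS CONNECTION aws_connection
  MODE UPSERT
  WITH (COMMIT INTERVAL = '60s');
```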

### Append mode

In append mode (`MODE APPEND`), every change in the Materialize update stream
is written as a data row. No Iceberg delete files are produced. Two extra
columns are appended to the Iceberg table:

| Column | Iceberg type | Description |
|--------|-------------|-------------|
| `_mz_diff` | `int` | `+1` for insertions, `-1` for deletions. |
| `_mz_timestamp` | `long` | The Materialize logical timestamp of the change. |

- An **insert** produces one row with `_mz_diff = +1`.
- A **delete** produces one row with `_mz_diff = -1`.
- An **update** produces two rows: one with `_mz_diff = -1` (the old value) and
one with `_mz_diff = +1` (the new value). Both carry the same `_mz_timestamp`.

No `KEY` clause is permitted with `MODE APPEND`.
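Since each change is a signed row, a downstream engine can reconstruct the table's current contents by summing diffs per row. A sketch, assuming a downstream query engine such as Trino or Spark SQL (not mzsql) and placeholder column and table names:

```sql
-- Hypothetical downstream query, not mzsql: a row's diffs sum to the number
-- of copies currently present, so keep groups with a positive sum.
SELECT user_id, event_type, SUM(_mz_diff) AS copies
FROM user_events_log
GROUP BY user_id, event_type
HAVING SUM(_mz_diff) > 0;
```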

### Type mapping

{{% include-headless
@@ -105,6 +138,11 @@ consumers may see incorrect results.

### Delete handling

{{< note >}}
Delete handling applies to `MODE UPSERT` only. In `MODE APPEND`, all changes
are written as data rows. See [Append mode](#append-mode).
{{< /note >}}

Iceberg sinks use a hybrid delete strategy:

- **Position deletes**: Used when a row is inserted and then deleted or updated
@@ -138,14 +176,14 @@ connection.
{{% include-example file="examples/create_connection"
example="example-iceberg-catalog-connection" %}}

### Creating a sink
### Creating an upsert sink

{{% include-example file="examples/create_sink_iceberg"
example="example-create-iceberg-sink" %}}

The required `KEY` clause uniquely identifies rows; in this example, it uses a
composite key of `user_id` and `event_timestamp`. Materialize validates that
this key is unique in the source data.
In upsert mode, the required `KEY` clause uniquely identifies rows; in this
example, it uses a composite key of `user_id` and `event_timestamp`.
Materialize validates that this key is unique in the source data.

### Bypassing unique key validation

@@ -171,6 +209,15 @@ If the key is not actually unique, downstream
results.
{{< /warning >}}

### Creating an append sink

{{% include-example file="examples/create_sink_iceberg"
example="example-create-iceberg-sink-append" %}}

The Iceberg table will contain all columns from `user_events` plus two
additional columns: `_mz_diff` and `_mz_timestamp`. See [Append
mode](#append-mode).

## Related pages

- [Iceberg sink guide](/serve-results/sink/iceberg/)
72 changes: 71 additions & 1 deletion doc/user/data/examples/create_sink_iceberg.yml
@@ -1,4 +1,4 @@
- name: "syntax"
- name: "syntax-upsert"
code: |
CREATE SINK [IF NOT EXISTS] <sink_name>
[IN CLUSTER <cluster_name>]
@@ -53,6 +53,59 @@
description: |
How frequently to commit snapshots to Iceberg (e.g., `'60s'`, `'5m'`). See [Commit interval tradeoffs](#commit-interval-tradeoffs).

- name: "syntax-append"
code: |
CREATE SINK [IF NOT EXISTS] <sink_name>
[IN CLUSTER <cluster_name>]
FROM <item_name>
INTO ICEBERG CATALOG CONNECTION <catalog_connection> (
NAMESPACE = '<namespace>',
TABLE = '<table>'
)
USING AWS CONNECTION <aws_connection>
MODE APPEND
WITH (COMMIT INTERVAL = '<interval>')
syntax_elements:
- name: "`<sink_name>`"
description: |
The name for the sink.
- name: "**IF NOT EXISTS**"
description: |
Optional. If specified, do not throw an error if a sink with the same name already exists.
- name: "**IN CLUSTER** `<cluster_name>`"
description: |
Optional. The [cluster](/sql/create-cluster) to maintain this sink. If
unspecified, defaults to the active cluster.
- name: "`<item_name>`"
description: |
The name of the source, table, or materialized view to sink.
- name: "**ICEBERG CATALOG CONNECTION** `<catalog_connection>`"
description: |
The name of the [Iceberg catalog connection](/sql/create-connection/#iceberg-catalog) to use.
- name: "**NAMESPACE** `'<namespace>'`"
description: |
The Iceberg namespace (database) containing the table.
- name: "**TABLE** `'<table>'`"
description: |
The name of the unpartitioned Iceberg table to write to. If the table
doesn't exist, Materialize creates it automatically. For details, see
[Iceberg table
creation](/sql/create-sink/iceberg/#iceberg-table-creation).
- name: "**USING AWS CONNECTION** `<aws_connection>`"
description: |
The [AWS connection](/sql/create-connection/#aws) for object storage access.
- name: "**MODE APPEND**"
description: |
Writes all changes as data rows instead of using Iceberg delete files.
Two extra columns are appended to the Iceberg table: `_mz_diff` (`int`,
`+1` for inserts, `-1` for deletes) and `_mz_timestamp` (`long`). An
update produces two rows: one with `_mz_diff = -1` (old values) and one
with `_mz_diff = +1` (new values). No `KEY` clause is permitted. See
[Append mode](#append-mode).
- name: "**COMMIT INTERVAL** `'<interval>'`"
description: |
How frequently to commit snapshots to Iceberg (e.g., `'60s'`, `'5m'`). See [Commit interval tradeoffs](#commit-interval-tradeoffs).

- name: "example-create-iceberg-sink"
description: |
Using the previously created AWS and Iceberg catalog connection, the
@@ -71,6 +124,23 @@
WITH (COMMIT INTERVAL = '1m');
testable: false

- name: "example-create-iceberg-sink-append"
description: |
Create an Iceberg sink in append mode. All changes are written as data
rows with `_mz_diff` and `_mz_timestamp` columns:
code: |
CREATE SINK events_log_iceberg
IN CLUSTER analytics_cluster
FROM user_events
INTO ICEBERG CATALOG CONNECTION iceberg_catalog_connection (
NAMESPACE = 'events',
TABLE = 'user_events_log'
)
USING AWS CONNECTION aws_connection
MODE APPEND
WITH (COMMIT INTERVAL = '1m');
testable: false

- name: "restrictions-limitations-regions"
content: |
Your S3 Tables bucket must be in the same AWS region as your Materialize
1 change: 1 addition & 0 deletions src/catalog/src/memory/objects.rs
@@ -1322,6 +1322,7 @@ impl Sink {
match &self.envelope {
SinkEnvelope::Debezium => Some("debezium"),
SinkEnvelope::Upsert => Some("upsert"),
SinkEnvelope::Append => Some("append"),
}
}

1 change: 1 addition & 0 deletions src/sql-lexer/src/keywords.txt
@@ -41,6 +41,7 @@ Analysis
Analyze
And
Any
Append
Apply
Arity
Arn
4 changes: 4 additions & 0 deletions src/sql-parser/src/ast/defs/ddl.rs
@@ -708,6 +708,7 @@ impl_display!(SinkEnvelope);
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum IcebergSinkMode {
Upsert,
Append,
}

impl AstDisplay for IcebergSinkMode {
@@ -716,6 +717,9 @@ impl AstDisplay for IcebergSinkMode {
Self::Upsert => {
f.write_str("UPSERT");
}
Self::Append => {
f.write_str("APPEND");
}
}
}
}
4 changes: 3 additions & 1 deletion src/sql-parser/src/parser.rs
@@ -2410,8 +2410,10 @@ impl<'a> Parser<'a> {
fn parse_iceberg_sink_mode(&mut self) -> Result<IcebergSinkMode, ParserError> {
if self.parse_keyword(UPSERT) {
Ok(IcebergSinkMode::Upsert)
} else if self.parse_keyword(APPEND) {
Ok(IcebergSinkMode::Append)
} else {
self.expected(self.peek_pos(), "UPSERT", self.peek_token())
self.expected(self.peek_pos(), "UPSERT, APPEND", self.peek_token())
}
}

7 changes: 7 additions & 0 deletions src/sql-parser/tests/testdata/ddl
@@ -935,6 +935,13 @@ error: Expected one of NAMESPACE or TABLE, found identifier "blah"
CREATE SINK bar FROM foo INTO ICEBERG CATALOG CONNECTION s3tables (BLAH = 'boo!') USING AWS CONNECTION aws_conn MODE UPSERT;
^

parse-statement
CREATE SINK bar FROM foo INTO ICEBERG CATALOG CONNECTION s3tables (NAMESPACE = 'testnamespace', TABLE = 'daily_sales') USING AWS CONNECTION aws_conn MODE APPEND;
----
CREATE SINK bar FROM foo INTO ICEBERG CATALOG CONNECTION s3tables (NAMESPACE = 'testnamespace', TABLE = 'daily_sales') USING AWS CONNECTION aws_conn MODE APPEND
=>
CreateSink(CreateSinkStatement { name: Some(UnresolvedItemName([Ident("bar")])), in_cluster: None, if_not_exists: false, from: Name(UnresolvedItemName([Ident("foo")])), connection: Iceberg { connection: Name(UnresolvedItemName([Ident("s3tables")])), aws_connection: Name(UnresolvedItemName([Ident("aws_conn")])), key: None, options: [IcebergSinkConfigOption { name: Namespace, value: Some(Value(String("testnamespace"))) }, IcebergSinkConfigOption { name: Table, value: Some(Value(String("daily_sales"))) }] }, format: None, envelope: None, mode: Some(Append), with_options: [] })

parse-statement
CREATE INDEX foo ON myschema.bar (a, b)
----
31 changes: 29 additions & 2 deletions src/sql/src/plan/statement/ddl.rs
@@ -3500,6 +3500,9 @@ fn plan_sink(
(CreateSinkConnection::Iceberg { .. }, None, Some(ast::IcebergSinkMode::Upsert)) => {
SinkEnvelope::Upsert
}
(CreateSinkConnection::Iceberg { .. }, None, Some(ast::IcebergSinkMode::Append)) => {
SinkEnvelope::Append
}
(CreateSinkConnection::Iceberg { .. }, None, None) => {
sql_bail!("MODE clause is required")
}
@@ -3604,6 +3607,30 @@
| CreateSinkConnection::Iceberg { key: None, .. } => None,
};

if key_indices.is_some() && envelope == SinkEnvelope::Append {
sql_bail!("KEY is not supported for MODE APPEND Iceberg sinks");
}

// Reject input columns that clash with the columns MODE APPEND adds to the Iceberg table.
if envelope == SinkEnvelope::Append {
if let CreateSinkConnection::Iceberg { .. } = &connection {
use mz_storage_types::sinks::{
ICEBERG_APPEND_DIFF_COLUMN, ICEBERG_APPEND_TIMESTAMP_COLUMN,
};
for (col_name, _) in desc.iter() {
if col_name.as_str() == ICEBERG_APPEND_DIFF_COLUMN
|| col_name.as_str() == ICEBERG_APPEND_TIMESTAMP_COLUMN
{
sql_bail!(
"column {} conflicts with the system column that MODE APPEND \
adds to the Iceberg table",
col_name.quoted()
);
}
}
}
}

let headers_index = match &connection {
CreateSinkConnection::Kafka {
headers: Some(headers),
@@ -3612,7 +3639,7 @@
scx.require_feature_flag(&ENABLE_KAFKA_SINK_HEADERS)?;

match envelope {
SinkEnvelope::Upsert => (),
SinkEnvelope::Upsert | SinkEnvelope::Append => (),
SinkEnvelope::Debezium => {
sql_bail!("HEADERS option is not supported with ENVELOPE DEBEZIUM")
}
@@ -4121,7 +4148,7 @@ fn kafka_sink_builder(
let mut scope = Scope::from_source(None, value_desc.iter_names());

match envelope {
SinkEnvelope::Upsert => (),
SinkEnvelope::Upsert | SinkEnvelope::Append => (),
SinkEnvelope::Debezium => {
let key_indices: HashSet<_> = key_desc_and_indices
.as_ref()
8 changes: 8 additions & 0 deletions src/storage-types/src/sinks.rs
@@ -124,8 +124,11 @@ impl<S: Debug + PartialEq, T: Debug + PartialEq + PartialOrder> AlterCompatible

#[derive(Copy, Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum SinkEnvelope {
/// Only used for Kafka.
Debezium,
Upsert,
/// Only used for Iceberg.
Append,
}

#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
@@ -636,6 +639,11 @@ pub struct S3UploadInfo {
pub const MIN_S3_SINK_FILE_SIZE: ByteSize = ByteSize::mb(16);
pub const MAX_S3_SINK_FILE_SIZE: ByteSize = ByteSize::gb(4);

/// Column name appended by MODE APPEND Iceberg sinks to record the diff (+1/-1).
pub const ICEBERG_APPEND_DIFF_COLUMN: &str = "_mz_diff";
/// Column name appended by MODE APPEND Iceberg sinks to record the logical timestamp.
pub const ICEBERG_APPEND_TIMESTAMP_COLUMN: &str = "_mz_timestamp";

#[derive(Arbitrary, Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct IcebergSinkConnection<C: ConnectionAccess = InlinedConnection> {
pub catalog_connection_id: CatalogItemId,