Skip to content

Commit 201d9d6

Browse files
ggevayjunie-agent
andauthored
adapter: Handle AlterRetainHistory in catalog implications (#35571)
Move the read-policy side effects of ALTER RETAIN HISTORY from sequence_alter_retain_history into the catalog implications handlers for Index, Table, Source, and MaterializedView. For each Altered handler, detect when custom_logical_compaction_window changed and call the appropriate read policy update (update_compute_read_policy for indexes, update_storage_read_policies for storage objects). For MV, use else-if to avoid redundant policy updates during replacement (the replacement's collections already have the correct policies from when they were created). For ContinualTask, replace the placeholder with a no-op comment (CTs don't support retain history). Add soft_assert_or_log in sequence_alter_retain_history as a safety net, and use bail_unsupported in planning for a better error message. Simplify sequence_alter_retain_history to a plain catalog_transact_with_ddl_transaction now that side effects are handled by the implications code. Co-authored-by: Junie <[email protected]> Nightly: https://buildkite.com/materialize/nightly/builds/15786 Co-authored-by: Junie <[email protected]>
1 parent 1c02d69 commit 201d9d6

3 files changed

Lines changed: 79 additions & 58 deletions

File tree

src/adapter/src/coord/catalog_implications.rs

Lines changed: 68 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -226,7 +226,10 @@ impl Coordinator {
226226
CatalogImplication::Table(CatalogImplicationKind::Altered {
227227
prev: prev_table,
228228
new: new_table,
229-
}) => self.handle_alter_table(prev_table, new_table).await?,
229+
}) => {
230+
self.handle_alter_table(catalog_id, prev_table, new_table)
231+
.await?
232+
}
230233

231234
CatalogImplication::Table(CatalogImplicationKind::Dropped(table, full_name)) => {
232235
let global_ids = table.global_ids();
@@ -258,13 +261,23 @@ impl Coordinator {
258261
.await?
259262
}
260263
CatalogImplication::Source(CatalogImplicationKind::Altered {
261-
prev: (prev_source, _prev_connection),
262-
new: (new_source, _new_connection),
264+
prev: (prev_source, prev_connection),
265+
new: (new_source, new_connection),
263266
}) => {
267+
if prev_source.custom_logical_compaction_window
268+
!= new_source.custom_logical_compaction_window
269+
{
270+
let new_window = new_source
271+
.custom_logical_compaction_window
272+
.unwrap_or(CompactionWindow::Default);
273+
self.update_storage_read_policies(vec![(catalog_id, new_window.into())]);
274+
}
264275
tracing::debug!(
265276
?prev_source,
277+
?prev_connection,
266278
?new_source,
267-
"not handling AlterSource in here yet"
279+
?new_connection,
280+
"not handling other source alterations (e.g., connection changes, subsource additions/drops) in here yet"
268281
);
269282
}
270283
CatalogImplication::Source(CatalogImplicationKind::Dropped(
@@ -318,11 +331,18 @@ impl Coordinator {
318331
prev: prev_index,
319332
new: new_index,
320333
}) => {
321-
tracing::debug!(
322-
?prev_index,
323-
?new_index,
324-
"not handling AlterIndex in here yet"
325-
);
334+
if prev_index.custom_logical_compaction_window
335+
!= new_index.custom_logical_compaction_window
336+
{
337+
let new_window = new_index
338+
.custom_logical_compaction_window
339+
.unwrap_or(CompactionWindow::Default);
340+
self.update_compute_read_policy(
341+
new_index.cluster_id,
342+
catalog_id,
343+
new_window.into(),
344+
);
345+
}
326346
}
327347
CatalogImplication::Index(CatalogImplicationKind::Dropped(index, full_name)) => {
328348
indexes_to_drop.push((index.cluster_id, index.global_id()));
@@ -335,17 +355,24 @@ impl Coordinator {
335355
prev: prev_mv,
336356
new: new_mv,
337357
}) => {
338-
// We get here for two reasons:
339-
// * Name changes, like those caused by ALTER SCHEMA.
340-
// * Replacement application.
358+
// We get here for three reasons:
359+
// 1. Name changes, like those caused by ALTER SCHEMA.
360+
// 2. Replacement application.
361+
// 3. Compaction window changes (ALTER ... SET (RETAIN HISTORY ...)).
341362
//
342-
// In the first case, we don't have to do anything here. The second case is
343-
// tricky: Replacement application changes the `CatalogItemId` of the target to
344-
// that of the replacement and simultaneously drops the replacement. Which
345-
// means when we get here `prev_mv` is the replacement that should be dropped,
346-
// and `new_mv` is the target that already exists but under a different ID
347-
// (which will receive a `Dropped` event separately). We can sniff out this
363+
// 1. Name changes: We don't have to do anything here.
364+
//
365+
// 2. Replacement application: This is tricky: It changes the `CatalogItemId` of
366+
// the target to that of the replacement and simultaneously drops the replacement.
367+
// Which means when we get here `prev_mv` is the replacement that should be
368+
// dropped, and `new_mv` is the target that already exists but under a different
369+
// ID (which will receive a `Dropped` event separately). We can sniff out this
348370
// case by checking for version differences.
371+
//
372+
// 3. Compaction window changes: We handle this in an `else if`, because if there
373+
// is also a replacement application, then the replacement's storage collections
374+
// already have the correct read policies from when they were created, so we
375+
// don't need to update them here.
349376
if prev_mv.collections != new_mv.collections {
350377
// Sanity check: The replacement's last (and only) version must be the same
351378
// as the new target's last version.
@@ -363,6 +390,13 @@ impl Coordinator {
363390
// but we need to prevent it from dropping the old storage collection as
364391
// well, since that might still be depended on.
365392
source_gids_to_keep.extend(new_mv.global_ids());
393+
} else if prev_mv.custom_logical_compaction_window
394+
!= new_mv.custom_logical_compaction_window
395+
{
396+
let new_window = new_mv
397+
.custom_logical_compaction_window
398+
.unwrap_or(CompactionWindow::Default);
399+
self.update_storage_read_policies(vec![(catalog_id, new_window.into())]);
366400
}
367401
}
368402
CatalogImplication::MaterializedView(CatalogImplicationKind::Dropped(
@@ -394,14 +428,12 @@ impl Coordinator {
394428
tracing::debug!(?ct, "not handling AddContinualTask in here yet");
395429
}
396430
CatalogImplication::ContinualTask(CatalogImplicationKind::Altered {
397-
prev: prev_ct,
398-
new: new_ct,
431+
prev: _prev_ct,
432+
new: _new_ct,
399433
}) => {
400-
tracing::debug!(
401-
?prev_ct,
402-
?new_ct,
403-
"not handling AlterContinualTask in here yet"
404-
);
434+
// No action needed: continual task alterations (e.g.
435+
// renames, owner changes) are catalog-only and require
436+
// no controller changes.
405437
}
406438
CatalogImplication::ContinualTask(CatalogImplicationKind::Dropped(
407439
ct,
@@ -1152,16 +1184,24 @@ impl Coordinator {
11521184
#[instrument(level = "debug")]
11531185
async fn handle_alter_table(
11541186
&mut self,
1187+
catalog_id: CatalogItemId,
11551188
prev_table: Table,
11561189
new_table: Table,
11571190
) -> Result<(), AdapterError> {
11581191
let existing_gid = prev_table.global_id_writes();
11591192
let new_gid = new_table.global_id_writes();
11601193

11611194
if existing_gid == new_gid {
1162-
// It's not an ALTER TABLE as far as the controller is concerned,
1163-
// because we still have the same GlobalId. This is likely a change
1164-
// from an ALTER SWAP.
1195+
// It's not an ALTER TABLE ADD COLUMN, because we still have the
1196+
// same GlobalId. It might be a compaction window change.
1197+
if prev_table.custom_logical_compaction_window
1198+
!= new_table.custom_logical_compaction_window
1199+
{
1200+
let new_window = new_table
1201+
.custom_logical_compaction_window
1202+
.unwrap_or(CompactionWindow::Default);
1203+
self.update_storage_read_policies(vec![(catalog_id, new_window.into())]);
1204+
}
11651205
return Ok(());
11661206
}
11671207

src/adapter/src/coord/sequencer/inner.rs

Lines changed: 10 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ use mz_ore::cast::CastFrom;
3232
use mz_ore::collections::{CollectionExt, HashSet};
3333
use mz_ore::task::{self, JoinHandle, spawn};
3434
use mz_ore::tracing::OpenTelemetryContext;
35-
use mz_ore::{assert_none, instrument};
35+
use mz_ore::{assert_none, instrument, soft_assert_or_log};
3636
use mz_repr::adt::jsonb::Jsonb;
3737
use mz_repr::adt::mz_acl_item::{MzAclItem, PrivilegeMap};
3838
use mz_repr::explain::ExprHumanizer;
@@ -3092,39 +3092,20 @@ impl Coordinator {
30923092
ctx: &mut ExecuteContext,
30933093
plan: plan::AlterRetainHistoryPlan,
30943094
) -> Result<ExecuteResponse, AdapterError> {
3095+
soft_assert_or_log!(
3096+
!matches!(
3097+
self.catalog().get_entry(&plan.id).item(),
3098+
CatalogItem::ContinualTask(_)
3099+
),
3100+
"RETAIN HISTORY is not supported on continual tasks"
3101+
);
30953102
let ops = vec![catalog::Op::AlterRetainHistory {
30963103
id: plan.id,
30973104
value: plan.value,
30983105
window: plan.window,
30993106
}];
3100-
self.catalog_transact_with_side_effects(Some(ctx), ops, move |coord, _ctx| {
3101-
Box::pin(async move {
3102-
let catalog_item = coord.catalog().get_entry(&plan.id).item();
3103-
let cluster = match catalog_item {
3104-
CatalogItem::Table(_)
3105-
| CatalogItem::MaterializedView(_)
3106-
| CatalogItem::Source(_)
3107-
| CatalogItem::ContinualTask(_) => None,
3108-
CatalogItem::Index(index) => Some(index.cluster_id),
3109-
CatalogItem::Log(_)
3110-
| CatalogItem::View(_)
3111-
| CatalogItem::Sink(_)
3112-
| CatalogItem::Type(_)
3113-
| CatalogItem::Func(_)
3114-
| CatalogItem::Secret(_)
3115-
| CatalogItem::Connection(_) => unreachable!(),
3116-
};
3117-
match cluster {
3118-
Some(cluster) => {
3119-
coord.update_compute_read_policy(cluster, plan.id, plan.window.into());
3120-
}
3121-
None => {
3122-
coord.update_storage_read_policies(vec![(plan.id, plan.window.into())]);
3123-
}
3124-
}
3125-
})
3126-
})
3127-
.await?;
3107+
self.catalog_transact_with_context(None, Some(ctx), ops)
3108+
.await?;
31283109
Ok(ExecuteResponse::AlteredObject(plan.object_type))
31293110
}
31303111

src/sql/src/plan/statement/ddl.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7010,7 +7010,7 @@ fn alter_retain_history(
70107010
UnresolvedObjectName::Item(name),
70117011
) => name,
70127012
(object_type, _) => {
7013-
sql_bail!("{object_type} does not support RETAIN HISTORY")
7013+
bail_unsupported!(format!("RETAIN HISTORY on {object_type}"))
70147014
}
70157015
};
70167016
match resolve_item_or_type(scx, object_type, name.clone(), if_exists)? {

0 commit comments

Comments
 (0)