Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 23 additions & 25 deletions src/adapter/src/catalog/apply.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1014,33 +1014,31 @@ impl CatalogState {
item: name.clone(),
};
let entry = match retractions.items.remove(&key) {
Some(mut retraction) => {
Some(retraction) => {
assert_eq!(retraction.id, item.id);
// We only reparse the SQL if it's changed. Otherwise, we use the existing
// item. This is a performance optimization and not needed for correctness.
// This makes it difficult to use the `UpdateFrom` trait, but the structure
// is still the same as the trait.
if retraction.create_sql() != create_sql {
let item = self
.deserialize_item(
global_id,
&create_sql,
&extra_versions,
local_expression_cache,
Some(retraction.item),
)
.unwrap_or_else(|e| {
panic!("{e:?}: invalid persisted SQL: {create_sql}")
});
retraction.item = item;
}
retraction.id = id;
retraction.oid = oid;
retraction.name = name;
retraction.owner_id = owner_id;
retraction.privileges = PrivilegeMap::from_mz_acl_items(privileges);

retraction
let item = self
.deserialize_item(
global_id,
&create_sql,
&extra_versions,
local_expression_cache,
Some(retraction.item),
)
.unwrap_or_else(|e| {
panic!("{e:?}: invalid persisted SQL: {create_sql}")
});

CatalogEntry {
item,
id,
oid,
name,
owner_id,
privileges: PrivilegeMap::from_mz_acl_items(privileges),
referenced_by: retraction.referenced_by,
used_by: retraction.used_by,
}
}
None => {
let catalog_item = self
Expand Down
1 change: 1 addition & 0 deletions src/adapter/src/catalog/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1375,6 +1375,7 @@ impl CatalogState {
desc,
resolved_ids,
dependencies,
replacement_target: materialized_view.replacement_target,
cluster_id: materialized_view.cluster_id,
non_null_assertions: materialized_view.non_null_assertions,
custom_logical_compaction_window: materialized_view.compaction_window,
Expand Down
36 changes: 35 additions & 1 deletion src/adapter/src/catalog/transact.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,10 @@ pub enum Op {
typ: SqlColumnType,
sql: RawDataType,
},
AlterMaterializedViewApplyReplacement {
id: CatalogItemId,
replacement_id: CatalogItemId,
},
CreateDatabase {
name: String,
owner_id: RoleId,
Expand Down Expand Up @@ -769,6 +773,28 @@ impl Catalog {
tx.update_item(id, new_entry.into())?;
storage_collections_to_register.insert(new_global_id, shard_id);
}
Op::AlterMaterializedViewApplyReplacement { id, replacement_id } => {
let mut new_entry = state.get_entry(&id).clone();
let replacement = state.get_entry(&replacement_id);

let CatalogItem::MaterializedView(mv) = &mut new_entry.item else {
return Err(AdapterError::internal(
"ALTER MATERIALIZED VIEW ... APPLY REPLACEMENT",
"id must refer to a materialized view",
));
};
let CatalogItem::MaterializedView(replacement_mv) = &replacement.item else {
return Err(AdapterError::internal(
"ALTER MATERIALIZED VIEW ... APPLY REPLACEMENT",
"replacement_id must refer to a materialized view",
));
};

mv.apply_replacement(replacement_mv.clone());

tx.remove_item(replacement_id)?;
tx.update_item(id, new_entry.into())?;
}
Op::CreateDatabase { name, owner_id } => {
let database_owner_privileges = vec![rbac::owner_privilege(
mz_sql::catalog::ObjectType::Database,
Expand Down Expand Up @@ -1080,7 +1106,15 @@ impl Catalog {
storage_collections_to_create.insert(source.global_id());
}
CatalogItem::MaterializedView(mv) => {
storage_collections_to_create.insert(mv.global_id_writes());
let mv_gid = mv.global_id_writes();
if let Some(target_id) = mv.replacement_target {
let target_gid = state.get_entry(&target_id).latest_global_id();
let shard_id =
state.storage_metadata().get_collection_shard(target_gid)?;
storage_collections_to_register.insert(mv_gid, shard_id);
} else {
storage_collections_to_create.insert(mv_gid);
}
}
CatalogItem::ContinualTask(ct) => {
storage_collections_to_create.insert(ct.global_id());
Expand Down
1 change: 1 addition & 0 deletions src/adapter/src/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -644,6 +644,7 @@ impl ExecuteResponse {
| AlterSource
| AlterSink
| AlterTableAddColumn
| AlterMaterializedViewApplyReplacement
| AlterNetworkPolicy => &[AlteredObject],
AlterDefaultPrivileges => &[AlteredDefaultPrivileges],
AlterSetCluster => &[AlteredObject],
Expand Down
1 change: 1 addition & 0 deletions src/adapter/src/continual_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ pub fn ct_item_from_plan(
expr: mut raw_expr,
dependencies,
column_names: _,
replacement_target: _,
non_null_assertions: _,
compaction_window: _,
refresh_schedule: _,
Expand Down
25 changes: 13 additions & 12 deletions src/adapter/src/coord.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2101,7 +2101,12 @@ impl Coordinator {
}

self.ship_dataflow(df_desc, mview.cluster_id, None).await;
self.allow_writes(mview.cluster_id, mview.global_id_writes());

// If this is a replacement MV, it must remain read-only until the replacement
// gets applied.
if mview.replacement_target.is_none() {
self.allow_writes(mview.cluster_id, mview.global_id_writes());
}
}
CatalogItem::Sink(sink) => {
policies_to_set
Expand Down Expand Up @@ -2736,6 +2741,7 @@ impl Coordinator {
since: None,
status_collection_id,
timeline: Some(timeline.clone()),
primary: None,
}
};

Expand Down Expand Up @@ -2766,10 +2772,9 @@ impl Coordinator {
let next_version = version.bump();
let primary_collection =
versions.get(&next_version).map(|(gid, _desc)| gid).copied();
let collection_desc = CollectionDescription::for_table(
desc.clone(),
primary_collection,
);
let mut collection_desc =
CollectionDescription::for_table(desc.clone());
collection_desc.primary = primary_collection;

(*gid, collection_desc)
});
Expand Down Expand Up @@ -2808,13 +2813,8 @@ impl Coordinator {
compute_collections.push((mv.global_id_writes(), mv.desc.latest()));
}
CatalogItem::ContinualTask(ct) => {
let collection_desc = CollectionDescription {
desc: ct.desc.clone(),
data_source: DataSource::Other,
since: ct.initial_as_of.clone(),
status_collection_id: None,
timeline: None,
};
let collection_desc =
CollectionDescription::for_other(ct.desc.clone(), ct.initial_as_of.clone());
if ct.global_id().is_system() && collection_desc.since.is_none() {
// We need a non-0 since to make as_of selection work. Fill it in below with
// the `bootstrap_builtin_continual_tasks` call, which can only be run after
Expand Down Expand Up @@ -2859,6 +2859,7 @@ impl Coordinator {
since: None,
status_collection_id: None,
timeline: None,
primary: None,
};
collections.push((sink.global_id, collection_desc));
}
Expand Down
1 change: 1 addition & 0 deletions src/adapter/src/coord/appends.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1004,6 +1004,7 @@ pub(crate) fn waiting_on_startup_appends(
| Plan::AlterRole(_)
| Plan::AlterOwner(_)
| Plan::AlterTableAddColumn(_)
| Plan::AlterMaterializedViewApplyReplacement(_)
| Plan::Declare(_)
| Plan::Fetch(_)
| Plan::Close(_)
Expand Down
1 change: 1 addition & 0 deletions src/adapter/src/coord/catalog_serving.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ pub fn auto_run_on_catalog_server<'a, 's, 'p>(
| Plan::AlterRole(_)
| Plan::AlterOwner(_)
| Plan::AlterTableAddColumn(_)
| Plan::AlterMaterializedViewApplyReplacement(_)
| Plan::Declare(_)
| Plan::Fetch(_)
| Plan::Close(_)
Expand Down
2 changes: 2 additions & 0 deletions src/adapter/src/coord/command_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -929,6 +929,7 @@ impl Coordinator {
| Statement::AlterConnection(_)
| Statement::AlterDefaultPrivileges(_)
| Statement::AlterIndex(_)
| Statement::AlterMaterializedViewApplyReplacement(_)
| Statement::AlterSetCluster(_)
| Statement::AlterOwner(_)
| Statement::AlterRetainHistory(_)
Expand Down Expand Up @@ -1170,6 +1171,7 @@ impl Coordinator {
if_exists: cmvs.if_exists,
name: cmvs.name,
columns: cmvs.columns,
replacing: cmvs.replacing,
in_cluster: cmvs.in_cluster,
query: cmvs.query,
with_options: cmvs.with_options,
Expand Down
Loading