Skip to content

Commit 252f011

Browse files
committed
adapter,sql: plan and sequence replacement MV application
1 parent 8eea5b6 commit 252f011

File tree

14 files changed

+253
-110
lines changed

14 files changed

+253
-110
lines changed

src/adapter/src/catalog/transact.rs

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,10 @@ pub enum Op {
100100
typ: SqlColumnType,
101101
sql: RawDataType,
102102
},
103+
AlterMaterializedViewApplyReplacement {
104+
id: CatalogItemId,
105+
replacement_id: CatalogItemId,
106+
},
103107
CreateDatabase {
104108
name: String,
105109
owner_id: RoleId,
@@ -769,6 +773,28 @@ impl Catalog {
769773
tx.update_item(id, new_entry.into())?;
770774
storage_collections_to_register.insert(new_global_id, shard_id);
771775
}
776+
Op::AlterMaterializedViewApplyReplacement { id, replacement_id } => {
777+
let mut new_entry = state.get_entry(&id).clone();
778+
let replacement = state.get_entry(&replacement_id);
779+
780+
let CatalogItem::MaterializedView(mv) = &mut new_entry.item else {
781+
return Err(AdapterError::internal(
782+
"ALTER MATERIALIZED VIEW ... APPLY REPLACEMENT",
783+
"id must refer to a materialized view",
784+
));
785+
};
786+
let CatalogItem::MaterializedView(replacement_mv) = &replacement.item else {
787+
return Err(AdapterError::internal(
788+
"ALTER MATERIALIZED VIEW ... APPLY REPLACEMENT",
789+
"replacement_id must refer to a materialized view",
790+
));
791+
};
792+
793+
mv.apply_replacement(replacement_mv.clone());
794+
795+
tx.remove_item(replacement_id)?;
796+
tx.update_item(id, new_entry.into())?;
797+
}
772798
Op::CreateDatabase { name, owner_id } => {
773799
let database_owner_privileges = vec![rbac::owner_privilege(
774800
mz_sql::catalog::ObjectType::Database,

src/adapter/src/command.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -644,6 +644,7 @@ impl ExecuteResponse {
644644
| AlterSource
645645
| AlterSink
646646
| AlterTableAddColumn
647+
| AlterMaterializedViewApplyReplacement
647648
| AlterNetworkPolicy => &[AlteredObject],
648649
AlterDefaultPrivileges => &[AlteredDefaultPrivileges],
649650
AlterSetCluster => &[AlteredObject],

src/adapter/src/coord/appends.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1004,6 +1004,7 @@ pub(crate) fn waiting_on_startup_appends(
10041004
| Plan::AlterRole(_)
10051005
| Plan::AlterOwner(_)
10061006
| Plan::AlterTableAddColumn(_)
1007+
| Plan::AlterMaterializedViewApplyReplacement(_)
10071008
| Plan::Declare(_)
10081009
| Plan::Fetch(_)
10091010
| Plan::Close(_)

src/adapter/src/coord/catalog_serving.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,7 @@ pub fn auto_run_on_catalog_server<'a, 's, 'p>(
128128
| Plan::AlterRole(_)
129129
| Plan::AlterOwner(_)
130130
| Plan::AlterTableAddColumn(_)
131+
| Plan::AlterMaterializedViewApplyReplacement(_)
131132
| Plan::Declare(_)
132133
| Plan::Fetch(_)
133134
| Plan::Close(_)

src/adapter/src/coord/ddl.rs

Lines changed: 17 additions & 89 deletions
Original file line numberDiff line numberDiff line change
@@ -214,9 +214,7 @@ impl Coordinator {
214214
let mut webhook_sources_to_restart = BTreeSet::new();
215215
let mut table_gids_to_drop = vec![];
216216
let mut storage_sink_gids_to_drop = vec![];
217-
let mut indexes_to_drop = vec![];
218-
let mut materialized_views_to_drop = vec![];
219-
let mut continual_tasks_to_drop = vec![];
217+
let mut compute_gids_to_drop = vec![];
220218
let mut views_to_drop = vec![];
221219
let mut replication_slots_to_drop: Vec<(PostgresConnection, String)> = vec![];
222220
let mut secrets_to_drop = vec![];
@@ -275,21 +273,21 @@ impl Coordinator {
275273
storage_sink_gids_to_drop.push(sink.global_id());
276274
}
277275
CatalogItem::Index(index) => {
278-
indexes_to_drop.push((index.cluster_id, index.global_id()));
276+
compute_gids_to_drop
277+
.push((index.cluster_id, index.global_id()));
279278
}
280279
CatalogItem::MaterializedView(mv) => {
281-
materialized_views_to_drop
280+
compute_gids_to_drop
282281
.push((mv.cluster_id, mv.global_id_writes()));
282+
sources_to_drop
283+
.extend(mv.global_ids().map(|gid| (*id, gid)));
283284
}
284285
CatalogItem::View(view) => {
285286
views_to_drop.push((*id, view.clone()))
286287
}
287288
CatalogItem::ContinualTask(ct) => {
288-
continual_tasks_to_drop.push((
289-
*id,
290-
ct.cluster_id,
291-
ct.global_id(),
292-
));
289+
compute_gids_to_drop.push((ct.cluster_id, ct.global_id()));
290+
sources_to_drop.push((*id, ct.global_id()));
293291
}
294292
CatalogItem::Secret(_) => {
295293
secrets_to_drop.push(*id);
@@ -422,9 +420,7 @@ impl Coordinator {
422420
.map(|(_, gid)| *gid)
423421
.chain(table_gids_to_drop.iter().map(|(_, gid)| *gid))
424422
.chain(storage_sink_gids_to_drop.iter().copied())
425-
.chain(indexes_to_drop.iter().map(|(_, gid)| *gid))
426-
.chain(materialized_views_to_drop.iter().map(|(_, gid)| *gid))
427-
.chain(continual_tasks_to_drop.iter().map(|(_, _, gid)| *gid))
423+
.chain(compute_gids_to_drop.iter().map(|(_, gid)| *gid))
428424
.chain(views_to_drop.iter().map(|(_id, view)| view.global_id()))
429425
.collect();
430426

@@ -494,30 +490,19 @@ impl Coordinator {
494490
}
495491
}
496492

497-
let storage_ids_to_drop = sources_to_drop
493+
let storage_gids_to_drop = sources_to_drop
498494
.iter()
499495
.map(|(_, gid)| *gid)
500496
.chain(storage_sink_gids_to_drop.iter().copied())
501-
.chain(table_gids_to_drop.iter().map(|(_, gid)| *gid))
502-
.chain(materialized_views_to_drop.iter().map(|(_, gid)| *gid))
503-
.chain(continual_tasks_to_drop.iter().map(|(_, _, gid)| *gid));
504-
let compute_ids_to_drop = indexes_to_drop
505-
.iter()
506-
.copied()
507-
.chain(materialized_views_to_drop.iter().copied())
508-
.chain(
509-
continual_tasks_to_drop
510-
.iter()
511-
.map(|(_, cluster_id, gid)| (*cluster_id, *gid)),
512-
);
497+
.chain(table_gids_to_drop.iter().map(|(_, gid)| *gid));
513498

514499
// Check if any Timelines would become empty, if we dropped the specified storage or
515500
// compute resources.
516501
//
517502
// Note: only after a Transaction succeeds do we actually drop the timeline
518503
let collection_id_bundle = self.build_collection_id_bundle(
519-
storage_ids_to_drop,
520-
compute_ids_to_drop,
504+
storage_gids_to_drop,
505+
compute_gids_to_drop.clone(),
521506
clusters_to_drop.clone(),
522507
);
523508
let timeline_associations: BTreeMap<_, _> = self
@@ -655,14 +640,8 @@ impl Coordinator {
655640
self.cancel_pending_copy(&conn_id);
656641
}
657642
}
658-
if !indexes_to_drop.is_empty() {
659-
self.drop_indexes(indexes_to_drop);
660-
}
661-
if !materialized_views_to_drop.is_empty() {
662-
self.drop_materialized_views(materialized_views_to_drop);
663-
}
664-
if !continual_tasks_to_drop.is_empty() {
665-
self.drop_continual_tasks(continual_tasks_to_drop);
643+
if !compute_gids_to_drop.is_empty() {
644+
self.drop_compute_collections(compute_gids_to_drop);
666645
}
667646
if !vpc_endpoints_to_drop.is_empty() {
668647
self.drop_vpc_endpoints_in_background(vpc_endpoints_to_drop)
@@ -1030,7 +1009,7 @@ impl Coordinator {
10301009
.unwrap_or_terminate("cannot fail to drop sinks");
10311010
}
10321011

1033-
pub(crate) fn drop_indexes(&mut self, indexes: Vec<(ClusterId, GlobalId)>) {
1012+
pub(crate) fn drop_compute_collections(&mut self, indexes: Vec<(ClusterId, GlobalId)>) {
10341013
let mut by_cluster: BTreeMap<_, Vec<_>> = BTreeMap::new();
10351014
for (cluster_id, gid) in indexes {
10361015
by_cluster.entry(cluster_id).or_default().push(gid);
@@ -1046,58 +1025,6 @@ impl Coordinator {
10461025
}
10471026
}
10481027

1049-
/// A convenience method for dropping materialized views.
1050-
fn drop_materialized_views(&mut self, mviews: Vec<(ClusterId, GlobalId)>) {
1051-
let mut by_cluster: BTreeMap<_, Vec<_>> = BTreeMap::new();
1052-
let mut mv_gids = Vec::new();
1053-
for (cluster_id, gid) in mviews {
1054-
by_cluster.entry(cluster_id).or_default().push(gid);
1055-
mv_gids.push(gid);
1056-
}
1057-
1058-
// Drop compute sinks.
1059-
for (cluster_id, ids) in by_cluster {
1060-
let compute = &mut self.controller.compute;
1061-
// A cluster could have been dropped, so verify it exists.
1062-
if compute.instance_exists(cluster_id) {
1063-
compute
1064-
.drop_collections(cluster_id, ids)
1065-
.unwrap_or_terminate("cannot fail to drop collections");
1066-
}
1067-
}
1068-
1069-
// Drop storage resources.
1070-
let storage_metadata = self.catalog.state().storage_metadata();
1071-
self.controller
1072-
.storage
1073-
.drop_sources(storage_metadata, mv_gids)
1074-
.unwrap_or_terminate("cannot fail to drop sources");
1075-
}
1076-
1077-
/// A convenience method for dropping continual tasks.
1078-
fn drop_continual_tasks(&mut self, cts: Vec<(CatalogItemId, ClusterId, GlobalId)>) {
1079-
let mut by_cluster: BTreeMap<_, Vec<_>> = BTreeMap::new();
1080-
let mut source_ids = Vec::new();
1081-
for (item_id, cluster_id, gid) in cts {
1082-
by_cluster.entry(cluster_id).or_default().push(gid);
1083-
source_ids.push((item_id, gid));
1084-
}
1085-
1086-
// Drop compute sinks.
1087-
for (cluster_id, ids) in by_cluster {
1088-
let compute = &mut self.controller.compute;
1089-
// A cluster could have been dropped, so verify it exists.
1090-
if compute.instance_exists(cluster_id) {
1091-
compute
1092-
.drop_collections(cluster_id, ids)
1093-
.unwrap_or_terminate("cannot fail to drop collections");
1094-
}
1095-
}
1096-
1097-
// Drop storage sources.
1098-
self.drop_sources(source_ids)
1099-
}
1100-
11011028
fn drop_vpc_endpoints_in_background(&self, vpc_endpoints: Vec<CatalogItemId>) {
11021029
let cloud_resource_controller = Arc::clone(self.cloud_resource_controller
11031030
.as_ref()
@@ -1553,6 +1480,7 @@ impl Coordinator {
15531480
| Op::AlterRetainHistory { .. }
15541481
| Op::AlterNetworkPolicy { .. }
15551482
| Op::AlterAddColumn { .. }
1483+
| Op::AlterMaterializedViewApplyReplacement { .. }
15561484
| Op::UpdatePrivilege { .. }
15571485
| Op::UpdateDefaultPrivilege { .. }
15581486
| Op::GrantRole { .. }

src/adapter/src/coord/peek.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -854,7 +854,7 @@ impl crate::coord::Coordinator {
854854
// If a dataflow was created, drop it once the peek command is sent.
855855
if let Some(index_id) = drop_dataflow {
856856
self.remove_compute_ids_from_timeline(vec![(compute_instance, index_id)]);
857-
self.drop_indexes(vec![(compute_instance, index_id)]);
857+
self.drop_compute_collections(vec![(compute_instance, index_id)]);
858858
}
859859

860860
let persist_client = self.persist_client.clone();

src/adapter/src/coord/sequencer.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -512,6 +512,12 @@ impl Coordinator {
512512
let result = self.sequence_alter_table(&mut ctx, plan).await;
513513
ctx.retire(result);
514514
}
515+
Plan::AlterMaterializedViewApplyReplacement(plan) => {
516+
let result = self
517+
.sequence_alter_materialized_view_apply_replacement(&mut ctx, plan)
518+
.await;
519+
ctx.retire(result);
520+
}
515521
Plan::AlterNetworkPolicy(plan) => {
516522
let res = self
517523
.sequence_alter_network_policy(ctx.session(), plan)

src/adapter/src/coord/sequencer/inner.rs

Lines changed: 55 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,10 @@ use mz_sql::names::{
6161
Aug, ObjectId, QualifiedItemName, ResolvedDatabaseSpecifier, ResolvedIds, ResolvedItemName,
6262
SchemaSpecifier, SystemObjectId,
6363
};
64-
use mz_sql::plan::{ConnectionDetails, NetworkPolicyRule, StatementContext};
64+
use mz_sql::plan::{
65+
AlterMaterializedViewApplyReplacementPlan, ConnectionDetails, NetworkPolicyRule,
66+
StatementContext,
67+
};
6568
use mz_sql::pure::{PurifiedSourceExport, generate_subsource_statements};
6669
use mz_storage_types::sinks::StorageSinkDesc;
6770
use mz_storage_types::sources::{GenericSourceConnection, IngestionDescription, SourceExport};
@@ -5204,6 +5207,57 @@ impl Coordinator {
52045207

52055208
Ok(ExecuteResponse::AlteredObject(ObjectType::Table))
52065209
}
5210+
5211+
#[instrument]
5212+
pub(super) async fn sequence_alter_materialized_view_apply_replacement(
5213+
&mut self,
5214+
ctx: &mut ExecuteContext,
5215+
plan: AlterMaterializedViewApplyReplacementPlan,
5216+
) -> Result<ExecuteResponse, AdapterError> {
5217+
const ERROR_CONTEXT: &str = "ALTER MATERIALIZED VIEW ... APPLY REPLACEMENT";
5218+
5219+
let AlterMaterializedViewApplyReplacementPlan { id, replacement_id } = plan;
5220+
5221+
// TODO(alter-mv): Wait until there is overlap between the old MV's write frontier and the
5222+
// new MV's as-of, to ensure no times are skipped.
5223+
5224+
let Some(old) = self.catalog().get_entry(&id).materialized_view() else {
5225+
return Err(AdapterError::internal(
5226+
ERROR_CONTEXT,
5227+
"id must refer to a materialized view",
5228+
));
5229+
};
5230+
let Some(new) = self
5231+
.catalog()
5232+
.get_entry(&replacement_id)
5233+
.materialized_view()
5234+
else {
5235+
return Err(AdapterError::internal(
5236+
ERROR_CONTEXT,
5237+
"replacement_id must refer to a materialized view",
5238+
));
5239+
};
5240+
5241+
let old_cluster_id = old.cluster_id;
5242+
let new_cluster_id = new.cluster_id;
5243+
5244+
let old_gid = old.global_id_writes();
5245+
let new_gid = new.global_id_writes();
5246+
5247+
let ops = vec![catalog::Op::AlterMaterializedViewApplyReplacement { id, replacement_id }];
5248+
5249+
self.catalog_transact_with_side_effects(Some(ctx), ops, move |coord, _ctx| {
5250+
Box::pin(async move {
5251+
// Cut over the MV computation, by shutting down the old dataflow and allowing the
5252+
// new dataflow to start writing.
5253+
coord.drop_compute_collections(vec![(old_cluster_id, old_gid)]);
5254+
coord.allow_writes(new_cluster_id, new_gid);
5255+
})
5256+
})
5257+
.await?;
5258+
5259+
Ok(ExecuteResponse::AlteredObject(ObjectType::MaterializedView))
5260+
}
52075261
}
52085262

52095263
#[derive(Debug)]

0 commit comments

Comments
 (0)