Skip to content

Commit 4849dad

Browse files
committed
adapter,sql: plan and sequence replacement MV application
1 parent 0d8de07 commit 4849dad

File tree

13 files changed

+240
-20
lines changed

13 files changed

+240
-20
lines changed

src/adapter/src/catalog/transact.rs

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,10 @@ pub enum Op {
100100
typ: SqlColumnType,
101101
sql: RawDataType,
102102
},
103+
AlterMaterializedViewApplyReplacement {
104+
id: CatalogItemId,
105+
replacement_id: CatalogItemId,
106+
},
103107
CreateDatabase {
104108
name: String,
105109
owner_id: RoleId,
@@ -769,6 +773,28 @@ impl Catalog {
769773
tx.update_item(id, new_entry.into())?;
770774
storage_collections_to_register.insert(new_global_id, shard_id);
771775
}
776+
Op::AlterMaterializedViewApplyReplacement { id, replacement_id } => {
777+
let mut new_entry = state.get_entry(&id).clone();
778+
let replacement = state.get_entry(&replacement_id);
779+
780+
let CatalogItem::MaterializedView(mv) = &mut new_entry.item else {
781+
return Err(AdapterError::internal(
782+
"ALTER MATERIALIZED VIEW ... APPLY REPLACEMENT",
783+
"id must refer to a materialized view",
784+
));
785+
};
786+
let CatalogItem::MaterializedView(replacement_mv) = &replacement.item else {
787+
return Err(AdapterError::internal(
788+
"ALTER MATERIALIZED VIEW ... APPLY REPLACEMENT",
789+
"replacement_id must refer to a materialized view",
790+
));
791+
};
792+
793+
mv.apply_replacement(replacement_mv.clone());
794+
795+
tx.remove_item(replacement_id)?;
796+
tx.update_item(id, new_entry.into())?;
797+
}
772798
Op::CreateDatabase { name, owner_id } => {
773799
let database_owner_privileges = vec![rbac::owner_privilege(
774800
mz_sql::catalog::ObjectType::Database,

src/adapter/src/command.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -644,6 +644,7 @@ impl ExecuteResponse {
644644
| AlterSource
645645
| AlterSink
646646
| AlterTableAddColumn
647+
| AlterMaterializedViewApplyReplacement
647648
| AlterNetworkPolicy => &[AlteredObject],
648649
AlterDefaultPrivileges => &[AlteredDefaultPrivileges],
649650
AlterSetCluster => &[AlteredObject],

src/adapter/src/coord/appends.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1004,6 +1004,7 @@ pub(crate) fn waiting_on_startup_appends(
10041004
| Plan::AlterRole(_)
10051005
| Plan::AlterOwner(_)
10061006
| Plan::AlterTableAddColumn(_)
1007+
| Plan::AlterMaterializedViewApplyReplacement(_)
10071008
| Plan::Declare(_)
10081009
| Plan::Fetch(_)
10091010
| Plan::Close(_)

src/adapter/src/coord/catalog_serving.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,7 @@ pub fn auto_run_on_catalog_server<'a, 's, 'p>(
128128
| Plan::AlterRole(_)
129129
| Plan::AlterOwner(_)
130130
| Plan::AlterTableAddColumn(_)
131+
| Plan::AlterMaterializedViewApplyReplacement(_)
131132
| Plan::Declare(_)
132133
| Plan::Fetch(_)
133134
| Plan::Close(_)

src/adapter/src/coord/ddl.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1553,6 +1553,7 @@ impl Coordinator {
15531553
| Op::AlterRetainHistory { .. }
15541554
| Op::AlterNetworkPolicy { .. }
15551555
| Op::AlterAddColumn { .. }
1556+
| Op::AlterMaterializedViewApplyReplacement { .. }
15561557
| Op::UpdatePrivilege { .. }
15571558
| Op::UpdateDefaultPrivilege { .. }
15581559
| Op::GrantRole { .. }

src/adapter/src/coord/sequencer.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -512,6 +512,12 @@ impl Coordinator {
512512
let result = self.sequence_alter_table(&mut ctx, plan).await;
513513
ctx.retire(result);
514514
}
515+
Plan::AlterMaterializedViewApplyReplacement(plan) => {
516+
let result = self
517+
.sequence_alter_materialized_view_apply_replacement(&mut ctx, plan)
518+
.await;
519+
ctx.retire(result);
520+
}
515521
Plan::AlterNetworkPolicy(plan) => {
516522
let res = self
517523
.sequence_alter_network_policy(ctx.session(), plan)

src/adapter/src/coord/sequencer/inner.rs

Lines changed: 59 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,10 @@ use mz_sql::names::{
6161
Aug, ObjectId, QualifiedItemName, ResolvedDatabaseSpecifier, ResolvedIds, ResolvedItemName,
6262
SchemaSpecifier, SystemObjectId,
6363
};
64-
use mz_sql::plan::{ConnectionDetails, NetworkPolicyRule, StatementContext};
64+
use mz_sql::plan::{
65+
AlterMaterializedViewApplyReplacementPlan, ConnectionDetails, NetworkPolicyRule,
66+
StatementContext,
67+
};
6568
use mz_sql::pure::{PurifiedSourceExport, generate_subsource_statements};
6669
use mz_storage_types::sinks::StorageSinkDesc;
6770
use mz_storage_types::sources::{GenericSourceConnection, IngestionDescription, SourceExport};
@@ -5204,6 +5207,61 @@ impl Coordinator {
52045207

52055208
Ok(ExecuteResponse::AlteredObject(ObjectType::Table))
52065209
}
5210+
5211+
#[instrument]
5212+
pub(super) async fn sequence_alter_materialized_view_apply_replacement(
5213+
&mut self,
5214+
ctx: &mut ExecuteContext,
5215+
plan: AlterMaterializedViewApplyReplacementPlan,
5216+
) -> Result<ExecuteResponse, AdapterError> {
5217+
const ERROR_CONTEXT: &str = "ALTER MATERIALIZED VIEW ... APPLY REPLACEMENT";
5218+
5219+
let AlterMaterializedViewApplyReplacementPlan { id, replacement_id } = plan;
5220+
5221+
// TODO(alter-mv): Wait until there is overlap between the old MV's write frontier and the
5222+
// new MV's as-of, to ensure no times are skipped.
5223+
5224+
let Some(old) = self.catalog().get_entry(&id).materialized_view() else {
5225+
return Err(AdapterError::internal(
5226+
ERROR_CONTEXT,
5227+
"id must refer to a materialized view",
5228+
));
5229+
};
5230+
let Some(new) = self
5231+
.catalog()
5232+
.get_entry(&replacement_id)
5233+
.materialized_view()
5234+
else {
5235+
return Err(AdapterError::internal(
5236+
ERROR_CONTEXT,
5237+
"replacement_id must refer to a materialized view",
5238+
));
5239+
};
5240+
5241+
let old_cluster_id = old.cluster_id;
5242+
let new_cluster_id = new.cluster_id;
5243+
5244+
let old_gid = old.global_id_writes();
5245+
let new_gid = new.global_id_writes();
5246+
5247+
let ops = vec![catalog::Op::AlterMaterializedViewApplyReplacement { id, replacement_id }];
5248+
5249+
self.catalog_transact_with_side_effects(Some(ctx), ops, move |coord, _ctx| {
5250+
Box::pin(async move {
5251+
// Cut over the MV computation, by shutting down the old dataflow and allowing the
5252+
// new dataflow to start writing.
5253+
coord
5254+
.controller
5255+
.compute
5256+
.drop_collections(old_cluster_id, vec![old_gid])
5257+
.unwrap_or_terminate("cannot fail to drop collections");
5258+
coord.allow_writes(new_cluster_id, new_gid);
5259+
})
5260+
})
5261+
.await?;
5262+
5263+
Ok(ExecuteResponse::AlteredObject(ObjectType::MaterializedView))
5264+
}
52075265
}
52085266

52095267
#[derive(Debug)]

src/catalog/src/memory/objects.rs

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -752,6 +752,10 @@ impl mz_sql::catalog::CatalogItem for CatalogCollectionEntry {
752752
self.entry.writable_table_details()
753753
}
754754

755+
fn replacement_target(&self) -> Option<CatalogItemId> {
756+
self.entry.replacement_target()
757+
}
758+
755759
fn type_details(&self) -> Option<&CatalogTypeDetails<IdReference>> {
756760
self.entry.type_details()
757761
}
@@ -1446,6 +1450,67 @@ impl MaterializedView {
14461450
self.desc
14471451
.at_version(RelationVersionSelector::Specific(*version))
14481452
}
1453+
1454+
/// Apply the given replacement materialized view to this [`MaterializedView`].
1455+
pub fn apply_replacement(&mut self, replacement: Self) {
1456+
let target_id = replacement
1457+
.replacement_target
1458+
.expect("replacement has target");
1459+
1460+
fn parse(create_sql: &str) -> mz_sql::ast::CreateMaterializedViewStatement<Raw> {
1461+
let res = mz_sql::parse::parse(create_sql).unwrap_or_else(|e| {
1462+
panic!("invalid create_sql persisted in catalog: {e}\n{create_sql}");
1463+
});
1464+
if let Statement::CreateMaterializedView(cmvs) = res.into_element().ast {
1465+
cmvs
1466+
} else {
1467+
panic!("invalid MV create_sql persisted in catalog\n{create_sql}");
1468+
}
1469+
}
1470+
1471+
let old_stmt = parse(&self.create_sql);
1472+
let rpl_stmt = parse(&replacement.create_sql);
1473+
let new_stmt = mz_sql::ast::CreateMaterializedViewStatement {
1474+
if_exists: old_stmt.if_exists,
1475+
name: old_stmt.name,
1476+
columns: rpl_stmt.columns,
1477+
replacing: None,
1478+
in_cluster: rpl_stmt.in_cluster,
1479+
query: rpl_stmt.query,
1480+
as_of: rpl_stmt.as_of,
1481+
with_options: rpl_stmt.with_options,
1482+
};
1483+
let create_sql = new_stmt.to_ast_string_stable();
1484+
1485+
let mut collections = std::mem::take(&mut self.collections);
1486+
// Note: We can't use `self.desc.latest_version` here because a replacement doesn't
1487+
// necessary evolve the relation schema, so that version might be lower than the actual
1488+
// latest version.
1489+
let latest_version = collections.keys().max().expect("at least one version");
1490+
let new_version = latest_version.bump();
1491+
collections.insert(new_version, replacement.global_id_writes());
1492+
1493+
let mut resolved_ids = replacement.resolved_ids;
1494+
resolved_ids.remove_item(&target_id);
1495+
let mut dependencies = replacement.dependencies;
1496+
dependencies.0.remove(&target_id);
1497+
1498+
*self = Self {
1499+
create_sql,
1500+
collections,
1501+
raw_expr: replacement.raw_expr,
1502+
optimized_expr: replacement.optimized_expr,
1503+
desc: replacement.desc,
1504+
resolved_ids,
1505+
dependencies,
1506+
replacement_target: None,
1507+
cluster_id: replacement.cluster_id,
1508+
non_null_assertions: replacement.non_null_assertions,
1509+
custom_logical_compaction_window: replacement.custom_logical_compaction_window,
1510+
refresh_schedule: replacement.refresh_schedule,
1511+
initial_as_of: replacement.initial_as_of,
1512+
};
1513+
}
14491514
}
14501515

14511516
#[derive(Debug, Clone, Serialize)]
@@ -3369,6 +3434,14 @@ impl mz_sql::catalog::CatalogItem for CatalogEntry {
33693434
}
33703435
}
33713436

3437+
fn replacement_target(&self) -> Option<CatalogItemId> {
3438+
if let CatalogItem::MaterializedView(mv) = self.item() {
3439+
mv.replacement_target
3440+
} else {
3441+
None
3442+
}
3443+
}
3444+
33723445
fn type_details(&self) -> Option<&CatalogTypeDetails<IdReference>> {
33733446
if let CatalogItem::Type(Type { details, .. }) = self.item() {
33743447
Some(details)

src/repr/src/relation.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1355,8 +1355,6 @@ impl VersionedRelationDesc {
13551355
/// Returns this [`RelationDesc`] at the specified version.
13561356
pub fn at_version(&self, version: RelationVersionSelector) -> RelationDesc {
13571357
// Get all of the changes from the start, up to whatever version was requested.
1358-
//
1359-
// TODO(parkmycar): We should probably panic on unknown verisons?
13601358
let up_to_version = match version {
13611359
RelationVersionSelector::Latest => RelationVersion(u64::MAX),
13621360
RelationVersionSelector::Specific(v) => v,

src/sql/src/catalog.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -836,6 +836,9 @@ pub trait CatalogItem {
836836
/// catalog item is a table that accepts writes.
837837
fn writable_table_details(&self) -> Option<&[Expr<Aug>]>;
838838

839+
/// The item this catalog item replaces, if any.
840+
fn replacement_target(&self) -> Option<CatalogItemId>;
841+
839842
/// Returns the type information associated with the catalog item, if the
840843
/// catalog item is a type.
841844
fn type_details(&self) -> Option<&CatalogTypeDetails<IdReference>>;

0 commit comments

Comments
 (0)