Skip to content

Commit 6fb0f85

Browse files
committed
adapter,sql: plan and sequence replacement MV creation
1 parent 057967c commit 6fb0f85

File tree

8 files changed

+69
-10
lines changed

8 files changed

+69
-10
lines changed

src/adapter/src/catalog/state.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1375,6 +1375,7 @@ impl CatalogState {
13751375
desc,
13761376
resolved_ids,
13771377
dependencies,
1378+
replacement_target: materialized_view.replacement_target,
13781379
cluster_id: materialized_view.cluster_id,
13791380
non_null_assertions: materialized_view.non_null_assertions,
13801381
custom_logical_compaction_window: materialized_view.compaction_window,

src/adapter/src/catalog/transact.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1080,7 +1080,15 @@ impl Catalog {
10801080
storage_collections_to_create.insert(source.global_id());
10811081
}
10821082
CatalogItem::MaterializedView(mv) => {
1083-
storage_collections_to_create.insert(mv.global_id_writes());
1083+
let mv_gid = mv.global_id_writes();
1084+
if let Some(target_id) = mv.replacement_target {
1085+
let target_gid = state.get_entry(&target_id).latest_global_id();
1086+
let shard_id =
1087+
state.storage_metadata().get_collection_shard(target_gid)?;
1088+
storage_collections_to_register.insert(mv_gid, shard_id);
1089+
} else {
1090+
storage_collections_to_create.insert(mv_gid);
1091+
}
10841092
}
10851093
CatalogItem::ContinualTask(ct) => {
10861094
storage_collections_to_create.insert(ct.global_id());

src/adapter/src/continual_task.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ pub fn ct_item_from_plan(
3939
expr: mut raw_expr,
4040
dependencies,
4141
column_names: _,
42+
replacement_target: _,
4243
non_null_assertions: _,
4344
compaction_window: _,
4445
refresh_schedule: _,

src/adapter/src/coord.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2101,7 +2101,12 @@ impl Coordinator {
21012101
}
21022102

21032103
self.ship_dataflow(df_desc, mview.cluster_id, None).await;
2104-
self.allow_writes(mview.cluster_id, mview.global_id_writes());
2104+
2105+
// If this is a replacement MV, it must remain read-only until the replacement
2106+
// gets applied.
2107+
if mview.replacement_target.is_none() {
2108+
self.allow_writes(mview.cluster_id, mview.global_id_writes());
2109+
}
21052110
}
21062111
CatalogItem::Sink(sink) => {
21072112
policies_to_set

src/adapter/src/coord/sequencer/inner/create_materialized_view.rs

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -561,6 +561,7 @@ impl Coordinator {
561561
mut create_sql,
562562
expr: raw_expr,
563563
dependencies,
564+
replacement_target,
564565
cluster_id,
565566
non_null_assertions,
566567
compaction_window,
@@ -647,6 +648,7 @@ impl Coordinator {
647648
collections,
648649
resolved_ids,
649650
dependencies,
651+
replacement_target,
650652
cluster_id,
651653
non_null_assertions,
652654
custom_logical_compaction_window: compaction_window,
@@ -687,17 +689,26 @@ impl Coordinator {
687689

688690
let storage_metadata = coord.catalog.state().storage_metadata();
689691

692+
let mut collection_desc =
693+
CollectionDescription::for_other(output_desc, Some(storage_as_of));
694+
let mut allow_writes = true;
695+
696+
// If this MV is intended to replace another one, we need to start it in
697+
// read-only mode, targeting the shard of the replacement target.
698+
if let Some(target_id) = replacement_target {
699+
let target_gid = coord.catalog.get_entry(&target_id).latest_global_id();
700+
collection_desc.primary = Some(target_gid);
701+
allow_writes = false;
702+
}
703+
690704
// Announce the creation of the materialized view source.
691705
coord
692706
.controller
693707
.storage
694708
.create_collections(
695709
storage_metadata,
696710
None,
697-
vec![(
698-
global_id,
699-
CollectionDescription::for_other(output_desc, Some(storage_as_of)),
700-
)],
711+
vec![(global_id, collection_desc)],
701712
)
702713
.await
703714
.unwrap_or_terminate("cannot fail to append");
@@ -716,7 +727,10 @@ impl Coordinator {
716727
notice_builtin_updates_fut,
717728
)
718729
.await;
719-
coord.allow_writes(cluster_id, global_id);
730+
731+
if allow_writes {
732+
coord.allow_writes(cluster_id, global_id);
733+
}
720734
})
721735
})
722736
.await;

src/catalog/src/memory/objects.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1389,6 +1389,8 @@ pub struct MaterializedView {
13891389
pub resolved_ids: ResolvedIds,
13901390
/// All of the catalog objects that are referenced by this view.
13911391
pub dependencies: DependencyIds,
1392+
/// ID of the materialized view this materialized view is intended to replace.
1393+
pub replacement_target: Option<CatalogItemId>,
13921394
/// Cluster that this materialized view runs on.
13931395
pub cluster_id: ClusterId,
13941396
/// Column indexes that we assert are not `NULL`.

src/sql/src/plan.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1813,6 +1813,7 @@ pub struct MaterializedView {
18131813
pub dependencies: DependencyIds,
18141814
/// Columns of this view.
18151815
pub column_names: Vec<ColumnName>,
1816+
pub replacement_target: Option<CatalogItemId>,
18161817
/// Cluster this materialized view will get installed on.
18171818
pub cluster_id: ClusterId,
18181819
pub non_null_assertions: Vec<usize>,

src/sql/src/plan/statement/ddl.rs

Lines changed: 30 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@ use crate::catalog::{
130130
use crate::iceberg::IcebergSinkConfigOptionExtracted;
131131
use crate::kafka_util::{KafkaSinkConfigOptionExtracted, KafkaSourceConfigOptionExtracted};
132132
use crate::names::{
133-
Aug, CommentObjectId, DatabaseId, ObjectId, PartialItemName, QualifiedItemName,
133+
Aug, CommentObjectId, DatabaseId, DependencyIds, ObjectId, PartialItemName, QualifiedItemName,
134134
ResolvedClusterName, ResolvedColumnReference, ResolvedDataType, ResolvedDatabaseSpecifier,
135135
ResolvedItemName, ResolvedNetworkPolicyName, SchemaSpecifier, SystemObjectId,
136136
};
@@ -2720,6 +2720,27 @@ pub fn plan_create_materialized_view(
27202720
scx: &StatementContext,
27212721
mut stmt: CreateMaterializedViewStatement<Aug>,
27222722
) -> Result<Plan, PlanError> {
2723+
// Validate the replacement target, if one is given.
2724+
let replacement_target = if let Some(target_name) = &stmt.replacing {
2725+
let target = scx.get_item_by_resolved_name(target_name)?;
2726+
if target.item_type() != CatalogItemType::MaterializedView {
2727+
sql_bail!(
2728+
"cannot replace {} {} because it is not a materialized view",
2729+
target.item_type(),
2730+
scx.catalog.minimal_qualification(target.name()),
2731+
);
2732+
}
2733+
if target.id().is_system() {
2734+
sql_bail!(
2735+
"cannot replace {} because it is required by the database system",
2736+
scx.catalog.minimal_qualification(target.name()),
2737+
);
2738+
}
2739+
Some(target.id())
2740+
} else {
2741+
None
2742+
};
2743+
27232744
let cluster_id =
27242745
crate::plan::statement::resolve_cluster_for_materialized_view(scx.catalog, &stmt)?;
27252746
stmt.in_cluster = Some(ResolvedClusterName {
@@ -2962,12 +2983,16 @@ pub fn plan_create_materialized_view(
29622983
.collect()
29632984
})
29642985
.unwrap_or_default();
2965-
let dependencies = expr
2986+
let mut dependencies: BTreeSet<_> = expr
29662987
.depends_on()
29672988
.into_iter()
29682989
.map(|gid| scx.catalog.resolve_item_id(&gid))
29692990
.collect();
29702991

2992+
if let Some(id) = replacement_target {
2993+
dependencies.insert(id);
2994+
}
2995+
29712996
// Check for an object in the catalog with this same name
29722997
let full_name = scx.catalog.resolve_full_name(&name);
29732998
let partial_name = PartialItemName::from(full_name.clone());
@@ -2987,8 +3012,9 @@ pub fn plan_create_materialized_view(
29873012
materialized_view: MaterializedView {
29883013
create_sql,
29893014
expr,
2990-
dependencies,
3015+
dependencies: DependencyIds(dependencies),
29913016
column_names,
3017+
replacement_target,
29923018
cluster_id,
29933019
non_null_assertions,
29943020
compaction_window,
@@ -3238,6 +3264,7 @@ pub fn plan_create_continual_task(
32383264
expr,
32393265
dependencies,
32403266
column_names,
3267+
replacement_target: None,
32413268
cluster_id,
32423269
non_null_assertions: Vec::new(),
32433270
compaction_window: None,

0 commit comments

Comments
 (0)