Skip to content

Commit 8eea5b6

Browse files
committed
adapter,sql: plan and sequence replacement MV creation
1 parent 13a38c1 commit 8eea5b6

File tree

9 files changed

+101
-11
lines changed

9 files changed

+101
-11
lines changed

src/adapter/src/catalog/state.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1375,6 +1375,7 @@ impl CatalogState {
13751375
desc,
13761376
resolved_ids,
13771377
dependencies,
1378+
replacement_target: materialized_view.replacement_target,
13781379
cluster_id: materialized_view.cluster_id,
13791380
non_null_assertions: materialized_view.non_null_assertions,
13801381
custom_logical_compaction_window: materialized_view.compaction_window,

src/adapter/src/catalog/transact.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1080,7 +1080,15 @@ impl Catalog {
10801080
storage_collections_to_create.insert(source.global_id());
10811081
}
10821082
CatalogItem::MaterializedView(mv) => {
1083-
storage_collections_to_create.insert(mv.global_id_writes());
1083+
let mv_gid = mv.global_id_writes();
1084+
if let Some(target_id) = mv.replacement_target {
1085+
let target_gid = state.get_entry(&target_id).latest_global_id();
1086+
let shard_id =
1087+
state.storage_metadata().get_collection_shard(target_gid)?;
1088+
storage_collections_to_register.insert(mv_gid, shard_id);
1089+
} else {
1090+
storage_collections_to_create.insert(mv_gid);
1091+
}
10841092
}
10851093
CatalogItem::ContinualTask(ct) => {
10861094
storage_collections_to_create.insert(ct.global_id());

src/adapter/src/continual_task.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ pub fn ct_item_from_plan(
3939
expr: mut raw_expr,
4040
dependencies,
4141
column_names: _,
42+
replacement_target: _,
4243
non_null_assertions: _,
4344
compaction_window: _,
4445
refresh_schedule: _,

src/adapter/src/coord.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2101,7 +2101,12 @@ impl Coordinator {
21012101
}
21022102

21032103
self.ship_dataflow(df_desc, mview.cluster_id, None).await;
2104-
self.allow_writes(mview.cluster_id, mview.global_id_writes());
2104+
2105+
// If this is a replacement MV, it must remain read-only until the replacement
2106+
// gets applied.
2107+
if mview.replacement_target.is_none() {
2108+
self.allow_writes(mview.cluster_id, mview.global_id_writes());
2109+
}
21052110
}
21062111
CatalogItem::Sink(sink) => {
21072112
policies_to_set

src/adapter/src/coord/sequencer/inner/create_materialized_view.rs

Lines changed: 40 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -561,6 +561,7 @@ impl Coordinator {
561561
mut create_sql,
562562
expr: raw_expr,
563563
dependencies,
564+
replacement_target,
564565
cluster_id,
565566
non_null_assertions,
566567
compaction_window,
@@ -577,6 +578,23 @@ impl Coordinator {
577578
global_lir_plan,
578579
..
579580
} = stage;
581+
582+
// Validate the replacement target, if one is given.
583+
// TODO(alter-mv): Could we do this already in planning?
584+
if let Some(target_id) = replacement_target {
585+
let Some(target) = self.catalog().get_entry(&target_id).materialized_view() else {
586+
return Err(AdapterError::internal(
587+
"create materialized view",
588+
"replacement target not a materialized view",
589+
));
590+
};
591+
592+
// For now, we don't support schema evolution for materialized views.
593+
if &target.desc.latest() != global_lir_plan.desc() {
594+
return Err(AdapterError::Unstructured(anyhow!("incompatible schemas")));
595+
}
596+
}
597+
580598
// Timestamp selection
581599
let id_bundle = dataflow_import_id_bundle(global_lir_plan.df_desc(), cluster_id);
582600

@@ -596,6 +614,10 @@ impl Coordinator {
596614
let (dataflow_as_of, storage_as_of, until) =
597615
self.select_timestamps(id_bundle, refresh_schedule.as_ref(), read_holds)?;
598616

617+
// TODO(alter-mv): If this is a replacement MV, ensure that `storage_as_of` >= the since of
618+
// the target storage collection. Otherwise, we risk that the storage controller panics
619+
// when we try to create a new storage collection backed by the same shard.
620+
599621
tracing::info!(
600622
dataflow_as_of = ?dataflow_as_of,
601623
storage_as_of = ?storage_as_of,
@@ -647,6 +669,7 @@ impl Coordinator {
647669
collections,
648670
resolved_ids,
649671
dependencies,
672+
replacement_target,
650673
cluster_id,
651674
non_null_assertions,
652675
custom_logical_compaction_window: compaction_window,
@@ -687,17 +710,26 @@ impl Coordinator {
687710

688711
let storage_metadata = coord.catalog.state().storage_metadata();
689712

713+
let mut collection_desc =
714+
CollectionDescription::for_other(output_desc, Some(storage_as_of));
715+
let mut allow_writes = true;
716+
717+
// If this MV is intended to replace another one, we need to start it in
718+
// read-only mode, targeting the shard of the replacement target.
719+
if let Some(target_id) = replacement_target {
720+
let target_gid = coord.catalog.get_entry(&target_id).latest_global_id();
721+
collection_desc.primary = Some(target_gid);
722+
allow_writes = false;
723+
}
724+
690725
// Announce the creation of the materialized view source.
691726
coord
692727
.controller
693728
.storage
694729
.create_collections(
695730
storage_metadata,
696731
None,
697-
vec![(
698-
global_id,
699-
CollectionDescription::for_other(output_desc, Some(storage_as_of)),
700-
)],
732+
vec![(global_id, collection_desc)],
701733
)
702734
.await
703735
.unwrap_or_terminate("cannot fail to append");
@@ -716,7 +748,10 @@ impl Coordinator {
716748
notice_builtin_updates_fut,
717749
)
718750
.await;
719-
coord.allow_writes(cluster_id, global_id);
751+
752+
if allow_writes {
753+
coord.allow_writes(cluster_id, global_id);
754+
}
720755
})
721756
})
722757
.await;

src/catalog/src/memory/objects.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1389,6 +1389,8 @@ pub struct MaterializedView {
13891389
pub resolved_ids: ResolvedIds,
13901390
/// All of the catalog objects that are referenced by this view.
13911391
pub dependencies: DependencyIds,
1392+
/// ID of the materialized view this materialized view is intended to replace.
1393+
pub replacement_target: Option<CatalogItemId>,
13921394
/// Cluster that this materialized view runs on.
13931395
pub cluster_id: ClusterId,
13941396
/// Column indexes that we assert are not `NULL`.

src/sql/src/plan.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1813,6 +1813,7 @@ pub struct MaterializedView {
18131813
pub dependencies: DependencyIds,
18141814
/// Columns of this view.
18151815
pub column_names: Vec<ColumnName>,
1816+
pub replacement_target: Option<CatalogItemId>,
18161817
/// Cluster this materialized view will get installed on.
18171818
pub cluster_id: ClusterId,
18181819
pub non_null_assertions: Vec<usize>,

src/sql/src/plan/error.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -298,6 +298,12 @@ pub enum PlanError {
298298
},
299299
/// AS OF or UP TO should be an expression that is castable and simplifiable to a non-null mz_timestamp value.
300300
InvalidAsOfUpTo,
301+
InvalidReplacement {
302+
item_type: CatalogItemType,
303+
item_name: PartialItemName,
304+
replacement_type: CatalogItemType,
305+
replacement_name: PartialItemName,
306+
},
301307
// TODO(benesch): eventually all errors should be structured.
302308
Unstructured(String),
303309
}
@@ -830,7 +836,10 @@ impl fmt::Display for PlanError {
830836
write!(f, "cursor {} does not exist", name.quoted())
831837
}
832838
Self::CopyFromTargetTableDropped { target_name: name } => write!(f, "COPY FROM's target table {} was dropped", name.quoted()),
833-
Self::InvalidAsOfUpTo => write!(f, "AS OF or UP TO should be castable to a (non-null) mz_timestamp value")
839+
Self::InvalidAsOfUpTo => write!(f, "AS OF or UP TO should be castable to a (non-null) mz_timestamp value"),
840+
Self::InvalidReplacement { item_type, item_name, replacement_type, replacement_name } => {
841+
write!(f, "cannot replace {item_type} {item_name} with {replacement_type} {replacement_name}")
842+
}
834843
}
835844
}
836845
}

src/sql/src/plan/statement/ddl.rs

Lines changed: 31 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@ use crate::catalog::{
130130
use crate::iceberg::IcebergSinkConfigOptionExtracted;
131131
use crate::kafka_util::{KafkaSinkConfigOptionExtracted, KafkaSourceConfigOptionExtracted};
132132
use crate::names::{
133-
Aug, CommentObjectId, DatabaseId, ObjectId, PartialItemName, QualifiedItemName,
133+
Aug, CommentObjectId, DatabaseId, DependencyIds, ObjectId, PartialItemName, QualifiedItemName,
134134
ResolvedClusterName, ResolvedColumnReference, ResolvedDataType, ResolvedDatabaseSpecifier,
135135
ResolvedItemName, ResolvedNetworkPolicyName, SchemaSpecifier, SystemObjectId,
136136
};
@@ -2733,6 +2733,28 @@ pub fn plan_create_materialized_view(
27332733
let partial_name = normalize::unresolved_item_name(stmt.name)?;
27342734
let name = scx.allocate_qualified_name(partial_name.clone())?;
27352735

2736+
// Validate the replacement target, if one is given.
2737+
let replacement_target = if let Some(target_name) = &stmt.replacing {
2738+
let target = scx.get_item_by_resolved_name(target_name)?;
2739+
if target.item_type() != CatalogItemType::MaterializedView {
2740+
return Err(PlanError::InvalidReplacement {
2741+
item_type: target.item_type(),
2742+
item_name: scx.catalog.minimal_qualification(target.name()),
2743+
replacement_type: CatalogItemType::MaterializedView,
2744+
replacement_name: partial_name,
2745+
});
2746+
}
2747+
if target.id().is_system() {
2748+
sql_bail!(
2749+
"cannot replace {} because it is required by the database system",
2750+
scx.catalog.minimal_qualification(target.name()),
2751+
);
2752+
}
2753+
Some(target.id())
2754+
} else {
2755+
None
2756+
};
2757+
27362758
let query::PlannedRootQuery {
27372759
expr,
27382760
mut desc,
@@ -2962,12 +2984,16 @@ pub fn plan_create_materialized_view(
29622984
.collect()
29632985
})
29642986
.unwrap_or_default();
2965-
let dependencies = expr
2987+
let mut dependencies: BTreeSet<_> = expr
29662988
.depends_on()
29672989
.into_iter()
29682990
.map(|gid| scx.catalog.resolve_item_id(&gid))
29692991
.collect();
29702992

2993+
if let Some(id) = replacement_target {
2994+
dependencies.insert(id);
2995+
}
2996+
29712997
// Check for an object in the catalog with this same name
29722998
let full_name = scx.catalog.resolve_full_name(&name);
29732999
let partial_name = PartialItemName::from(full_name.clone());
@@ -2987,8 +3013,9 @@ pub fn plan_create_materialized_view(
29873013
materialized_view: MaterializedView {
29883014
create_sql,
29893015
expr,
2990-
dependencies,
3016+
dependencies: DependencyIds(dependencies),
29913017
column_names,
3018+
replacement_target,
29923019
cluster_id,
29933020
non_null_assertions,
29943021
compaction_window,
@@ -3238,6 +3265,7 @@ pub fn plan_create_continual_task(
32383265
expr,
32393266
dependencies,
32403267
column_names,
3268+
replacement_target: None,
32413269
cluster_id,
32423270
non_null_assertions: Vec::new(),
32433271
compaction_window: None,

0 commit comments

Comments
 (0)