diff --git a/src/adapter/src/catalog/apply.rs b/src/adapter/src/catalog/apply.rs index 57ba0352e0d9b..ff55556183dfb 100644 --- a/src/adapter/src/catalog/apply.rs +++ b/src/adapter/src/catalog/apply.rs @@ -1014,33 +1014,31 @@ impl CatalogState { item: name.clone(), }; let entry = match retractions.items.remove(&key) { - Some(mut retraction) => { + Some(retraction) => { assert_eq!(retraction.id, item.id); - // We only reparse the SQL if it's changed. Otherwise, we use the existing - // item. This is a performance optimization and not needed for correctness. - // This makes it difficult to use the `UpdateFrom` trait, but the structure - // is still the same as the trait. - if retraction.create_sql() != create_sql { - let item = self - .deserialize_item( - global_id, - &create_sql, - &extra_versions, - local_expression_cache, - Some(retraction.item), - ) - .unwrap_or_else(|e| { - panic!("{e:?}: invalid persisted SQL: {create_sql}") - }); - retraction.item = item; - } - retraction.id = id; - retraction.oid = oid; - retraction.name = name; - retraction.owner_id = owner_id; - retraction.privileges = PrivilegeMap::from_mz_acl_items(privileges); - retraction + let item = self + .deserialize_item( + global_id, + &create_sql, + &extra_versions, + local_expression_cache, + Some(retraction.item), + ) + .unwrap_or_else(|e| { + panic!("{e:?}: invalid persisted SQL: {create_sql}") + }); + + CatalogEntry { + item, + id, + oid, + name, + owner_id, + privileges: PrivilegeMap::from_mz_acl_items(privileges), + referenced_by: retraction.referenced_by, + used_by: retraction.used_by, + } } None => { let catalog_item = self diff --git a/src/adapter/src/catalog/state.rs b/src/adapter/src/catalog/state.rs index 368a8f639c7be..99b8399b45b53 100644 --- a/src/adapter/src/catalog/state.rs +++ b/src/adapter/src/catalog/state.rs @@ -1375,6 +1375,7 @@ impl CatalogState { desc, resolved_ids, dependencies, + replacement_target: materialized_view.replacement_target, cluster_id: materialized_view.cluster_id, non_null_assertions: materialized_view.non_null_assertions, custom_logical_compaction_window: materialized_view.compaction_window, diff --git a/src/adapter/src/catalog/transact.rs b/src/adapter/src/catalog/transact.rs index 73f5debb7b18b..22f7a4b19a104 100644 --- a/src/adapter/src/catalog/transact.rs +++ b/src/adapter/src/catalog/transact.rs @@ -100,6 +100,10 @@ pub enum Op { typ: SqlColumnType, sql: RawDataType, }, + AlterMaterializedViewApplyReplacement { + id: CatalogItemId, + replacement_id: CatalogItemId, + }, CreateDatabase { name: String, owner_id: RoleId, @@ -769,6 +773,28 @@ impl Catalog { tx.update_item(id, new_entry.into())?; storage_collections_to_register.insert(new_global_id, shard_id); } + Op::AlterMaterializedViewApplyReplacement { id, replacement_id } => { + let mut new_entry = state.get_entry(&id).clone(); + let replacement = state.get_entry(&replacement_id); + + let CatalogItem::MaterializedView(mv) = &mut new_entry.item else { + return Err(AdapterError::internal( + "ALTER MATERIALIZED VIEW ... APPLY REPLACEMENT", + "id must refer to a materialized view", + )); + }; + let CatalogItem::MaterializedView(replacement_mv) = &replacement.item else { + return Err(AdapterError::internal( + "ALTER MATERIALIZED VIEW ... 
APPLY REPLACEMENT", + "replacement_id must refer to a materialized view", + )); + }; + + mv.apply_replacement(replacement_mv.clone()); + + tx.remove_item(replacement_id)?; + tx.update_item(id, new_entry.into())?; + } Op::CreateDatabase { name, owner_id } => { let database_owner_privileges = vec![rbac::owner_privilege( mz_sql::catalog::ObjectType::Database, @@ -1080,7 +1106,15 @@ impl Catalog { storage_collections_to_create.insert(source.global_id()); } CatalogItem::MaterializedView(mv) => { - storage_collections_to_create.insert(mv.global_id_writes()); + let mv_gid = mv.global_id_writes(); + if let Some(target_id) = mv.replacement_target { + let target_gid = state.get_entry(&target_id).latest_global_id(); + let shard_id = + state.storage_metadata().get_collection_shard(target_gid)?; + storage_collections_to_register.insert(mv_gid, shard_id); + } else { + storage_collections_to_create.insert(mv_gid); + } } CatalogItem::ContinualTask(ct) => { storage_collections_to_create.insert(ct.global_id()); diff --git a/src/adapter/src/command.rs b/src/adapter/src/command.rs index 32c62dc7742ed..c74b26733d8ad 100644 --- a/src/adapter/src/command.rs +++ b/src/adapter/src/command.rs @@ -644,6 +644,7 @@ impl ExecuteResponse { | AlterSource | AlterSink | AlterTableAddColumn + | AlterMaterializedViewApplyReplacement | AlterNetworkPolicy => &[AlteredObject], AlterDefaultPrivileges => &[AlteredDefaultPrivileges], AlterSetCluster => &[AlteredObject], diff --git a/src/adapter/src/continual_task.rs b/src/adapter/src/continual_task.rs index 193d2fb32948d..6f7d83b9bafa8 100644 --- a/src/adapter/src/continual_task.rs +++ b/src/adapter/src/continual_task.rs @@ -39,6 +39,7 @@ pub fn ct_item_from_plan( expr: mut raw_expr, dependencies, column_names: _, + replacement_target: _, non_null_assertions: _, compaction_window: _, refresh_schedule: _, diff --git a/src/adapter/src/coord.rs b/src/adapter/src/coord.rs index 200bc1f061d60..fd3531b3b3475 100644 --- a/src/adapter/src/coord.rs +++ b/src/adapter/src/coord.rs @@ -2101,7 +2101,12 @@ impl Coordinator { } self.ship_dataflow(df_desc, mview.cluster_id, None).await; - self.allow_writes(mview.cluster_id, mview.global_id_writes()); + + // If this is a replacement MV, it must remain read-only until the replacement + // gets applied. 
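+                // (The actual cut-over happens in
+                // `sequence_alter_materialized_view_apply_replacement`, which shuts down the
+                // old dataflow and only then calls `allow_writes` for the replacement.)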
+ if mview.replacement_target.is_none() { + self.allow_writes(mview.cluster_id, mview.global_id_writes()); + } } CatalogItem::Sink(sink) => { policies_to_set @@ -2736,6 +2741,7 @@ impl Coordinator { since: None, status_collection_id, timeline: Some(timeline.clone()), + primary: None, } }; @@ -2766,10 +2772,9 @@ impl Coordinator { let next_version = version.bump(); let primary_collection = versions.get(&next_version).map(|(gid, _desc)| gid).copied(); - let collection_desc = CollectionDescription::for_table( - desc.clone(), - primary_collection, - ); + let mut collection_desc = + CollectionDescription::for_table(desc.clone()); + collection_desc.primary = primary_collection; (*gid, collection_desc) }); @@ -2808,13 +2813,8 @@ impl Coordinator { compute_collections.push((mv.global_id_writes(), mv.desc.latest())); } CatalogItem::ContinualTask(ct) => { - let collection_desc = CollectionDescription { - desc: ct.desc.clone(), - data_source: DataSource::Other, - since: ct.initial_as_of.clone(), - status_collection_id: None, - timeline: None, - }; + let collection_desc = + CollectionDescription::for_other(ct.desc.clone(), ct.initial_as_of.clone()); if ct.global_id().is_system() && collection_desc.since.is_none() { // We need a non-0 since to make as_of selection work. Fill it in below with // the `bootstrap_builtin_continual_tasks` call, which can only be run after @@ -2859,6 +2859,7 @@ impl Coordinator { since: None, status_collection_id: None, timeline: None, + primary: None, }; collections.push((sink.global_id, collection_desc)); } diff --git a/src/adapter/src/coord/appends.rs b/src/adapter/src/coord/appends.rs index 048c06bfd4faf..040146d863b6c 100644 --- a/src/adapter/src/coord/appends.rs +++ b/src/adapter/src/coord/appends.rs @@ -1004,6 +1004,7 @@ pub(crate) fn waiting_on_startup_appends( | Plan::AlterRole(_) | Plan::AlterOwner(_) | Plan::AlterTableAddColumn(_) + | Plan::AlterMaterializedViewApplyReplacement(_) | Plan::Declare(_) | Plan::Fetch(_) | Plan::Close(_) diff --git a/src/adapter/src/coord/catalog_serving.rs b/src/adapter/src/coord/catalog_serving.rs index ede6db25a3b22..d121b4f0012d9 100644 --- a/src/adapter/src/coord/catalog_serving.rs +++ b/src/adapter/src/coord/catalog_serving.rs @@ -128,6 +128,7 @@ pub fn auto_run_on_catalog_server<'a, 's, 'p>( | Plan::AlterRole(_) | Plan::AlterOwner(_) | Plan::AlterTableAddColumn(_) + | Plan::AlterMaterializedViewApplyReplacement(_) | Plan::Declare(_) | Plan::Fetch(_) | Plan::Close(_) diff --git a/src/adapter/src/coord/command_handler.rs b/src/adapter/src/coord/command_handler.rs index 549efa3cf444a..1fde57683d301 100644 --- a/src/adapter/src/coord/command_handler.rs +++ b/src/adapter/src/coord/command_handler.rs @@ -929,6 +929,7 @@ impl Coordinator { | Statement::AlterConnection(_) | Statement::AlterDefaultPrivileges(_) | Statement::AlterIndex(_) + | Statement::AlterMaterializedViewApplyReplacement(_) | Statement::AlterSetCluster(_) | Statement::AlterOwner(_) | Statement::AlterRetainHistory(_) @@ -1170,6 +1171,7 @@ impl Coordinator { if_exists: cmvs.if_exists, name: cmvs.name, columns: cmvs.columns, + replacing: cmvs.replacing, in_cluster: cmvs.in_cluster, query: cmvs.query, with_options: cmvs.with_options, diff --git a/src/adapter/src/coord/ddl.rs b/src/adapter/src/coord/ddl.rs index 223595fd73324..88618b80416a6 100644 --- a/src/adapter/src/coord/ddl.rs +++ b/src/adapter/src/coord/ddl.rs @@ -214,9 +214,7 @@ impl Coordinator { let mut webhook_sources_to_restart = BTreeSet::new(); let mut table_gids_to_drop = vec![]; let mut 
storage_sink_gids_to_drop = vec![]; - let mut indexes_to_drop = vec![]; - let mut materialized_views_to_drop = vec![]; - let mut continual_tasks_to_drop = vec![]; + let mut compute_gids_to_drop = vec![]; let mut views_to_drop = vec![]; let mut replication_slots_to_drop: Vec<(PostgresConnection, String)> = vec![]; let mut secrets_to_drop = vec![]; @@ -275,21 +273,21 @@ impl Coordinator { storage_sink_gids_to_drop.push(sink.global_id()); } CatalogItem::Index(index) => { - indexes_to_drop.push((index.cluster_id, index.global_id())); + compute_gids_to_drop + .push((index.cluster_id, index.global_id())); } CatalogItem::MaterializedView(mv) => { - materialized_views_to_drop + compute_gids_to_drop .push((mv.cluster_id, mv.global_id_writes())); + sources_to_drop + .extend(mv.global_ids().map(|gid| (*id, gid))); } CatalogItem::View(view) => { views_to_drop.push((*id, view.clone())) } CatalogItem::ContinualTask(ct) => { - continual_tasks_to_drop.push(( - *id, - ct.cluster_id, - ct.global_id(), - )); + compute_gids_to_drop.push((ct.cluster_id, ct.global_id())); + sources_to_drop.push((*id, ct.global_id())); } CatalogItem::Secret(_) => { secrets_to_drop.push(*id); @@ -422,9 +420,7 @@ impl Coordinator { .map(|(_, gid)| *gid) .chain(table_gids_to_drop.iter().map(|(_, gid)| *gid)) .chain(storage_sink_gids_to_drop.iter().copied()) - .chain(indexes_to_drop.iter().map(|(_, gid)| *gid)) - .chain(materialized_views_to_drop.iter().map(|(_, gid)| *gid)) - .chain(continual_tasks_to_drop.iter().map(|(_, _, gid)| *gid)) + .chain(compute_gids_to_drop.iter().map(|(_, gid)| *gid)) .chain(views_to_drop.iter().map(|(_id, view)| view.global_id())) .collect(); @@ -494,30 +490,19 @@ impl Coordinator { } } - let storage_ids_to_drop = sources_to_drop + let storage_gids_to_drop = sources_to_drop .iter() .map(|(_, gid)| *gid) .chain(storage_sink_gids_to_drop.iter().copied()) - .chain(table_gids_to_drop.iter().map(|(_, gid)| *gid)) - .chain(materialized_views_to_drop.iter().map(|(_, gid)| *gid)) - .chain(continual_tasks_to_drop.iter().map(|(_, _, gid)| *gid)); - let compute_ids_to_drop = indexes_to_drop - .iter() - .copied() - .chain(materialized_views_to_drop.iter().copied()) - .chain( - continual_tasks_to_drop - .iter() - .map(|(_, cluster_id, gid)| (*cluster_id, *gid)), - ); + .chain(table_gids_to_drop.iter().map(|(_, gid)| *gid)); // Check if any Timelines would become empty, if we dropped the specified storage or // compute resources. 
// // Note: only after a Transaction succeeds do we actually drop the timeline let collection_id_bundle = self.build_collection_id_bundle( - storage_ids_to_drop, - compute_ids_to_drop, + storage_gids_to_drop, + compute_gids_to_drop.clone(), clusters_to_drop.clone(), ); let timeline_associations: BTreeMap<_, _> = self @@ -655,14 +640,8 @@ impl Coordinator { self.cancel_pending_copy(&conn_id); } } - if !indexes_to_drop.is_empty() { - self.drop_indexes(indexes_to_drop); - } - if !materialized_views_to_drop.is_empty() { - self.drop_materialized_views(materialized_views_to_drop); - } - if !continual_tasks_to_drop.is_empty() { - self.drop_continual_tasks(continual_tasks_to_drop); + if !compute_gids_to_drop.is_empty() { + self.drop_compute_collections(compute_gids_to_drop); } if !vpc_endpoints_to_drop.is_empty() { self.drop_vpc_endpoints_in_background(vpc_endpoints_to_drop) @@ -1030,7 +1009,7 @@ impl Coordinator { .unwrap_or_terminate("cannot fail to drop sinks"); } - pub(crate) fn drop_indexes(&mut self, indexes: Vec<(ClusterId, GlobalId)>) { + pub(crate) fn drop_compute_collections(&mut self, indexes: Vec<(ClusterId, GlobalId)>) { let mut by_cluster: BTreeMap<_, Vec<_>> = BTreeMap::new(); for (cluster_id, gid) in indexes { by_cluster.entry(cluster_id).or_default().push(gid); @@ -1046,58 +1025,6 @@ impl Coordinator { } } - /// A convenience method for dropping materialized views. - fn drop_materialized_views(&mut self, mviews: Vec<(ClusterId, GlobalId)>) { - let mut by_cluster: BTreeMap<_, Vec<_>> = BTreeMap::new(); - let mut mv_gids = Vec::new(); - for (cluster_id, gid) in mviews { - by_cluster.entry(cluster_id).or_default().push(gid); - mv_gids.push(gid); - } - - // Drop compute sinks. - for (cluster_id, ids) in by_cluster { - let compute = &mut self.controller.compute; - // A cluster could have been dropped, so verify it exists. - if compute.instance_exists(cluster_id) { - compute - .drop_collections(cluster_id, ids) - .unwrap_or_terminate("cannot fail to drop collections"); - } - } - - // Drop storage resources. - let storage_metadata = self.catalog.state().storage_metadata(); - self.controller - .storage - .drop_sources(storage_metadata, mv_gids) - .unwrap_or_terminate("cannot fail to drop sources"); - } - - /// A convenience method for dropping continual tasks. - fn drop_continual_tasks(&mut self, cts: Vec<(CatalogItemId, ClusterId, GlobalId)>) { - let mut by_cluster: BTreeMap<_, Vec<_>> = BTreeMap::new(); - let mut source_ids = Vec::new(); - for (item_id, cluster_id, gid) in cts { - by_cluster.entry(cluster_id).or_default().push(gid); - source_ids.push((item_id, gid)); - } - - // Drop compute sinks. - for (cluster_id, ids) in by_cluster { - let compute = &mut self.controller.compute; - // A cluster could have been dropped, so verify it exists. - if compute.instance_exists(cluster_id) { - compute - .drop_collections(cluster_id, ids) - .unwrap_or_terminate("cannot fail to drop collections"); - } - } - - // Drop storage sources. - self.drop_sources(source_ids) - } - fn drop_vpc_endpoints_in_background(&self, vpc_endpoints: Vec) { let cloud_resource_controller = Arc::clone(self.cloud_resource_controller .as_ref() @@ -1308,6 +1235,7 @@ impl Coordinator { since: None, status_collection_id: None, timeline: None, + primary: None, }; let collections = vec![(id, collection_desc)]; @@ -1552,6 +1480,7 @@ impl Coordinator { | Op::AlterRetainHistory { .. } | Op::AlterNetworkPolicy { .. } | Op::AlterAddColumn { .. } + | Op::AlterMaterializedViewApplyReplacement { .. } | Op::UpdatePrivilege { .. 
} | Op::UpdateDefaultPrivilege { .. } | Op::GrantRole { .. } diff --git a/src/adapter/src/coord/peek.rs b/src/adapter/src/coord/peek.rs index 77ff54cea015c..68481a38c8001 100644 --- a/src/adapter/src/coord/peek.rs +++ b/src/adapter/src/coord/peek.rs @@ -854,7 +854,7 @@ impl crate::coord::Coordinator { // If a dataflow was created, drop it once the peek command is sent. if let Some(index_id) = drop_dataflow { self.remove_compute_ids_from_timeline(vec![(compute_instance, index_id)]); - self.drop_indexes(vec![(compute_instance, index_id)]); + self.drop_compute_collections(vec![(compute_instance, index_id)]); } let persist_client = self.persist_client.clone(); diff --git a/src/adapter/src/coord/sequencer.rs b/src/adapter/src/coord/sequencer.rs index 046b5444b488b..4a0acd549308a 100644 --- a/src/adapter/src/coord/sequencer.rs +++ b/src/adapter/src/coord/sequencer.rs @@ -512,6 +512,12 @@ impl Coordinator { let result = self.sequence_alter_table(&mut ctx, plan).await; ctx.retire(result); } + Plan::AlterMaterializedViewApplyReplacement(plan) => { + let result = self + .sequence_alter_materialized_view_apply_replacement(&mut ctx, plan) + .await; + ctx.retire(result); + } Plan::AlterNetworkPolicy(plan) => { let res = self .sequence_alter_network_policy(ctx.session(), plan) diff --git a/src/adapter/src/coord/sequencer/inner.rs b/src/adapter/src/coord/sequencer/inner.rs index ff844043eb442..26f62a367fe70 100644 --- a/src/adapter/src/coord/sequencer/inner.rs +++ b/src/adapter/src/coord/sequencer/inner.rs @@ -61,7 +61,10 @@ use mz_sql::names::{ Aug, ObjectId, QualifiedItemName, ResolvedDatabaseSpecifier, ResolvedIds, ResolvedItemName, SchemaSpecifier, SystemObjectId, }; -use mz_sql::plan::{ConnectionDetails, NetworkPolicyRule, StatementContext}; +use mz_sql::plan::{ + AlterMaterializedViewApplyReplacementPlan, ConnectionDetails, NetworkPolicyRule, + StatementContext, +}; use mz_sql::pure::{PurifiedSourceExport, generate_subsource_statements}; use mz_storage_types::sinks::StorageSinkDesc; use mz_storage_types::sources::{GenericSourceConnection, IngestionDescription, SourceExport}; @@ -772,6 +775,7 @@ impl Coordinator { timeline: Some(source.timeline), since: None, status_collection_id, + primary: None, }, )); } @@ -1229,8 +1233,7 @@ impl Coordinator { .desc .at_version(RelationVersionSelector::Specific(relation_version)); // We assert above we have a single version, and thus we are the primary. 
-            let collection_desc =
-                CollectionDescription::for_table(relation_desc, None);
+            let collection_desc = CollectionDescription::for_table(relation_desc);
             let collections = vec![(global_id, collection_desc)];
 
             let compaction_window = table
@@ -1283,6 +1286,7 @@
                     since: None,
                     status_collection_id,
                     timeline: Some(timeline.clone()),
+                    primary: None,
                 };
 
                 let collections = vec![(global_id, collection_desc)];
@@ -1319,6 +1323,7 @@
                     since: None,
                     status_collection_id: None,
                     timeline: Some(timeline.clone()),
+                    primary: None,
                 };
                 let collections = vec![(global_id, collection_desc)];
                 let read_policies = coord
@@ -4492,6 +4497,7 @@
                 since: None,
                 status_collection_id,
                 timeline: Some(source.timeline.clone()),
+                primary: None,
             },
         ));
 
@@ -5201,6 +5207,57 @@
 
         Ok(ExecuteResponse::AlteredObject(ObjectType::Table))
     }
+
+    #[instrument]
+    pub(super) async fn sequence_alter_materialized_view_apply_replacement(
+        &mut self,
+        ctx: &mut ExecuteContext,
+        plan: AlterMaterializedViewApplyReplacementPlan,
+    ) -> Result<ExecuteResponse, AdapterError> {
+        const ERROR_CONTEXT: &str = "ALTER MATERIALIZED VIEW ... APPLY REPLACEMENT";
+
+        let AlterMaterializedViewApplyReplacementPlan { id, replacement_id } = plan;
+
+        // TODO(alter-mv): Wait until there is overlap between the old MV's write frontier and the
+        // new MV's as-of, to ensure no times are skipped.
+
+        let Some(old) = self.catalog().get_entry(&id).materialized_view() else {
+            return Err(AdapterError::internal(
+                ERROR_CONTEXT,
+                "id must refer to a materialized view",
+            ));
+        };
+        let Some(new) = self
+            .catalog()
+            .get_entry(&replacement_id)
+            .materialized_view()
+        else {
+            return Err(AdapterError::internal(
+                ERROR_CONTEXT,
+                "replacement_id must refer to a materialized view",
+            ));
+        };
+
+        let old_cluster_id = old.cluster_id;
+        let new_cluster_id = new.cluster_id;
+
+        let old_gid = old.global_id_writes();
+        let new_gid = new.global_id_writes();
+
+        let ops = vec![catalog::Op::AlterMaterializedViewApplyReplacement { id, replacement_id }];
+
+        self.catalog_transact_with_side_effects(Some(ctx), ops, move |coord, _ctx| {
+            Box::pin(async move {
+                // Cut over the MV computation, by shutting down the old dataflow and allowing the
+                // new dataflow to start writing.
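+                // Dropping the old dataflow before enabling writes on the new one should
+                // ensure that at most one compute sink ever writes to the target shard at
+                // any given time.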
+ coord.drop_compute_collections(vec![(old_cluster_id, old_gid)]); + coord.allow_writes(new_cluster_id, new_gid); + }) + }) + .await?; + + Ok(ExecuteResponse::AlteredObject(ObjectType::MaterializedView)) + } } #[derive(Debug)] diff --git a/src/adapter/src/coord/sequencer/inner/create_continual_task.rs b/src/adapter/src/coord/sequencer/inner/create_continual_task.rs index 1b826cd44cfdb..26cbd864e2521 100644 --- a/src/adapter/src/coord/sequencer/inner/create_continual_task.rs +++ b/src/adapter/src/coord/sequencer/inner/create_continual_task.rs @@ -33,7 +33,7 @@ use mz_sql::plan; use mz_sql::session::metadata::SessionMetadata; use mz_sql_parser::ast::Statement; use mz_sql_parser::ast::display::AstDisplay; -use mz_storage_client::controller::{CollectionDescription, DataSource}; +use mz_storage_client::controller::CollectionDescription; use mz_transform::dataflow::DataflowMetainfo; use mz_transform::notice::OptimizerNotice; @@ -148,13 +148,7 @@ impl Coordinator { None, vec![( global_id, - CollectionDescription { - desc, - data_source: DataSource::Other, - since: Some(as_of), - status_collection_id: None, - timeline: None, - }, + CollectionDescription::for_other(desc, Some(as_of)), )], ) .await diff --git a/src/adapter/src/coord/sequencer/inner/create_materialized_view.rs b/src/adapter/src/coord/sequencer/inner/create_materialized_view.rs index 333fecce291d7..7c0b47de3fe89 100644 --- a/src/adapter/src/coord/sequencer/inner/create_materialized_view.rs +++ b/src/adapter/src/coord/sequencer/inner/create_materialized_view.rs @@ -29,7 +29,7 @@ use mz_sql::plan; use mz_sql::session::metadata::SessionMetadata; use mz_sql_parser::ast; use mz_sql_parser::ast::display::AstDisplay; -use mz_storage_client::controller::{CollectionDescription, DataSource}; +use mz_storage_client::controller::CollectionDescription; use std::collections::BTreeMap; use timely::progress::Antichain; use tracing::Span; @@ -561,6 +561,7 @@ impl Coordinator { mut create_sql, expr: raw_expr, dependencies, + replacement_target, cluster_id, non_null_assertions, compaction_window, @@ -577,6 +578,23 @@ impl Coordinator { global_lir_plan, .. } = stage; + + // Validate the replacement target, if one is given. + // TODO(alter-mv): Could we do this already in planning? + if let Some(target_id) = replacement_target { + let Some(target) = self.catalog().get_entry(&target_id).materialized_view() else { + return Err(AdapterError::internal( + "create materialized view", + "replacement target not a materialized view", + )); + }; + + // For now, we don't support schema evolution for materialized views. + if &target.desc.latest() != global_lir_plan.desc() { + return Err(AdapterError::Unstructured(anyhow!("incompatible schemas"))); + } + } + // Timestamp selection let id_bundle = dataflow_import_id_bundle(global_lir_plan.df_desc(), cluster_id); @@ -596,6 +614,10 @@ impl Coordinator { let (dataflow_as_of, storage_as_of, until) = self.select_timestamps(id_bundle, refresh_schedule.as_ref(), read_holds)?; + // TODO(alter-mv): If this is a replacement MV, ensure that `storage_as_of` >= the since of + // the target storage collection. Otherwise, we risk that the storage controller panics + // when we try to create a new storage collection backed by the same shard. 
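+        //
+        // A minimal sketch of such a check, assuming a hypothetical `target_since`
+        // frontier for the target storage collection were in scope here:
+        //
+        //     if !PartialOrder::less_equal(&target_since, &storage_as_of) {
+        //         return Err(AdapterError::internal(
+        //             "create materialized view",
+        //             "replacement as_of lags the target collection's since",
+        //         ));
+        //     }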
+ tracing::info!( dataflow_as_of = ?dataflow_as_of, storage_as_of = ?storage_as_of, @@ -647,6 +669,7 @@ impl Coordinator { collections, resolved_ids, dependencies, + replacement_target, cluster_id, non_null_assertions, custom_logical_compaction_window: compaction_window, @@ -687,6 +710,18 @@ impl Coordinator { let storage_metadata = coord.catalog.state().storage_metadata(); + let mut collection_desc = + CollectionDescription::for_other(output_desc, Some(storage_as_of)); + let mut allow_writes = true; + + // If this MV is intended to replace another one, we need to start it in + // read-only mode, targeting the shard of the replacement target. + if let Some(target_id) = replacement_target { + let target_gid = coord.catalog.get_entry(&target_id).latest_global_id(); + collection_desc.primary = Some(target_gid); + allow_writes = false; + } + // Announce the creation of the materialized view source. coord .controller @@ -694,16 +729,7 @@ impl Coordinator { .create_collections( storage_metadata, None, - vec![( - global_id, - CollectionDescription { - desc: output_desc, - data_source: DataSource::Other, - since: Some(storage_as_of), - status_collection_id: None, - timeline: None, - }, - )], + vec![(global_id, collection_desc)], ) .await .unwrap_or_terminate("cannot fail to append"); @@ -722,7 +748,10 @@ impl Coordinator { notice_builtin_updates_fut, ) .await; - coord.allow_writes(cluster_id, global_id); + + if allow_writes { + coord.allow_writes(cluster_id, global_id); + } }) }) .await; diff --git a/src/catalog/src/memory/objects.rs b/src/catalog/src/memory/objects.rs index 6a92d741b25fc..bbc5142d905f0 100644 --- a/src/catalog/src/memory/objects.rs +++ b/src/catalog/src/memory/objects.rs @@ -752,6 +752,10 @@ impl mz_sql::catalog::CatalogItem for CatalogCollectionEntry { self.entry.writable_table_details() } + fn replacement_target(&self) -> Option { + self.entry.replacement_target() + } + fn type_details(&self) -> Option<&CatalogTypeDetails> { self.entry.type_details() } @@ -1389,6 +1393,8 @@ pub struct MaterializedView { pub resolved_ids: ResolvedIds, /// All of the catalog objects that are referenced by this view. pub dependencies: DependencyIds, + /// ID of the materialized view this materialized view is intended to replace. + pub replacement_target: Option, /// Cluster that this materialized view runs on. pub cluster_id: ClusterId, /// Column indexes that we assert are not `NULL`. @@ -1444,6 +1450,67 @@ impl MaterializedView { self.desc .at_version(RelationVersionSelector::Specific(*version)) } + + /// Apply the given replacement materialized view to this [`MaterializedView`]. 
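+    ///
+    /// Concretely, this rewrites `create_sql` to carry the replacement's definition under
+    /// the original name, records the replacement's write collection under a bumped
+    /// version, and drops the replacement's (now self-referential) dependency on the
+    /// target.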
+    pub fn apply_replacement(&mut self, replacement: Self) {
+        let target_id = replacement
+            .replacement_target
+            .expect("replacement has target");
+
+        fn parse(create_sql: &str) -> mz_sql::ast::CreateMaterializedViewStatement<Raw> {
+            let res = mz_sql::parse::parse(create_sql).unwrap_or_else(|e| {
+                panic!("invalid create_sql persisted in catalog: {e}\n{create_sql}");
+            });
+            if let Statement::CreateMaterializedView(cmvs) = res.into_element().ast {
+                cmvs
+            } else {
+                panic!("invalid MV create_sql persisted in catalog\n{create_sql}");
+            }
+        }
+
+        let old_stmt = parse(&self.create_sql);
+        let rpl_stmt = parse(&replacement.create_sql);
+        let new_stmt = mz_sql::ast::CreateMaterializedViewStatement {
+            if_exists: old_stmt.if_exists,
+            name: old_stmt.name,
+            columns: rpl_stmt.columns,
+            replacing: None,
+            in_cluster: rpl_stmt.in_cluster,
+            query: rpl_stmt.query,
+            as_of: rpl_stmt.as_of,
+            with_options: rpl_stmt.with_options,
+        };
+        let create_sql = new_stmt.to_ast_string_stable();
+
+        let mut collections = std::mem::take(&mut self.collections);
+        // Note: We can't use `self.desc.latest_version` here because a replacement doesn't
+        // necessarily evolve the relation schema, so that version might be lower than the actual
+        // latest version.
+        let latest_version = collections.keys().max().expect("at least one version");
+        let new_version = latest_version.bump();
+        collections.insert(new_version, replacement.global_id_writes());
+
+        let mut resolved_ids = replacement.resolved_ids;
+        resolved_ids.remove_item(&target_id);
+        let mut dependencies = replacement.dependencies;
+        dependencies.0.remove(&target_id);
+
+        *self = Self {
+            create_sql,
+            collections,
+            raw_expr: replacement.raw_expr,
+            optimized_expr: replacement.optimized_expr,
+            desc: replacement.desc,
+            resolved_ids,
+            dependencies,
+            replacement_target: None,
+            cluster_id: replacement.cluster_id,
+            non_null_assertions: replacement.non_null_assertions,
+            custom_logical_compaction_window: replacement.custom_logical_compaction_window,
+            refresh_schedule: replacement.refresh_schedule,
+            initial_as_of: replacement.initial_as_of,
+        };
+    }
 }
 
 #[derive(Debug, Clone, Serialize)]
@@ -3367,6 +3434,14 @@ impl mz_sql::catalog::CatalogItem for CatalogEntry {
         }
     }
 
+    fn replacement_target(&self) -> Option<CatalogItemId> {
+        if let CatalogItem::MaterializedView(mv) = self.item() {
+            mv.replacement_target
+        } else {
+            None
+        }
+    }
+
     fn type_details(&self) -> Option<&CatalogTypeDetails> {
         if let CatalogItem::Type(Type { details, .. }) = self.item() {
             Some(details)
diff --git a/src/repr/src/relation.rs b/src/repr/src/relation.rs
index fc7a788f6d679..ca5d4cbd118b5 100644
--- a/src/repr/src/relation.rs
+++ b/src/repr/src/relation.rs
@@ -1355,8 +1355,6 @@ impl VersionedRelationDesc {
     /// Returns this [`RelationDesc`] at the specified version.
     pub fn at_version(&self, version: RelationVersionSelector) -> RelationDesc {
         // Get all of the changes from the start, up to whatever version was requested.
-        //
-        // TODO(parkmycar): We should probably panic on unknown verisons?
let up_to_version = match version { RelationVersionSelector::Latest => RelationVersion(u64::MAX), RelationVersionSelector::Specific(v) => v, diff --git a/src/sql-lexer/src/keywords.txt b/src/sql-lexer/src/keywords.txt index e126ccd581f50..37c14d4dd6136 100644 --- a/src/sql-lexer/src/keywords.txt +++ b/src/sql-lexer/src/keywords.txt @@ -40,6 +40,7 @@ Analysis Analyze And Any +Apply Arity Arn Arranged @@ -390,6 +391,8 @@ Rename Reoptimize Repeatable Replace +Replacement +Replacing Replan Replica Replicas diff --git a/src/sql-parser/src/ast/defs/statement.rs b/src/sql-parser/src/ast/defs/statement.rs index 22cb27da9ea26..c0f29d526732e 100644 --- a/src/sql-parser/src/ast/defs/statement.rs +++ b/src/sql-parser/src/ast/defs/statement.rs @@ -82,6 +82,7 @@ pub enum Statement { AlterNetworkPolicy(AlterNetworkPolicyStatement), AlterRole(AlterRoleStatement), AlterTableAddColumn(AlterTableAddColumnStatement), + AlterMaterializedViewApplyReplacement(AlterMaterializedViewApplyReplacementStatement), Discard(DiscardStatement), DropObjects(DropObjectsStatement), DropOwned(DropOwnedStatement), @@ -160,6 +161,7 @@ impl AstDisplay for Statement { Statement::AlterConnection(stmt) => f.write_node(stmt), Statement::AlterRole(stmt) => f.write_node(stmt), Statement::AlterTableAddColumn(stmt) => f.write_node(stmt), + Statement::AlterMaterializedViewApplyReplacement(stmt) => f.write_node(stmt), Statement::Discard(stmt) => f.write_node(stmt), Statement::DropObjects(stmt) => f.write_node(stmt), Statement::DropOwned(stmt) => f.write_node(stmt), @@ -241,6 +243,9 @@ pub fn statement_kind_label_value(kind: StatementKind) -> &'static str { StatementKind::AlterOwner => "alter_owner", StatementKind::AlterConnection => "alter_connection", StatementKind::AlterTableAddColumn => "alter_table", + StatementKind::AlterMaterializedViewApplyReplacement => { + "alter_materialized_view_apply_replacement" + } StatementKind::Discard => "discard", StatementKind::DropObjects => "drop_objects", StatementKind::DropOwned => "drop_owned", @@ -1382,6 +1387,7 @@ pub struct CreateMaterializedViewStatement { pub if_exists: IfExistsBehavior, pub name: UnresolvedItemName, pub columns: Vec, + pub replacing: Option, pub in_cluster: Option, pub query: Query, pub as_of: Option, @@ -1410,6 +1416,11 @@ impl AstDisplay for CreateMaterializedViewStatement { f.write_str(")"); } + if let Some(target) = &self.replacing { + f.write_str(" REPLACING "); + f.write_node(target); + } + if let Some(cluster) = &self.in_cluster { f.write_str(" IN CLUSTER "); f.write_node(cluster); @@ -3199,6 +3210,32 @@ impl AstDisplay for AlterTableAddColumnStatement { impl_display_t!(AlterTableAddColumnStatement); +/// `ALTER MATERIALIZED VIEW ... 
APPLY REPLACEMENT ...`
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+pub struct AlterMaterializedViewApplyReplacementStatement {
+    pub if_exists: bool,
+    pub name: UnresolvedItemName,
+    pub replacement_name: UnresolvedItemName,
+}
+
+impl AstDisplay for AlterMaterializedViewApplyReplacementStatement {
+    fn fmt<W>(&self, f: &mut AstFormatter<W>)
+    where
+        W: fmt::Write,
+    {
+        f.write_str("ALTER MATERIALIZED VIEW ");
+        if self.if_exists {
+            f.write_str("IF EXISTS ");
+        }
+        f.write_node(&self.name);
+
+        f.write_str(" APPLY REPLACEMENT ");
+        f.write_node(&self.replacement_name);
+    }
+}
+
+impl_display!(AlterMaterializedViewApplyReplacementStatement);
+
 #[derive(Debug, Clone, PartialEq, Eq, Hash)]
 pub struct DiscardStatement {
     pub target: DiscardTarget,
diff --git a/src/sql-parser/src/parser.rs b/src/sql-parser/src/parser.rs
index 32c71f4ac5f61..fc33322936675 100644
--- a/src/sql-parser/src/parser.rs
+++ b/src/sql-parser/src/parser.rs
@@ -3862,6 +3862,10 @@
         let name = self.parse_item_name()?;
         let columns = self.parse_parenthesized_column_list(Optional)?;
+        let replacing = self
+            .parse_keyword(REPLACING)
+            .then(|| self.parse_raw_name())
+            .transpose()?;
         let in_cluster = self.parse_optional_in_cluster()?;
 
         let with_options = if self.parse_keyword(WITH) {
@@ -3882,6 +3886,7 @@
             if_exists,
             name,
             columns,
+            replacing,
             in_cluster,
             query,
             as_of,
@@ -6432,10 +6437,10 @@
     ) -> Result<Statement<Raw>, ParserStatementError> {
         let if_exists = self.parse_if_exists().map_no_statement_parser_err()?;
         let name = self.parse_item_name().map_no_statement_parser_err()?;
-        let keywords = if object_type == ObjectType::Table {
-            [SET, RENAME, OWNER, RESET, ADD].as_slice()
-        } else {
-            [SET, RENAME, OWNER, RESET].as_slice()
+        let keywords: &[_] = match object_type {
+            ObjectType::Table => &[SET, RENAME, OWNER, RESET, ADD],
+            ObjectType::MaterializedView => &[SET, RENAME, OWNER, RESET, APPLY],
+            _ => &[SET, RENAME, OWNER, RESET],
         };
 
         let action = self
@@ -6526,6 +6531,28 @@
                     },
                 ))
             }
+            APPLY => {
+                assert_eq!(
+                    object_type,
+                    ObjectType::MaterializedView,
+                    "checked object_type above",
+                );
+
+                self.expect_keyword(REPLACEMENT)
+                    .map_parser_err(StatementKind::AlterMaterializedViewApplyReplacement)?;
+
+                let replacement_name = self
+                    .parse_item_name()
+                    .map_parser_err(StatementKind::AlterMaterializedViewApplyReplacement)?;
+
+                Ok(Statement::AlterMaterializedViewApplyReplacement(
+                    AlterMaterializedViewApplyReplacementStatement {
+                        if_exists,
+                        name,
+                        replacement_name,
+                    },
+                ))
+            }
             _ => unreachable!(),
         }
     }
diff --git a/src/sql-parser/tests/testdata/alter b/src/sql-parser/tests/testdata/alter
index c2c9b4a20ef09..28b0304273740 100644
--- a/src/sql-parser/tests/testdata/alter
+++ b/src/sql-parser/tests/testdata/alter
@@ -131,3 +131,17 @@ ALTER TABLE IF EXISTS t1 ADD COLUMN IF NOT EXISTS bar text
 ----
 ALTER TABLE IF EXISTS t1 ADD COLUMN IF NOT EXISTS bar text
 =>
 AlterTableAddColumn(AlterTableAddColumnStatement { if_exists: true, name: UnresolvedItemName([Ident("t1")]), if_col_not_exist: true, column_name: Ident("bar"), data_type: Other { name: Name(UnresolvedItemName([Ident("text")])), typ_mod: [] } })
+
+parse-statement
+ALTER MATERIALIZED VIEW mv APPLY REPLACEMENT rpl
+----
+ALTER MATERIALIZED VIEW mv APPLY REPLACEMENT rpl
+=>
+AlterMaterializedViewApplyReplacement(AlterMaterializedViewApplyReplacementStatement { if_exists: false, name: UnresolvedItemName([Ident("mv")]), replacement_name: UnresolvedItemName([Ident("rpl")]) })
+
+parse-statement
+ALTER 
MATERIALIZED VIEW IF EXISTS mv APPLY REPLACEMENT rpl +---- +ALTER MATERIALIZED VIEW IF EXISTS mv APPLY REPLACEMENT rpl +=> +AlterMaterializedViewApplyReplacement(AlterMaterializedViewApplyReplacementStatement { if_exists: true, name: UnresolvedItemName([Ident("mv")]), replacement_name: UnresolvedItemName([Ident("rpl")]) }) diff --git a/src/sql-parser/tests/testdata/ddl b/src/sql-parser/tests/testdata/ddl index 3a900eb4cf19a..3577dadb35d26 100644 --- a/src/sql-parser/tests/testdata/ddl +++ b/src/sql-parser/tests/testdata/ddl @@ -423,56 +423,56 @@ CREATE MATERIALIZED VIEW myschema.myview AS SELECT foo FROM bar ---- CREATE MATERIALIZED VIEW myschema.myview AS SELECT foo FROM bar => -CreateMaterializedView(CreateMaterializedViewStatement { if_exists: Error, name: UnresolvedItemName([Ident("myschema"), Ident("myview")]), columns: [], in_cluster: None, query: Query { ctes: Simple([]), body: Select(Select { distinct: None, projection: [Expr { expr: Identifier([Ident("foo")]), alias: None }], from: [TableWithJoins { relation: Table { name: Name(UnresolvedItemName([Ident("bar")])), alias: None }, joins: [] }], selection: None, group_by: [], having: None, qualify: None, options: [] }), order_by: [], limit: None, offset: None }, as_of: None, with_options: [] }) +CreateMaterializedView(CreateMaterializedViewStatement { if_exists: Error, name: UnresolvedItemName([Ident("myschema"), Ident("myview")]), columns: [], replacing: None, in_cluster: None, query: Query { ctes: Simple([]), body: Select(Select { distinct: None, projection: [Expr { expr: Identifier([Ident("foo")]), alias: None }], from: [TableWithJoins { relation: Table { name: Name(UnresolvedItemName([Ident("bar")])), alias: None }, joins: [] }], selection: None, group_by: [], having: None, qualify: None, options: [] }), order_by: [], limit: None, offset: None }, as_of: None, with_options: [] }) parse-statement CREATE OR REPLACE MATERIALIZED VIEW v AS SELECT 1 ---- CREATE OR REPLACE MATERIALIZED VIEW v AS SELECT 1 => -CreateMaterializedView(CreateMaterializedViewStatement { if_exists: Replace, name: UnresolvedItemName([Ident("v")]), columns: [], in_cluster: None, query: Query { ctes: Simple([]), body: Select(Select { distinct: None, projection: [Expr { expr: Value(Number("1")), alias: None }], from: [], selection: None, group_by: [], having: None, qualify: None, options: [] }), order_by: [], limit: None, offset: None }, as_of: None, with_options: [] }) +CreateMaterializedView(CreateMaterializedViewStatement { if_exists: Replace, name: UnresolvedItemName([Ident("v")]), columns: [], replacing: None, in_cluster: None, query: Query { ctes: Simple([]), body: Select(Select { distinct: None, projection: [Expr { expr: Value(Number("1")), alias: None }], from: [], selection: None, group_by: [], having: None, qualify: None, options: [] }), order_by: [], limit: None, offset: None }, as_of: None, with_options: [] }) parse-statement CREATE MATERIALIZED VIEW IF NOT EXISTS v AS SELECT 1 ---- CREATE MATERIALIZED VIEW IF NOT EXISTS v AS SELECT 1 => -CreateMaterializedView(CreateMaterializedViewStatement { if_exists: Skip, name: UnresolvedItemName([Ident("v")]), columns: [], in_cluster: None, query: Query { ctes: Simple([]), body: Select(Select { distinct: None, projection: [Expr { expr: Value(Number("1")), alias: None }], from: [], selection: None, group_by: [], having: None, qualify: None, options: [] }), order_by: [], limit: None, offset: None }, as_of: None, with_options: [] }) +CreateMaterializedView(CreateMaterializedViewStatement { if_exists: Skip, name: 
UnresolvedItemName([Ident("v")]), columns: [], replacing: None, in_cluster: None, query: Query { ctes: Simple([]), body: Select(Select { distinct: None, projection: [Expr { expr: Value(Number("1")), alias: None }], from: [], selection: None, group_by: [], having: None, qualify: None, options: [] }), order_by: [], limit: None, offset: None }, as_of: None, with_options: [] }) parse-statement CREATE MATERIALIZED VIEW v (has, cols) AS SELECT 1, 2 ---- CREATE MATERIALIZED VIEW v (has, cols) AS SELECT 1, 2 => -CreateMaterializedView(CreateMaterializedViewStatement { if_exists: Error, name: UnresolvedItemName([Ident("v")]), columns: [Ident("has"), Ident("cols")], in_cluster: None, query: Query { ctes: Simple([]), body: Select(Select { distinct: None, projection: [Expr { expr: Value(Number("1")), alias: None }, Expr { expr: Value(Number("2")), alias: None }], from: [], selection: None, group_by: [], having: None, qualify: None, options: [] }), order_by: [], limit: None, offset: None }, as_of: None, with_options: [] }) +CreateMaterializedView(CreateMaterializedViewStatement { if_exists: Error, name: UnresolvedItemName([Ident("v")]), columns: [Ident("has"), Ident("cols")], replacing: None, in_cluster: None, query: Query { ctes: Simple([]), body: Select(Select { distinct: None, projection: [Expr { expr: Value(Number("1")), alias: None }, Expr { expr: Value(Number("2")), alias: None }], from: [], selection: None, group_by: [], having: None, qualify: None, options: [] }), order_by: [], limit: None, offset: None }, as_of: None, with_options: [] }) parse-statement CREATE MATERIALIZED VIEW v IN CLUSTER bar AS SELECT 1 ---- CREATE MATERIALIZED VIEW v IN CLUSTER bar AS SELECT 1 => -CreateMaterializedView(CreateMaterializedViewStatement { if_exists: Error, name: UnresolvedItemName([Ident("v")]), columns: [], in_cluster: Some(Unresolved(Ident("bar"))), query: Query { ctes: Simple([]), body: Select(Select { distinct: None, projection: [Expr { expr: Value(Number("1")), alias: None }], from: [], selection: None, group_by: [], having: None, qualify: None, options: [] }), order_by: [], limit: None, offset: None }, as_of: None, with_options: [] }) +CreateMaterializedView(CreateMaterializedViewStatement { if_exists: Error, name: UnresolvedItemName([Ident("v")]), columns: [], replacing: None, in_cluster: Some(Unresolved(Ident("bar"))), query: Query { ctes: Simple([]), body: Select(Select { distinct: None, projection: [Expr { expr: Value(Number("1")), alias: None }], from: [], selection: None, group_by: [], having: None, qualify: None, options: [] }), order_by: [], limit: None, offset: None }, as_of: None, with_options: [] }) parse-statement CREATE MATERIALIZED VIEW v IN CLUSTER [1] AS SELECT 1 ---- CREATE MATERIALIZED VIEW v IN CLUSTER [1] AS SELECT 1 => -CreateMaterializedView(CreateMaterializedViewStatement { if_exists: Error, name: UnresolvedItemName([Ident("v")]), columns: [], in_cluster: Some(Resolved("1")), query: Query { ctes: Simple([]), body: Select(Select { distinct: None, projection: [Expr { expr: Value(Number("1")), alias: None }], from: [], selection: None, group_by: [], having: None, qualify: None, options: [] }), order_by: [], limit: None, offset: None }, as_of: None, with_options: [] }) +CreateMaterializedView(CreateMaterializedViewStatement { if_exists: Error, name: UnresolvedItemName([Ident("v")]), columns: [], replacing: None, in_cluster: Some(Resolved("1")), query: Query { ctes: Simple([]), body: Select(Select { distinct: None, projection: [Expr { expr: Value(Number("1")), alias: None }], from: 
[], selection: None, group_by: [], having: None, qualify: None, options: [] }), order_by: [], limit: None, offset: None }, as_of: None, with_options: [] }) parse-statement CREATE MATERIALIZED VIEW v (n) WITH (PARTITION BY (n)) AS SELECT 1 ---- CREATE MATERIALIZED VIEW v (n) WITH (PARTITION BY = (n)) AS SELECT 1 => -CreateMaterializedView(CreateMaterializedViewStatement { if_exists: Error, name: UnresolvedItemName([Ident("v")]), columns: [Ident("n")], in_cluster: None, query: Query { ctes: Simple([]), body: Select(Select { distinct: None, projection: [Expr { expr: Value(Number("1")), alias: None }], from: [], selection: None, group_by: [], having: None, qualify: None, options: [] }), order_by: [], limit: None, offset: None }, as_of: None, with_options: [MaterializedViewOption { name: PartitionBy, value: Some(Sequence([UnresolvedItemName(UnresolvedItemName([Ident("n")]))])) }] }) +CreateMaterializedView(CreateMaterializedViewStatement { if_exists: Error, name: UnresolvedItemName([Ident("v")]), columns: [Ident("n")], replacing: None, in_cluster: None, query: Query { ctes: Simple([]), body: Select(Select { distinct: None, projection: [Expr { expr: Value(Number("1")), alias: None }], from: [], selection: None, group_by: [], having: None, qualify: None, options: [] }), order_by: [], limit: None, offset: None }, as_of: None, with_options: [MaterializedViewOption { name: PartitionBy, value: Some(Sequence([UnresolvedItemName(UnresolvedItemName([Ident("n")]))])) }] }) parse-statement CREATE MATERIALIZED VIEW v (n, m) WITH (PARTITION BY (n, m)) AS SELECT (1, 2); ---- CREATE MATERIALIZED VIEW v (n, m) WITH (PARTITION BY = (n, m)) AS SELECT ROW(1, 2) => -CreateMaterializedView(CreateMaterializedViewStatement { if_exists: Error, name: UnresolvedItemName([Ident("v")]), columns: [Ident("n"), Ident("m")], in_cluster: None, query: Query { ctes: Simple([]), body: Select(Select { distinct: None, projection: [Expr { expr: Row { exprs: [Value(Number("1")), Value(Number("2"))] }, alias: None }], from: [], selection: None, group_by: [], having: None, qualify: None, options: [] }), order_by: [], limit: None, offset: None }, as_of: None, with_options: [MaterializedViewOption { name: PartitionBy, value: Some(Sequence([UnresolvedItemName(UnresolvedItemName([Ident("n")])), UnresolvedItemName(UnresolvedItemName([Ident("m")]))])) }] }) +CreateMaterializedView(CreateMaterializedViewStatement { if_exists: Error, name: UnresolvedItemName([Ident("v")]), columns: [Ident("n"), Ident("m")], replacing: None, in_cluster: None, query: Query { ctes: Simple([]), body: Select(Select { distinct: None, projection: [Expr { expr: Row { exprs: [Value(Number("1")), Value(Number("2"))] }, alias: None }], from: [], selection: None, group_by: [], having: None, qualify: None, options: [] }), order_by: [], limit: None, offset: None }, as_of: None, with_options: [MaterializedViewOption { name: PartitionBy, value: Some(Sequence([UnresolvedItemName(UnresolvedItemName([Ident("n")])), UnresolvedItemName(UnresolvedItemName([Ident("m")]))])) }] }) parse-statement @@ -480,14 +480,28 @@ CREATE MATERIALIZED VIEW v WITH (REFRESH EVERY '1 day', ASSERT NOT NULL x) AS SE ---- CREATE MATERIALIZED VIEW v WITH (REFRESH = EVERY '1 day', ASSERT NOT NULL = x) AS SELECT * FROM t => -CreateMaterializedView(CreateMaterializedViewStatement { if_exists: Error, name: UnresolvedItemName([Ident("v")]), columns: [], in_cluster: None, query: Query { ctes: Simple([]), body: Select(Select { distinct: None, projection: [Wildcard], from: [TableWithJoins { relation: Table { 
name: Name(UnresolvedItemName([Ident("t")])), alias: None }, joins: [] }], selection: None, group_by: [], having: None, qualify: None, options: [] }), order_by: [], limit: None, offset: None }, as_of: None, with_options: [MaterializedViewOption { name: Refresh, value: Some(Refresh(Every(RefreshEveryOptionValue { interval: IntervalValue { value: "1 day", precision_high: Year, precision_low: Second, fsec_max_precision: None }, aligned_to: None }))) }, MaterializedViewOption { name: AssertNotNull, value: Some(UnresolvedItemName(UnresolvedItemName([Ident("x")]))) }] }) +CreateMaterializedView(CreateMaterializedViewStatement { if_exists: Error, name: UnresolvedItemName([Ident("v")]), columns: [], replacing: None, in_cluster: None, query: Query { ctes: Simple([]), body: Select(Select { distinct: None, projection: [Wildcard], from: [TableWithJoins { relation: Table { name: Name(UnresolvedItemName([Ident("t")])), alias: None }, joins: [] }], selection: None, group_by: [], having: None, qualify: None, options: [] }), order_by: [], limit: None, offset: None }, as_of: None, with_options: [MaterializedViewOption { name: Refresh, value: Some(Refresh(Every(RefreshEveryOptionValue { interval: IntervalValue { value: "1 day", precision_high: Year, precision_low: Second, fsec_max_precision: None }, aligned_to: None }))) }, MaterializedViewOption { name: AssertNotNull, value: Some(UnresolvedItemName(UnresolvedItemName([Ident("x")]))) }] }) + +parse-statement +CREATE MATERIALIZED VIEW v REPLACING target AS SELECT * FROM t +---- +CREATE MATERIALIZED VIEW v REPLACING target AS SELECT * FROM t +=> +CreateMaterializedView(CreateMaterializedViewStatement { if_exists: Error, name: UnresolvedItemName([Ident("v")]), columns: [], replacing: Some(Name(UnresolvedItemName([Ident("target")]))), in_cluster: None, query: Query { ctes: Simple([]), body: Select(Select { distinct: None, projection: [Wildcard], from: [TableWithJoins { relation: Table { name: Name(UnresolvedItemName([Ident("t")])), alias: None }, joins: [] }], selection: None, group_by: [], having: None, qualify: None, options: [] }), order_by: [], limit: None, offset: None }, as_of: None, with_options: [] }) + +parse-statement +CREATE MATERIALIZED VIEW v REPLACING [u1 AS a.b.c] AS SELECT * FROM t +---- +CREATE MATERIALIZED VIEW v REPLACING [u1 AS a.b.c] AS SELECT * FROM t +=> +CreateMaterializedView(CreateMaterializedViewStatement { if_exists: Error, name: UnresolvedItemName([Ident("v")]), columns: [], replacing: Some(Id("u1", UnresolvedItemName([Ident("a"), Ident("b"), Ident("c")]), None)), in_cluster: None, query: Query { ctes: Simple([]), body: Select(Select { distinct: None, projection: [Wildcard], from: [TableWithJoins { relation: Table { name: Name(UnresolvedItemName([Ident("t")])), alias: None }, joins: [] }], selection: None, group_by: [], having: None, qualify: None, options: [] }), order_by: [], limit: None, offset: None }, as_of: None, with_options: [] }) parse-statement CREATE OR REPLACE MATERIALIZED VIEW v IN CLUSTER [1] WITH (REFRESH EVERY '1 day' ALIGNED TO '2023-12-11 11:00', ASSERT NOT NULL x, REFRESH AT mz_now(), REFRESH ON COMMIT, REFRESH = AT CREATION) AS SELECT * FROM t; ---- CREATE OR REPLACE MATERIALIZED VIEW v IN CLUSTER [1] WITH (REFRESH = EVERY '1 day' ALIGNED TO '2023-12-11 11:00', ASSERT NOT NULL = x, REFRESH = AT mz_now(), REFRESH = ON COMMIT, REFRESH = AT CREATION) AS SELECT * FROM t => -CreateMaterializedView(CreateMaterializedViewStatement { if_exists: Replace, name: UnresolvedItemName([Ident("v")]), columns: [], in_cluster: 
Some(Resolved("1")), query: Query { ctes: Simple([]), body: Select(Select { distinct: None, projection: [Wildcard], from: [TableWithJoins { relation: Table { name: Name(UnresolvedItemName([Ident("t")])), alias: None }, joins: [] }], selection: None, group_by: [], having: None, qualify: None, options: [] }), order_by: [], limit: None, offset: None }, as_of: None, with_options: [MaterializedViewOption { name: Refresh, value: Some(Refresh(Every(RefreshEveryOptionValue { interval: IntervalValue { value: "1 day", precision_high: Year, precision_low: Second, fsec_max_precision: None }, aligned_to: Some(Value(String("2023-12-11 11:00"))) }))) }, MaterializedViewOption { name: AssertNotNull, value: Some(UnresolvedItemName(UnresolvedItemName([Ident("x")]))) }, MaterializedViewOption { name: Refresh, value: Some(Refresh(At(RefreshAtOptionValue { time: Function(Function { name: Name(UnresolvedItemName([Ident("mz_now")])), args: Args { args: [], order_by: [] }, filter: None, over: None, distinct: false }) }))) }, MaterializedViewOption { name: Refresh, value: Some(Refresh(OnCommit)) }, MaterializedViewOption { name: Refresh, value: Some(Refresh(AtCreation)) }] }) +CreateMaterializedView(CreateMaterializedViewStatement { if_exists: Replace, name: UnresolvedItemName([Ident("v")]), columns: [], replacing: None, in_cluster: Some(Resolved("1")), query: Query { ctes: Simple([]), body: Select(Select { distinct: None, projection: [Wildcard], from: [TableWithJoins { relation: Table { name: Name(UnresolvedItemName([Ident("t")])), alias: None }, joins: [] }], selection: None, group_by: [], having: None, qualify: None, options: [] }), order_by: [], limit: None, offset: None }, as_of: None, with_options: [MaterializedViewOption { name: Refresh, value: Some(Refresh(Every(RefreshEveryOptionValue { interval: IntervalValue { value: "1 day", precision_high: Year, precision_low: Second, fsec_max_precision: None }, aligned_to: Some(Value(String("2023-12-11 11:00"))) }))) }, MaterializedViewOption { name: AssertNotNull, value: Some(UnresolvedItemName(UnresolvedItemName([Ident("x")]))) }, MaterializedViewOption { name: Refresh, value: Some(Refresh(At(RefreshAtOptionValue { time: Function(Function { name: Name(UnresolvedItemName([Ident("mz_now")])), args: Args { args: [], order_by: [] }, filter: None, over: None, distinct: false }) }))) }, MaterializedViewOption { name: Refresh, value: Some(Refresh(OnCommit)) }, MaterializedViewOption { name: Refresh, value: Some(Refresh(AtCreation)) }] }) parse-statement roundtrip CREATE OR REPLACE MATERIALIZED VIEW v WITH (ASSERT NOT NULL a, ASSERT NOT NULL = b, RETAIN HISTORY = FOR '1s') AS SELECT 1 diff --git a/src/sql-parser/tests/testdata/explain b/src/sql-parser/tests/testdata/explain index 5659924e32fa0..47a88d1d337cd 100644 --- a/src/sql-parser/tests/testdata/explain +++ b/src/sql-parser/tests/testdata/explain @@ -234,14 +234,14 @@ EXPLAIN WITH (humanized expressions) CREATE MATERIALIZED VIEW mv AS SELECT 665 ---- EXPLAIN WITH (HUMANIZED EXPRESSIONS) CREATE MATERIALIZED VIEW mv AS SELECT 665 => -ExplainPlan(ExplainPlanStatement { stage: None, with_options: [ExplainPlanOption { name: HumanizedExpressions, value: None }], format: None, explainee: CreateMaterializedView(CreateMaterializedViewStatement { if_exists: Error, name: UnresolvedItemName([Ident("mv")]), columns: [], in_cluster: None, query: Query { ctes: Simple([]), body: Select(Select { distinct: None, projection: [Expr { expr: Value(Number("665")), alias: None }], from: [], selection: None, group_by: [], having: None, 
qualify: None, options: [] }), order_by: [], limit: None, offset: None }, as_of: None, with_options: [] }, false) }) +ExplainPlan(ExplainPlanStatement { stage: None, with_options: [ExplainPlanOption { name: HumanizedExpressions, value: None }], format: None, explainee: CreateMaterializedView(CreateMaterializedViewStatement { if_exists: Error, name: UnresolvedItemName([Ident("mv")]), columns: [], replacing: None, in_cluster: None, query: Query { ctes: Simple([]), body: Select(Select { distinct: None, projection: [Expr { expr: Value(Number("665")), alias: None }], from: [], selection: None, group_by: [], having: None, qualify: None, options: [] }), order_by: [], limit: None, offset: None }, as_of: None, with_options: [] }, false) }) parse-statement EXPLAIN BROKEN CREATE MATERIALIZED VIEW mv AS SELECT 665 ---- EXPLAIN BROKEN CREATE MATERIALIZED VIEW mv AS SELECT 665 => -ExplainPlan(ExplainPlanStatement { stage: None, with_options: [], format: None, explainee: CreateMaterializedView(CreateMaterializedViewStatement { if_exists: Error, name: UnresolvedItemName([Ident("mv")]), columns: [], in_cluster: None, query: Query { ctes: Simple([]), body: Select(Select { distinct: None, projection: [Expr { expr: Value(Number("665")), alias: None }], from: [], selection: None, group_by: [], having: None, qualify: None, options: [] }), order_by: [], limit: None, offset: None }, as_of: None, with_options: [] }, true) }) +ExplainPlan(ExplainPlanStatement { stage: None, with_options: [], format: None, explainee: CreateMaterializedView(CreateMaterializedViewStatement { if_exists: Error, name: UnresolvedItemName([Ident("mv")]), columns: [], replacing: None, in_cluster: None, query: Query { ctes: Simple([]), body: Select(Select { distinct: None, projection: [Expr { expr: Value(Number("665")), alias: None }], from: [], selection: None, group_by: [], having: None, qualify: None, options: [] }), order_by: [], limit: None, offset: None }, as_of: None, with_options: [] }, true) }) parse-statement EXPLAIN BROKEN CREATE DEFAULT INDEX ON q1 @@ -318,7 +318,7 @@ EXPLAIN WITH (ARITY, EQUIVALENCES, HUMANIZED EXPRESSIONS) CREATE MATERIALIZED VI ---- EXPLAIN WITH (ARITY, EQUIVALENCES, HUMANIZED EXPRESSIONS) CREATE MATERIALIZED VIEW mv AS SELECT 665 => -ExplainPlan(ExplainPlanStatement { stage: None, with_options: [ExplainPlanOption { name: Arity, value: None }, ExplainPlanOption { name: Equivalences, value: None }, ExplainPlanOption { name: HumanizedExpressions, value: None }], format: None, explainee: CreateMaterializedView(CreateMaterializedViewStatement { if_exists: Error, name: UnresolvedItemName([Ident("mv")]), columns: [], in_cluster: None, query: Query { ctes: Simple([]), body: Select(Select { distinct: None, projection: [Expr { expr: Value(Number("665")), alias: None }], from: [], selection: None, group_by: [], having: None, qualify: None, options: [] }), order_by: [], limit: None, offset: None }, as_of: None, with_options: [] }, false) }) +ExplainPlan(ExplainPlanStatement { stage: None, with_options: [ExplainPlanOption { name: Arity, value: None }, ExplainPlanOption { name: Equivalences, value: None }, ExplainPlanOption { name: HumanizedExpressions, value: None }], format: None, explainee: CreateMaterializedView(CreateMaterializedViewStatement { if_exists: Error, name: UnresolvedItemName([Ident("mv")]), columns: [], replacing: None, in_cluster: None, query: Query { ctes: Simple([]), body: Select(Select { distinct: None, projection: [Expr { expr: Value(Number("665")), alias: None }], from: [], selection: None, group_by: 
[], having: None, qualify: None, options: [] }), order_by: [], limit: None, offset: None }, as_of: None, with_options: [] }, false) }) parse-statement EXPLAIN ANALYZE MEMORY FOR INDEX i AS SQL diff --git a/src/sql-pretty/src/doc.rs b/src/sql-pretty/src/doc.rs index 1c5fc205f44f0..34b61f14ef736 100644 --- a/src/sql-pretty/src/doc.rs +++ b/src/sql-pretty/src/doc.rs @@ -263,6 +263,12 @@ impl Pretty { ")", )); } + if let Some(target) = &v.replacing { + docs.push(RcDoc::text(format!( + "REPLACING {}", + target.to_ast_string_simple() + ))); + } if let Some(cluster) = &v.in_cluster { docs.push(RcDoc::text(format!( "IN CLUSTER {}", diff --git a/src/sql/src/ast/transform.rs b/src/sql/src/ast/transform.rs index 9d665b233cf30..c7ecfdba3655a 100644 --- a/src/sql/src/ast/transform.rs +++ b/src/sql/src/ast/transform.rs @@ -202,8 +202,17 @@ pub fn create_stmt_rename_refs( Statement::CreateView(CreateViewStatement { definition: ViewDefinition { query, .. }, .. - }) - | Statement::CreateMaterializedView(CreateMaterializedViewStatement { query, .. }) => { + }) => { + rewrite_query(from_name, to_item_name, query)?; + } + Statement::CreateMaterializedView(CreateMaterializedViewStatement { + replacing, + query, + .. + }) => { + if let Some(target) = replacing { + maybe_update_item_name(target.name_mut()); + } rewrite_query(from_name, to_item_name, query)?; } Statement::CreateSource(_) diff --git a/src/sql/src/catalog.rs b/src/sql/src/catalog.rs index 404eaa0cea063..e6333e179a3ab 100644 --- a/src/sql/src/catalog.rs +++ b/src/sql/src/catalog.rs @@ -836,6 +836,9 @@ pub trait CatalogItem { /// catalog item is a table that accepts writes. fn writable_table_details(&self) -> Option<&[Expr]>; + /// The item this catalog item replaces, if any. + fn replacement_target(&self) -> Option; + /// Returns the type information associated with the catalog item, if the /// catalog item is a type. 
fn type_details(&self) -> Option<&CatalogTypeDetails>; diff --git a/src/sql/src/normalize.rs b/src/sql/src/normalize.rs index 250b47077da60..0118a3909cf0a 100644 --- a/src/sql/src/normalize.rs +++ b/src/sql/src/normalize.rs @@ -403,6 +403,7 @@ pub fn create_statement( if_exists, name, columns: _, + replacing: _, in_cluster: _, query, with_options: _, diff --git a/src/sql/src/plan.rs b/src/sql/src/plan.rs index 799648108b609..cfae266975f0f 100644 --- a/src/sql/src/plan.rs +++ b/src/sql/src/plan.rs @@ -193,6 +193,7 @@ pub enum Plan { AlterRole(AlterRolePlan), AlterOwner(AlterOwnerPlan), AlterTableAddColumn(AlterTablePlan), + AlterMaterializedViewApplyReplacement(AlterMaterializedViewApplyReplacementPlan), AlterNetworkPolicy(AlterNetworkPolicyPlan), Declare(DeclarePlan), Fetch(FetchPlan), @@ -253,6 +254,10 @@ impl Plan { StatementKind::AlterTableAddColumn => { &[PlanKind::AlterNoop, PlanKind::AlterTableAddColumn] } + StatementKind::AlterMaterializedViewApplyReplacement => &[ + PlanKind::AlterNoop, + PlanKind::AlterMaterializedViewApplyReplacement, + ], StatementKind::Close => &[PlanKind::Close], StatementKind::Comment => &[PlanKind::Comment], StatementKind::Commit => &[PlanKind::CommitTransaction], @@ -445,6 +450,9 @@ impl Plan { ObjectType::NetworkPolicy => "alter network policy owner", }, Plan::AlterTableAddColumn(_) => "alter table add column", + Plan::AlterMaterializedViewApplyReplacement(_) => { + "alter materialized view apply replacement" + } Plan::Declare(_) => "declare", Plan::Fetch(_) => "fetch", Plan::Close(_) => "close", @@ -1324,6 +1332,12 @@ pub struct AlterTablePlan { pub raw_sql_type: RawDataType, } +#[derive(Debug)] +pub struct AlterMaterializedViewApplyReplacementPlan { + pub id: CatalogItemId, + pub replacement_id: CatalogItemId, +} + #[derive(Debug)] pub struct DeclarePlan { pub name: String, @@ -1812,6 +1826,7 @@ pub struct MaterializedView { pub dependencies: DependencyIds, /// Columns of this view. pub column_names: Vec<ColumnName>, + pub replacement_target: Option<CatalogItemId>, /// Cluster this materialized view will get installed on. pub cluster_id: ClusterId, pub non_null_assertions: Vec<usize>, diff --git a/src/sql/src/plan/error.rs b/src/sql/src/plan/error.rs index 45138f8465c3b..4fed4aad10590 100644 --- a/src/sql/src/plan/error.rs +++ b/src/sql/src/plan/error.rs @@ -298,6 +298,12 @@ pub enum PlanError { }, /// AS OF or UP TO should be an expression that is castable and simplifiable to a non-null mz_timestamp value. InvalidAsOfUpTo, + InvalidReplacement { + item_type: CatalogItemType, + item_name: PartialItemName, + replacement_type: CatalogItemType, + replacement_name: PartialItemName, + }, // TODO(benesch): eventually all errors should be structured.
Unstructured(String), } @@ -830,7 +836,10 @@ impl fmt::Display for PlanError { write!(f, "cursor {} does not exist", name.quoted()) } Self::CopyFromTargetTableDropped { target_name: name } => write!(f, "COPY FROM's target table {} was dropped", name.quoted()), - Self::InvalidAsOfUpTo => write!(f, "AS OF or UP TO should be castable to a (non-null) mz_timestamp value") + Self::InvalidAsOfUpTo => write!(f, "AS OF or UP TO should be castable to a (non-null) mz_timestamp value"), + Self::InvalidReplacement { item_type, item_name, replacement_type, replacement_name } => { + write!(f, "cannot replace {item_type} {item_name} with {replacement_type} {replacement_name}") + } } } } diff --git a/src/sql/src/plan/statement.rs b/src/sql/src/plan/statement.rs index a013fd153f074..ed4a701097e60 100644 --- a/src/sql/src/plan/statement.rs +++ b/src/sql/src/plan/statement.rs @@ -126,6 +126,9 @@ pub fn describe( Statement::AlterCluster(stmt) => ddl::describe_alter_cluster_set_options(&scx, stmt)?, Statement::AlterConnection(stmt) => ddl::describe_alter_connection(&scx, stmt)?, Statement::AlterIndex(stmt) => ddl::describe_alter_index_options(&scx, stmt)?, + Statement::AlterMaterializedViewApplyReplacement(stmt) => { + ddl::describe_alter_materialized_view_apply_replacement(&scx, stmt)? + } Statement::AlterObjectRename(stmt) => ddl::describe_alter_object_rename(&scx, stmt)?, Statement::AlterObjectSwap(stmt) => ddl::describe_alter_object_swap(&scx, stmt)?, Statement::AlterRetainHistory(stmt) => ddl::describe_alter_retain_history(&scx, stmt)?, @@ -322,6 +325,9 @@ pub fn plan( Statement::AlterCluster(stmt) => ddl::plan_alter_cluster(scx, stmt), Statement::AlterConnection(stmt) => ddl::plan_alter_connection(scx, stmt), Statement::AlterIndex(stmt) => ddl::plan_alter_index_options(scx, stmt), + Statement::AlterMaterializedViewApplyReplacement(stmt) => { + ddl::plan_alter_materialized_view_apply_replacement(scx, stmt) + } Statement::AlterObjectRename(stmt) => ddl::plan_alter_object_rename(scx, stmt), Statement::AlterObjectSwap(stmt) => ddl::plan_alter_object_swap(scx, stmt), Statement::AlterRetainHistory(stmt) => ddl::plan_alter_retain_history(scx, stmt), @@ -1049,6 +1055,7 @@ impl From<&Statement<Aug>> for StatementClassification Statement::AlterCluster(_) => DDL, Statement::AlterConnection(_) => DDL, Statement::AlterIndex(_) => DDL, + Statement::AlterMaterializedViewApplyReplacement(_) => DDL, Statement::AlterObjectRename(_) => DDL, Statement::AlterObjectSwap(_) => DDL, Statement::AlterNetworkPolicy(_) => DDL, diff --git a/src/sql/src/plan/statement/ddl.rs b/src/sql/src/plan/statement/ddl.rs index e327b98bb148d..7c3a928604e29 100644 --- a/src/sql/src/plan/statement/ddl.rs +++ b/src/sql/src/plan/statement/ddl.rs @@ -45,42 +45,42 @@ use mz_repr::{ use mz_sql_parser::ast::{ self, AlterClusterAction, AlterClusterStatement, AlterConnectionAction, AlterConnectionOption, AlterConnectionOptionName, AlterConnectionStatement, AlterIndexAction, AlterIndexStatement, - AlterNetworkPolicyStatement, AlterObjectRenameStatement, AlterObjectSwapStatement, - AlterRetainHistoryStatement, AlterRoleOption, AlterRoleStatement, AlterSecretStatement, - AlterSetClusterStatement, AlterSinkAction, AlterSinkStatement, AlterSourceAction, - AlterSourceAddSubsourceOption, AlterSourceAddSubsourceOptionName, AlterSourceStatement, - AlterSystemResetAllStatement, AlterSystemResetStatement, AlterSystemSetStatement, - AlterTableAddColumnStatement, AvroSchema, AvroSchemaOption, AvroSchemaOptionName, - ClusterAlterOption, ClusterAlterOptionName,
ClusterAlterOptionValue, - ClusterAlterUntilReadyOption, ClusterAlterUntilReadyOptionName, ClusterFeature, - ClusterFeatureName, ClusterOption, ClusterOptionName, ClusterScheduleOptionValue, ColumnDef, - ColumnOption, CommentObjectType, CommentStatement, ConnectionOption, ConnectionOptionName, - ContinualTaskOption, ContinualTaskOptionName, CreateClusterReplicaStatement, - CreateClusterStatement, CreateConnectionOption, CreateConnectionOptionName, - CreateConnectionStatement, CreateConnectionType, CreateContinualTaskStatement, - CreateDatabaseStatement, CreateIndexStatement, CreateMaterializedViewStatement, - CreateNetworkPolicyStatement, CreateRoleStatement, CreateSchemaStatement, - CreateSecretStatement, CreateSinkConnection, CreateSinkOption, CreateSinkOptionName, - CreateSinkStatement, CreateSourceConnection, CreateSourceOption, CreateSourceOptionName, - CreateSourceStatement, CreateSubsourceOption, CreateSubsourceOptionName, - CreateSubsourceStatement, CreateTableFromSourceStatement, CreateTableStatement, CreateTypeAs, - CreateTypeListOption, CreateTypeListOptionName, CreateTypeMapOption, CreateTypeMapOptionName, - CreateTypeStatement, CreateViewStatement, CreateWebhookSourceStatement, CsrConfigOption, - CsrConfigOptionName, CsrConnection, CsrConnectionAvro, CsrConnectionProtobuf, CsrSeedProtobuf, - CsvColumns, DeferredItemName, DocOnIdentifier, DocOnSchema, DropObjectsStatement, - DropOwnedStatement, Expr, Format, FormatSpecifier, IcebergSinkConfigOption, Ident, - IfExistsBehavior, IndexOption, IndexOptionName, KafkaSinkConfigOption, KeyConstraint, - LoadGeneratorOption, LoadGeneratorOptionName, MaterializedViewOption, - MaterializedViewOptionName, MySqlConfigOption, MySqlConfigOptionName, NetworkPolicyOption, - NetworkPolicyOptionName, NetworkPolicyRuleDefinition, NetworkPolicyRuleOption, - NetworkPolicyRuleOptionName, PgConfigOption, PgConfigOptionName, ProtobufSchema, - QualifiedReplica, RefreshAtOptionValue, RefreshEveryOptionValue, RefreshOptionValue, - ReplicaDefinition, ReplicaOption, ReplicaOptionName, RoleAttribute, SetRoleVar, - SourceErrorPolicy, SourceIncludeMetadata, SqlServerConfigOption, SqlServerConfigOptionName, - Statement, TableConstraint, TableFromSourceColumns, TableFromSourceOption, - TableFromSourceOptionName, TableOption, TableOptionName, UnresolvedDatabaseName, - UnresolvedItemName, UnresolvedObjectName, UnresolvedSchemaName, Value, ViewDefinition, - WithOptionValue, + AlterMaterializedViewApplyReplacementStatement, AlterNetworkPolicyStatement, + AlterObjectRenameStatement, AlterObjectSwapStatement, AlterRetainHistoryStatement, + AlterRoleOption, AlterRoleStatement, AlterSecretStatement, AlterSetClusterStatement, + AlterSinkAction, AlterSinkStatement, AlterSourceAction, AlterSourceAddSubsourceOption, + AlterSourceAddSubsourceOptionName, AlterSourceStatement, AlterSystemResetAllStatement, + AlterSystemResetStatement, AlterSystemSetStatement, AlterTableAddColumnStatement, AvroSchema, + AvroSchemaOption, AvroSchemaOptionName, ClusterAlterOption, ClusterAlterOptionName, + ClusterAlterOptionValue, ClusterAlterUntilReadyOption, ClusterAlterUntilReadyOptionName, + ClusterFeature, ClusterFeatureName, ClusterOption, ClusterOptionName, + ClusterScheduleOptionValue, ColumnDef, ColumnOption, CommentObjectType, CommentStatement, + ConnectionOption, ConnectionOptionName, ContinualTaskOption, ContinualTaskOptionName, + CreateClusterReplicaStatement, CreateClusterStatement, CreateConnectionOption, + CreateConnectionOptionName, CreateConnectionStatement, CreateConnectionType, 
+ CreateContinualTaskStatement, CreateDatabaseStatement, CreateIndexStatement, + CreateMaterializedViewStatement, CreateNetworkPolicyStatement, CreateRoleStatement, + CreateSchemaStatement, CreateSecretStatement, CreateSinkConnection, CreateSinkOption, + CreateSinkOptionName, CreateSinkStatement, CreateSourceConnection, CreateSourceOption, + CreateSourceOptionName, CreateSourceStatement, CreateSubsourceOption, + CreateSubsourceOptionName, CreateSubsourceStatement, CreateTableFromSourceStatement, + CreateTableStatement, CreateTypeAs, CreateTypeListOption, CreateTypeListOptionName, + CreateTypeMapOption, CreateTypeMapOptionName, CreateTypeStatement, CreateViewStatement, + CreateWebhookSourceStatement, CsrConfigOption, CsrConfigOptionName, CsrConnection, + CsrConnectionAvro, CsrConnectionProtobuf, CsrSeedProtobuf, CsvColumns, DeferredItemName, + DocOnIdentifier, DocOnSchema, DropObjectsStatement, DropOwnedStatement, Expr, Format, + FormatSpecifier, IcebergSinkConfigOption, Ident, IfExistsBehavior, IndexOption, + IndexOptionName, KafkaSinkConfigOption, KeyConstraint, LoadGeneratorOption, + LoadGeneratorOptionName, MaterializedViewOption, MaterializedViewOptionName, MySqlConfigOption, + MySqlConfigOptionName, NetworkPolicyOption, NetworkPolicyOptionName, + NetworkPolicyRuleDefinition, NetworkPolicyRuleOption, NetworkPolicyRuleOptionName, + PgConfigOption, PgConfigOptionName, ProtobufSchema, QualifiedReplica, RefreshAtOptionValue, + RefreshEveryOptionValue, RefreshOptionValue, ReplicaDefinition, ReplicaOption, + ReplicaOptionName, RoleAttribute, SetRoleVar, SourceErrorPolicy, SourceIncludeMetadata, + SqlServerConfigOption, SqlServerConfigOptionName, Statement, TableConstraint, + TableFromSourceColumns, TableFromSourceOption, TableFromSourceOptionName, TableOption, + TableOptionName, UnresolvedDatabaseName, UnresolvedItemName, UnresolvedObjectName, + UnresolvedSchemaName, Value, ViewDefinition, WithOptionValue, }; use mz_sql_parser::ident; use mz_sql_parser::parser::StatementParseResult; @@ -130,7 +130,7 @@ use crate::catalog::{ use crate::iceberg::IcebergSinkConfigOptionExtracted; use crate::kafka_util::{KafkaSinkConfigOptionExtracted, KafkaSourceConfigOptionExtracted}; use crate::names::{ - Aug, CommentObjectId, DatabaseId, ObjectId, PartialItemName, QualifiedItemName, + Aug, CommentObjectId, DatabaseId, DependencyIds, ObjectId, PartialItemName, QualifiedItemName, ResolvedClusterName, ResolvedColumnReference, ResolvedDataType, ResolvedDatabaseSpecifier, ResolvedItemName, ResolvedNetworkPolicyName, SchemaSpecifier, SystemObjectId, }; @@ -148,21 +148,21 @@ use crate::plan::with_options::{OptionalDuration, OptionalString, TryFromValue}; use crate::plan::{ AlterClusterPlan, AlterClusterPlanStrategy, AlterClusterRenamePlan, AlterClusterReplicaRenamePlan, AlterClusterSwapPlan, AlterConnectionPlan, AlterItemRenamePlan, - AlterNetworkPolicyPlan, AlterNoopPlan, AlterOptionParameter, AlterRetainHistoryPlan, - AlterRolePlan, AlterSchemaRenamePlan, AlterSchemaSwapPlan, AlterSecretPlan, - AlterSetClusterPlan, AlterSinkPlan, AlterSystemResetAllPlan, AlterSystemResetPlan, - AlterSystemSetPlan, AlterTablePlan, ClusterSchedule, CommentPlan, ComputeReplicaConfig, - ComputeReplicaIntrospectionConfig, ConnectionDetails, CreateClusterManagedPlan, - CreateClusterPlan, CreateClusterReplicaPlan, CreateClusterUnmanagedPlan, CreateClusterVariant, - CreateConnectionPlan, CreateContinualTaskPlan, CreateDatabasePlan, CreateIndexPlan, - CreateMaterializedViewPlan, CreateNetworkPolicyPlan, CreateRolePlan, CreateSchemaPlan, - 
CreateSecretPlan, CreateSinkPlan, CreateSourcePlan, CreateTablePlan, CreateTypePlan, - CreateViewPlan, DataSourceDesc, DropObjectsPlan, DropOwnedPlan, HirRelationExpr, Index, - MaterializedView, NetworkPolicyRule, NetworkPolicyRuleAction, NetworkPolicyRuleDirection, Plan, - PlanClusterOption, PlanNotice, PolicyAddress, QueryContext, ReplicaConfig, Secret, Sink, - Source, Table, TableDataSource, Type, VariableValue, View, WebhookBodyFormat, - WebhookHeaderFilters, WebhookHeaders, WebhookValidation, literal, plan_utils, query, - transform_ast, + AlterMaterializedViewApplyReplacementPlan, AlterNetworkPolicyPlan, AlterNoopPlan, + AlterOptionParameter, AlterRetainHistoryPlan, AlterRolePlan, AlterSchemaRenamePlan, + AlterSchemaSwapPlan, AlterSecretPlan, AlterSetClusterPlan, AlterSinkPlan, + AlterSystemResetAllPlan, AlterSystemResetPlan, AlterSystemSetPlan, AlterTablePlan, + ClusterSchedule, CommentPlan, ComputeReplicaConfig, ComputeReplicaIntrospectionConfig, + ConnectionDetails, CreateClusterManagedPlan, CreateClusterPlan, CreateClusterReplicaPlan, + CreateClusterUnmanagedPlan, CreateClusterVariant, CreateConnectionPlan, + CreateContinualTaskPlan, CreateDatabasePlan, CreateIndexPlan, CreateMaterializedViewPlan, + CreateNetworkPolicyPlan, CreateRolePlan, CreateSchemaPlan, CreateSecretPlan, CreateSinkPlan, + CreateSourcePlan, CreateTablePlan, CreateTypePlan, CreateViewPlan, DataSourceDesc, + DropObjectsPlan, DropOwnedPlan, HirRelationExpr, Index, MaterializedView, NetworkPolicyRule, + NetworkPolicyRuleAction, NetworkPolicyRuleDirection, Plan, PlanClusterOption, PlanNotice, + PolicyAddress, QueryContext, ReplicaConfig, Secret, Sink, Source, Table, TableDataSource, Type, + VariableValue, View, WebhookBodyFormat, WebhookHeaderFilters, WebhookHeaders, + WebhookValidation, literal, plan_utils, query, transform_ast, }; use crate::session::vars::{ self, ENABLE_CLUSTER_SCHEDULE_REFRESH, ENABLE_COLLECTION_PARTITION_BY, @@ -2733,6 +2733,28 @@ pub fn plan_create_materialized_view( let partial_name = normalize::unresolved_item_name(stmt.name)?; let name = scx.allocate_qualified_name(partial_name.clone())?; + // Validate the replacement target, if one is given. 
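+    // The target must name an existing materialized view and must not be a system item.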
+ let replacement_target = if let Some(target_name) = &stmt.replacing { + let target = scx.get_item_by_resolved_name(target_name)?; + if target.item_type() != CatalogItemType::MaterializedView { + return Err(PlanError::InvalidReplacement { + item_type: target.item_type(), + item_name: scx.catalog.minimal_qualification(target.name()), + replacement_type: CatalogItemType::MaterializedView, + replacement_name: partial_name, + }); + } + if target.id().is_system() { + sql_bail!( + "cannot replace {} because it is required by the database system", + scx.catalog.minimal_qualification(target.name()), + ); + } + Some(target.id()) + } else { + None + }; + let query::PlannedRootQuery { expr, mut desc, @@ -2962,12 +2984,16 @@ pub fn plan_create_materialized_view( .collect() }) .unwrap_or_default(); - let dependencies = expr + let mut dependencies: BTreeSet<_> = expr .depends_on() .into_iter() .map(|gid| scx.catalog.resolve_item_id(&gid)) .collect(); + if let Some(id) = replacement_target { + dependencies.insert(id); + } + // Check for an object in the catalog with this same name let full_name = scx.catalog.resolve_full_name(&name); let partial_name = PartialItemName::from(full_name.clone()); @@ -2987,8 +3013,9 @@ pub fn plan_create_materialized_view( materialized_view: MaterializedView { create_sql, expr, - dependencies, + dependencies: DependencyIds(dependencies), column_names, + replacement_target, cluster_id, non_null_assertions, compaction_window, @@ -3238,6 +3265,7 @@ pub fn plan_create_continual_task( expr, dependencies, column_names, + replacement_target: None, cluster_id, non_null_assertions: Vec::new(), compaction_window: None, @@ -7313,6 +7341,52 @@ pub fn plan_alter_table_add_column( })) } +pub fn describe_alter_materialized_view_apply_replacement( + _: &StatementContext, + _: AlterMaterializedViewApplyReplacementStatement, +) -> Result<StatementDesc, PlanError> { + Ok(StatementDesc::new(None)) +} + +pub fn plan_alter_materialized_view_apply_replacement( + scx: &StatementContext, + stmt: AlterMaterializedViewApplyReplacementStatement, +) -> Result<Plan, PlanError> { + let AlterMaterializedViewApplyReplacementStatement { + if_exists, + name, + replacement_name, + } = stmt; + + let object_type = ObjectType::MaterializedView; + let Some(mv) = resolve_item_or_type(scx, object_type, name.clone(), if_exists)? else { + scx.catalog.add_notice(PlanNotice::ObjectDoesNotExist { + name: name.to_ast_string_simple(), + object_type, + }); + return Ok(Plan::AlterNoop(AlterNoopPlan { object_type })); + }; + + let replacement = resolve_item_or_type(scx, object_type, replacement_name, false)?
+ .expect("if_exists not set"); + + if replacement.replacement_target() != Some(mv.id()) { + return Err(PlanError::InvalidReplacement { + item_type: mv.item_type(), + item_name: scx.catalog.minimal_qualification(mv.name()), + replacement_type: replacement.item_type(), + replacement_name: scx.catalog.minimal_qualification(replacement.name()), + }); + } + + Ok(Plan::AlterMaterializedViewApplyReplacement( + AlterMaterializedViewApplyReplacementPlan { + id: mv.id(), + replacement_id: replacement.id(), + }, + )) +} + pub fn describe_comment( _: &StatementContext, _: CommentStatement, diff --git a/src/sql/src/rbac.rs b/src/sql/src/rbac.rs index 07f8f2b85a9e9..28a41359bc65e 100644 --- a/src/sql/src/rbac.rs +++ b/src/sql/src/rbac.rs @@ -1264,6 +1264,13 @@ fn generate_rbac_requirements( item_usage: &CREATE_ITEM_USAGE, ..Default::default() }, + Plan::AlterMaterializedViewApplyReplacement( + plan::AlterMaterializedViewApplyReplacementPlan { id, replacement_id }, + ) => RbacRequirements { + ownership: vec![ObjectId::Item(*id), ObjectId::Item(*replacement_id)], + item_usage: &CREATE_ITEM_USAGE, + ..Default::default() + }, Plan::AlterNetworkPolicy(plan::AlterNetworkPolicyPlan { id, .. }) => RbacRequirements { ownership: vec![ObjectId::NetworkPolicy(*id)], item_usage: &CREATE_ITEM_USAGE, diff --git a/src/storage-client/src/controller.rs b/src/storage-client/src/controller.rs index 03b9eaf0eb690..37e94d95371bb 100644 --- a/src/storage-client/src/controller.rs +++ b/src/storage-client/src/controller.rs @@ -124,12 +124,7 @@ pub enum DataSource { /// Data comes from external HTTP requests pushed to Materialize. Webhook, /// The adapter layer appends timestamped data, i.e. it is a `TABLE`. - Table { - /// This table has had columns added or dropped to it, so we're now a - /// "view" over the "primary" Table/collection. Within the - /// `storage-controller` we track the primary as a dependency. - primary: Option<GlobalId>, - }, + Table, /// This source's data does not need to be managed by the storage /// controller, e.g. it's a materialized view or the catalog collection. Other, @@ -151,6 +146,13 @@ pub struct CollectionDescription { pub status_collection_id: Option<GlobalId>, /// The timeline of the source. Absent for materialized views, continual tasks, etc. pub timeline: Option<Timeline>, + /// The primary of this collection, if any. + /// + /// Multiple storage collections can point to the same persist shard, + /// possibly with different schemas. In such a configuration, we select one + /// of the involved collections as the primary, which "owns" the persist + /// shard. All other involved collections have a dependency on the primary. + pub primary: Option<GlobalId>, } impl CollectionDescription { @@ -162,17 +164,19 @@ since, status_collection_id: None, timeline: None, + primary: None, } } /// Create a CollectionDescription for a table. - pub fn for_table(desc: RelationDesc, primary: Option<GlobalId>) -> Self { + pub fn for_table(desc: RelationDesc) -> Self { Self { desc, - data_source: DataSource::Table { primary }, + data_source: DataSource::Table, since: None, status_collection_id: None, timeline: Some(Timeline::EpochMilliseconds), + primary: None, } } } @@ -736,7 +740,7 @@ impl DataSource { /// source using txn-wal. pub fn in_txns(&self) -> bool { match self { - DataSource::Table { .. } => true, + DataSource::Table => true, DataSource::Other | DataSource::Ingestion(_) | DataSource::IngestionExport { ..
} diff --git a/src/storage-client/src/storage_collections.rs b/src/storage-client/src/storage_collections.rs index aec2c3eccbbb1..d5858bd5b1497 100644 --- a/src/storage-client/src/storage_collections.rs +++ b/src/storage-client/src/storage_collections.rs @@ -902,24 +902,25 @@ where Ok(()) } - /// Determine if this collection has another dependency. - /// - /// Currently, collections have either 0 or 1 dependencies. + /// Returns the given collection's dependencies. fn determine_collection_dependencies( &self, self_collections: &BTreeMap<GlobalId, CollectionState<T>>, source_id: GlobalId, - data_source: &DataSource, + collection_desc: &CollectionDescription<T>, ) -> Result<Vec<GlobalId>, StorageError<T>> { - let dependencies = match &data_source { + let mut dependencies = Vec::new(); + + if let Some(id) = collection_desc.primary { + dependencies.push(id); + } + + match &collection_desc.data_source { DataSource::Introspection(_) | DataSource::Webhook - | DataSource::Table { primary: None } + | DataSource::Table | DataSource::Progress - | DataSource::Other => Vec::new(), - DataSource::Table { - primary: Some(primary), - } => vec![*primary], + | DataSource::Other => (), DataSource::IngestionExport { ingestion_id, data_config, @@ -935,20 +936,18 @@ where }; match data_config.envelope { - SourceEnvelope::CdcV2 => Vec::new(), - _ => vec![ingestion.remap_collection_id], + SourceEnvelope::CdcV2 => (), + _ => dependencies.push(ingestion.remap_collection_id), } } // Ingestions depend on their remap collection. DataSource::Ingestion(ingestion) => { - if ingestion.remap_collection_id == source_id { - vec![] - } else { - vec![ingestion.remap_collection_id] + if ingestion.remap_collection_id != source_id { + dependencies.push(ingestion.remap_collection_id); } } - DataSource::Sink { desc } => vec![desc.sink.from], - }; + DataSource::Sink { desc } => dependencies.push(desc.sink.from), + } Ok(dependencies) } @@ -1342,12 +1341,9 @@ where let mut persist_compaction_commands = Vec::with_capacity(collections_net.len()); for (key, (mut changes, frontier)) in collections_net { if !changes.is_empty() { - // If the table has a "primary" collection, let that collection drive compaction. + // If the collection has a "primary" collection, let that primary drive compaction. let collection = collections.get(&key).expect("must still exist"); - let should_emit_persist_compaction = !matches!( - collection.description.data_source, - DataSource::Table { primary: Some(_) } - ); + let should_emit_persist_compaction = collection.description.primary.is_none(); if frontier.is_empty() { info!(id = %key, "removing collection state because the since advanced to []!"); @@ -1906,7 +1902,7 @@ where | DataSource::Progress | DataSource::Other => {} DataSource::Sink { .. } => {} - DataSource::Table { .. } => { + DataSource::Table => { let register_ts = register_ts.expect( "caller should have provided a register_ts when creating a table", ); @@ -1963,7 +1959,7 @@ where Sink(GlobalId), } to_register.sort_by_key(|(id, desc, ..)| match &desc.data_source { - DataSource::Table { .. } => DependencyOrder::Table(Reverse(*id)), + DataSource::Table => DependencyOrder::Table(Reverse(*id)), DataSource::Sink { .. } => DependencyOrder::Sink(*id), _ => DependencyOrder::Collection(*id), }); @@ -1977,11 +1973,8 @@ where let data_shard_since = since_handle.since().clone(); // Determine if this collection has any dependencies.
- let storage_dependencies = self.determine_collection_dependencies( - &*self_collections, - id, - &description.data_source, - )?; + let storage_dependencies = + self.determine_collection_dependencies(&*self_collections, id, &description)?; // Determine the initial since of the collection. let initial_since = match storage_dependencies @@ -2088,7 +2081,7 @@ where self_collections.insert(id, collection_state); } - DataSource::Table { .. } => { + DataSource::Table => { // See comment on self.initial_txn_upper on why we're doing // this. if is_in_txns(id, &metadata) @@ -2265,7 +2258,7 @@ where .ok_or_else(|| StorageError::IdentifierMissing(existing_collection))?; // TODO(alter_table): Support changes to sources. - if !matches!(&existing.description.data_source, DataSource::Table { .. }) { + if existing.description.data_source != DataSource::Table { return Err(StorageError::IdentifierInvalid(existing_collection)); } @@ -2352,15 +2345,11 @@ where .expect("existing collection missing"); // A higher level should already be asserting this, but let's make sure. - assert!(matches!( - existing.description.data_source, - DataSource::Table { primary: None } - )); + assert_eq!(existing.description.data_source, DataSource::Table); + assert_none!(existing.description.primary); // The existing version of the table will depend on the new version. - existing.description.data_source = DataSource::Table { - primary: Some(new_collection), - }; + existing.description.primary = Some(new_collection); existing.storage_dependencies.push(new_collection); // Copy over the frontiers from the previous version. @@ -2378,8 +2367,8 @@ where let mut changes = ChangeBatch::new(); changes.extend(implied_capability.iter().map(|t| (t.clone(), 1))); - // Note: The new collection is now the "primary collection" so we specify `None` here. - let collection_desc = CollectionDescription::for_table(new_desc.clone(), None); + // Note: The new collection is now the "primary collection". + let collection_desc = CollectionDescription::for_table(new_desc.clone()); let collection_meta = CollectionMetadata { persist_location: self.persist_location.clone(), relation_desc: collection_desc.desc.clone(), diff --git a/src/storage-controller/src/lib.rs b/src/storage-controller/src/lib.rs index 0776d4cb4ee31..5c736ed2d36f7 100644 --- a/src/storage-controller/src/lib.rs +++ b/src/storage-controller/src/lib.rs @@ -890,7 +890,7 @@ where // easier to reason about it this way. let (tables_to_register, collections_to_register): (Vec<_>, Vec<_>) = to_register .into_iter() - .partition(|(_id, desc, ..)| matches!(desc.data_source, DataSource::Table { .. })); + .partition(|(_id, desc, ..)| desc.data_source == DataSource::Table); let to_register = tables_to_register .into_iter() .rev() @@ -909,15 +909,13 @@ where && !(self.read_only && migrated_storage_collections.contains(&id)) }; - let data_source = description.data_source; - to_execute.insert(id); new_collections.insert(id); let write_frontier = write.upper(); // Determine if this collection has another dependency. - let storage_dependencies = self.determine_collection_dependencies(id, &data_source)?; + let storage_dependencies = self.determine_collection_dependencies(id, &description)?; let dependency_read_holds = self .storage_collections @@ -929,6 +927,8 @@ where dependency_since.join_assign(read_hold.since()); } + let data_source = description.data_source; + // Assert some invariants. // // TODO(alter_table): Include Tables (is_in_txns) in this check. 
After @@ -1081,7 +1081,7 @@ where new_source_statistic_entries.insert(id); } - DataSource::Table { .. } => { + DataSource::Table => { debug!( ?data_source, meta = ?metadata, "registering {id} with persist table worker", ); @@ -1212,7 +1212,7 @@ where ), DataSource::Introspection(_) | DataSource::Webhook - | DataSource::Table { .. } + | DataSource::Table | DataSource::Progress | DataSource::Other => {} DataSource::Sink { .. } => { @@ -1388,7 +1388,7 @@ where let existing = collections .get(&existing_collection) .ok_or(StorageError::IdentifierMissing(existing_collection))?; - if !matches!(existing.data_source, DataSource::Table { .. }) { + if existing.data_source != DataSource::Table { return Err(StorageError::IdentifierInvalid(existing_collection)); } @@ -1419,8 +1419,6 @@ where ) .await; - // Note: The new collection is now the "primary collection" so we specify `None` here. - let collection_desc = CollectionDescription::<T>::for_table(new_desc.clone(), None); let collection_meta = CollectionMetadata { persist_location: self.persist_location.clone(), data_shard, @@ -1431,7 +1429,7 @@ where // TODO(alter_table): Support schema evolution on sources. let wallclock_lag_metrics = self.metrics.wallclock_lag_metrics(new_collection, None); let collection_state = CollectionState::new( - collection_desc.data_source.clone(), + DataSource::Table, collection_meta, CollectionStateExtra::None, wallclock_lag_metrics, @@ -1440,17 +1438,6 @@ where // Great! We have successfully evolved the schema of our Table, now we need to update our // in-memory data structures. self.collections.insert(new_collection, collection_state); - let existing = self - .collections - .get_mut(&existing_collection) - .expect("missing existing collection"); - assert!(matches!( - existing.data_source, - DataSource::Table { primary: None } - )); - existing.data_source = DataSource::Table { - primary: Some(new_collection), - }; self.persist_table_worker .register(register_ts, vec![(new_collection, write_handle)]) @@ -1766,7 +1753,7 @@ where let (table_write_ids, data_source_ids): (Vec<_>, Vec<_>) = identifiers .into_iter() .partition(|id| match self.collections[id].data_source { - DataSource::Table { .. } => true, + DataSource::Table => true, DataSource::IngestionExport { .. } | DataSource::Webhook => false, _ => panic!("identifier is not a table: {}", id), }); @@ -1884,7 +1871,7 @@ where ingestions_to_drop.insert(*id); source_statistics_to_drop.push(*id); } - DataSource::Progress | DataSource::Table { .. } | DataSource::Other => { + DataSource::Progress | DataSource::Table | DataSource::Other => { collections_to_drop.push(*id); } DataSource::Introspection(_) | DataSource::Sink { .. } => { @@ -3214,17 +3201,20 @@ where fn determine_collection_dependencies( &self, self_id: GlobalId, - data_source: &DataSource, + collection_desc: &CollectionDescription<T>, ) -> Result<Vec<GlobalId>, StorageError<T>> { - let dependency = match &data_source { + let mut dependencies = Vec::new(); + + if let Some(id) = collection_desc.primary { + dependencies.push(id); + } + + match &collection_desc.data_source { DataSource::Introspection(_) | DataSource::Webhook - | DataSource::Table { primary: None } + | DataSource::Table | DataSource::Progress - | DataSource::Other => vec![], - DataSource::Table { - primary: Some(primary), - } => vec![*primary], + | DataSource::Other => (), DataSource::IngestionExport { ingestion_id, .. } => { // Ingestion exports depend on their primary source's remap // collection.
@@ -3241,7 +3231,7 @@ where // and, 2) that the remap shard's since stays one step behind // their upper. Hence they track themselves and the remap shard // as dependencies. - vec![self_id, ingestion_remap_collection_id] + dependencies.extend([self_id, ingestion_remap_collection_id]); } // Ingestions depend on their remap collection. DataSource::Ingestion(ingestion) => { @@ -3249,19 +3239,18 @@ where // since stays one step behind the upper, and, 2) that the remap // shard's since stays one step behind their upper. Hence they // track themselves and the remap shard as dependencies. - let mut dependencies = vec![self_id]; + dependencies.push(self_id); if self_id != ingestion.remap_collection_id { dependencies.push(ingestion.remap_collection_id); } - dependencies } DataSource::Sink { desc } => { // Sinks hold back their own frontier and the frontier of their input. - vec![self_id, desc.sink.from] + dependencies.extend([self_id, desc.sink.from]); } }; - Ok(dependency) + Ok(dependencies) } async fn read_handle_for_snapshot( diff --git a/test/race-condition/mzcompose.py b/test/race-condition/mzcompose.py index 994e053a8bf92..7e5dbadc31ce3 100644 --- a/test/race-condition/mzcompose.py +++ b/test/race-condition/mzcompose.py @@ -665,6 +665,40 @@ def verify(self) -> str: raise NotImplementedError +class ReplacementMaterializedView(Object): + def create(self) -> str: + select = ( + "* FROM " + self.references.name + if self.references + else "'foo' AS a, 'bar' AS b" + ) + return f"> CREATE MATERIALIZED VIEW {self.name} AS SELECT {select}" + + def destroy(self) -> str: + return f"> DROP MATERIALIZED VIEW {self.name} CASCADE" + + def manipulate(self, kind: int) -> str: + select = ( + "* FROM " + self.references.name + if self.references + else "'foo' AS a, 'bar' AS b" + ) + manipulations = [ + lambda: "", + lambda: dedent( + f""" + > DROP MATERIALIZED VIEW IF EXISTS {self.name}_replacement + > CREATE MATERIALIZED VIEW {self.name}_replacement REPLACING {self.name} AS SELECT {select} + > ALTER MATERIALIZED VIEW {self.name} APPLY REPLACEMENT {self.name}_replacement + """ + ), + ] + return manipulations[kind % len(manipulations)]() + + def verify(self) -> str: + raise NotImplementedError + + class DefaultIndex(Object): can_refer: bool = False diff --git a/test/sqllogictest/replacement-materialized-views.slt b/test/sqllogictest/replacement-materialized-views.slt new file mode 100644 index 0000000000000..a1e7a46105cef --- /dev/null +++ b/test/sqllogictest/replacement-materialized-views.slt @@ -0,0 +1,149 @@ +# Copyright Materialize, Inc. and contributors. All rights reserved. +# +# Use of this software is governed by the Business Source License +# included in the LICENSE file at the root of this repository. +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0. 
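+
+# Tests for CREATE MATERIALIZED VIEW ... REPLACING ... and
+# ALTER MATERIALIZED VIEW ... APPLY REPLACEMENT ...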
+ +mode cockroach + +# Setup + +statement ok +CREATE TABLE t (a int, b int) + +statement ok +INSERT INTO t VALUES (1, 2), (3, 4), (5, 6) + +statement ok +CREATE CLUSTER other REPLICAS (r1 (SIZE 'scale=1,workers=1'), r2 (SIZE 'scale=2,workers=2')) + + +# Test: basic replacement workflow + +statement ok +CREATE MATERIALIZED VIEW mv AS SELECT a, b FROM t + +query II +SELECT * FROM mv +---- +1 2 +3 4 +5 6 + +statement ok +CREATE MATERIALIZED VIEW rp REPLACING mv AS SELECT a + b as a, b FROM t + +query II +SELECT * FROM mv +---- +1 2 +3 4 +5 6 + +query TTT colnames,rowsort +SHOW MATERIALIZED VIEWS +---- +name cluster comment +mv quickstart (empty) +rp quickstart (empty) + +statement ok +ALTER MATERIALIZED VIEW mv APPLY REPLACEMENT rp + +query TTT colnames,rowsort +SHOW MATERIALIZED VIEWS +---- +name cluster comment +mv quickstart (empty) + +query II +SELECT * FROM mv +---- +3 2 +7 4 +11 6 + + +# Test: aborted replacement workflow + +statement ok +CREATE MATERIALIZED VIEW rp REPLACING mv AS SELECT -a as a, b FROM t + +query II +SELECT * FROM mv +---- +3 2 +7 4 +11 6 + +query TTT colnames,rowsort +SHOW MATERIALIZED VIEWS +---- +name cluster comment +mv quickstart (empty) +rp quickstart (empty) + +statement ok +DROP MATERIALIZED VIEW rp + +query TTT colnames,rowsort +SHOW MATERIALIZED VIEWS +---- +name cluster comment +mv quickstart (empty) + +query II +SELECT * FROM mv +---- +3 2 +7 4 +11 6 + + +# Test: schemas must match + +statement error db error: ERROR: incompatible schemas +CREATE MATERIALIZED VIEW rp REPLACING mv AS SELECT a, b, 1 FROM t + +# Test: replacement can be created in another cluster + +statement ok +CREATE MATERIALIZED VIEW rp REPLACING mv IN CLUSTER other AS SELECT a, b FROM t; + +query TTT colnames,rowsort +SHOW MATERIALIZED VIEWS +---- +name cluster comment +mv quickstart (empty) +rp other (empty) + +statement ok +ALTER MATERIALIZED VIEW mv APPLY REPLACEMENT rp + +query TTT colnames,rowsort +SHOW MATERIALIZED VIEWS +---- +name cluster comment +mv other (empty) + +# Test: replacement depends on target materialized view + +statement ok +CREATE MATERIALIZED VIEW rp REPLACING mv AS SELECT a, b FROM t + +statement error db error: ERROR: cannot drop materialized view "mv": still depended upon by materialized view "rp" +DROP MATERIALIZED VIEW mv; + +statement ok +DROP MATERIALIZED VIEW mv CASCADE; + +# TODO: Test: replacement cannot be selected from +# TODO: Test: not more than one replacement allowed per MV +# TODO: Test: cannot create an object depending on a replacement +# TODO: Test: replacing with the same SQL +# TODO: Test: audit log +# TODO: Test: error on wrong replacement type in ALTER MV +# TODO: Test: error on wrong target type in CREATE MV
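+
+# For reference, a minimal sketch of the workflow exercised above, kept as
+# comments only (object names are illustrative):
+#
+#   CREATE MATERIALIZED VIEW mv AS SELECT a, b FROM t;
+#   CREATE MATERIALIZED VIEW rp REPLACING mv AS SELECT a + b AS a, b FROM t;
+#   -- mv keeps serving its old definition until the replacement is applied.
+#   ALTER MATERIALIZED VIEW mv APPLY REPLACEMENT rp;
+#   -- rp is consumed by the apply and dropped from the catalog; mv now
+#   -- serves the new definition.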