From 51266fe2b7d8b18a0c817777aa46d12095dd7f90 Mon Sep 17 00:00:00 2001
From: Jan Teske
Date: Thu, 20 Nov 2025 12:14:06 +0100
Subject: [PATCH] storage: support "view" collections for other data sources

This commit changes the storage controller's data structures to support
specifying a "view" storage collection, i.e. a storage collection that
points to the same persist shard as another, for data sources other
than `Table`. The specific goal is to support `DataSource::Other`, in
preparation for `ALTER MATERIALIZED VIEW`.

Rather than adding a `primary` field to the `DataSource::Other` variant,
this commit moves the `primary` field out of `DataSource::Table` and
into `CollectionDescription`. This reflects the fact that the "primary"
concept is independent of the data source, even though it is currently
only used for tables. It also removes some visual clutter.
---
 src/adapter/src/coord.rs                      | 18 ++---
 src/adapter/src/coord/ddl.rs                  |  1 +
 src/adapter/src/coord/sequencer/inner.rs      |  7 +-
 .../sequencer/inner/create_continual_task.rs  | 10 +--
 .../inner/create_materialized_view.rs         | 10 +--
 src/storage-client/src/controller.rs          | 22 +++---
 src/storage-client/src/storage_collections.rs | 71 ++++++++-----
 src/storage-controller/src/lib.rs             | 59 +++++++--------
 8 files changed, 84 insertions(+), 114 deletions(-)

diff --git a/src/adapter/src/coord.rs b/src/adapter/src/coord.rs
index 200bc1f061d60..4197a932a727b 100644
--- a/src/adapter/src/coord.rs
+++ b/src/adapter/src/coord.rs
@@ -2736,6 +2736,7 @@ impl Coordinator {
                 since: None,
                 status_collection_id,
                 timeline: Some(timeline.clone()),
+                primary: None,
             }
         };
@@ -2766,10 +2767,9 @@ impl Coordinator {
                     let next_version = version.bump();
                     let primary_collection =
                         versions.get(&next_version).map(|(gid, _desc)| gid).copied();
-                    let collection_desc = CollectionDescription::for_table(
-                        desc.clone(),
-                        primary_collection,
-                    );
+                    let mut collection_desc =
+                        CollectionDescription::for_table(desc.clone());
+                    collection_desc.primary = primary_collection;
                     (*gid, collection_desc)
                 });
@@ -2808,13 +2808,8 @@ impl Coordinator {
                     compute_collections.push((mv.global_id_writes(), mv.desc.latest()));
                 }
                 CatalogItem::ContinualTask(ct) => {
-                    let collection_desc = CollectionDescription {
-                        desc: ct.desc.clone(),
-                        data_source: DataSource::Other,
-                        since: ct.initial_as_of.clone(),
-                        status_collection_id: None,
-                        timeline: None,
-                    };
+                    let collection_desc =
+                        CollectionDescription::for_other(ct.desc.clone(), ct.initial_as_of.clone());
                     if ct.global_id().is_system() && collection_desc.since.is_none() {
                        // We need a non-0 since to make as_of selection work. Fill it in below with
                        // the `bootstrap_builtin_continual_tasks` call, which can only be run after
@@ -2859,6 +2854,7 @@ impl Coordinator {
                     since: None,
                     status_collection_id: None,
                     timeline: None,
+                    primary: None,
                 };
                 collections.push((sink.global_id, collection_desc));
             }
diff --git a/src/adapter/src/coord/ddl.rs b/src/adapter/src/coord/ddl.rs
index 223595fd73324..f350733ee69d1 100644
--- a/src/adapter/src/coord/ddl.rs
+++ b/src/adapter/src/coord/ddl.rs
@@ -1308,6 +1308,7 @@ impl Coordinator {
             since: None,
             status_collection_id: None,
             timeline: None,
+            primary: None,
         };
         let collections = vec![(id, collection_desc)];
diff --git a/src/adapter/src/coord/sequencer/inner.rs b/src/adapter/src/coord/sequencer/inner.rs
index ff844043eb442..338d6a861fe33 100644
--- a/src/adapter/src/coord/sequencer/inner.rs
+++ b/src/adapter/src/coord/sequencer/inner.rs
@@ -772,6 +772,7 @@ impl Coordinator {
                     timeline: Some(source.timeline),
                     since: None,
                     status_collection_id,
+                    primary: None,
                 },
             ));
         }
@@ -1229,8 +1230,7 @@ impl Coordinator {
             .desc
             .at_version(RelationVersionSelector::Specific(relation_version));
         // We assert above we have a single version, and thus we are the primary.
-        let collection_desc =
-            CollectionDescription::for_table(relation_desc, None);
+        let collection_desc = CollectionDescription::for_table(relation_desc);
         let collections = vec![(global_id, collection_desc)];

         let compaction_window = table
@@ -1283,6 +1283,7 @@ impl Coordinator {
             since: None,
             status_collection_id,
             timeline: Some(timeline.clone()),
+            primary: None,
         };
         let collections = vec![(global_id, collection_desc)];
@@ -1319,6 +1320,7 @@ impl Coordinator {
             since: None,
             status_collection_id: None,
             timeline: Some(timeline.clone()),
+            primary: None,
         };
         let collections = vec![(global_id, collection_desc)];
         let read_policies = coord
@@ -4492,6 +4494,7 @@ impl Coordinator {
                 since: None,
                 status_collection_id,
                 timeline: Some(source.timeline.clone()),
+                primary: None,
             },
         ));
diff --git a/src/adapter/src/coord/sequencer/inner/create_continual_task.rs b/src/adapter/src/coord/sequencer/inner/create_continual_task.rs
index 1b826cd44cfdb..26cbd864e2521 100644
--- a/src/adapter/src/coord/sequencer/inner/create_continual_task.rs
+++ b/src/adapter/src/coord/sequencer/inner/create_continual_task.rs
@@ -33,7 +33,7 @@ use mz_sql::plan;
 use mz_sql::session::metadata::SessionMetadata;
 use mz_sql_parser::ast::Statement;
 use mz_sql_parser::ast::display::AstDisplay;
-use mz_storage_client::controller::{CollectionDescription, DataSource};
+use mz_storage_client::controller::CollectionDescription;
 use mz_transform::dataflow::DataflowMetainfo;
 use mz_transform::notice::OptimizerNotice;
@@ -148,13 +148,7 @@ impl Coordinator {
                 None,
                 vec![(
                     global_id,
-                    CollectionDescription {
-                        desc,
-                        data_source: DataSource::Other,
-                        since: Some(as_of),
-                        status_collection_id: None,
-                        timeline: None,
-                    },
+                    CollectionDescription::for_other(desc, Some(as_of)),
                 )],
             )
             .await
diff --git a/src/adapter/src/coord/sequencer/inner/create_materialized_view.rs b/src/adapter/src/coord/sequencer/inner/create_materialized_view.rs
index 333fecce291d7..db236e25a462b 100644
--- a/src/adapter/src/coord/sequencer/inner/create_materialized_view.rs
+++ b/src/adapter/src/coord/sequencer/inner/create_materialized_view.rs
@@ -29,7 +29,7 @@ use mz_sql::plan;
 use mz_sql::session::metadata::SessionMetadata;
 use mz_sql_parser::ast;
 use mz_sql_parser::ast::display::AstDisplay;
-use mz_storage_client::controller::{CollectionDescription, DataSource};
+use mz_storage_client::controller::CollectionDescription;
 use std::collections::BTreeMap;
 use timely::progress::Antichain;
 use tracing::Span;
@@ -696,13 +696,7 @@ impl Coordinator {
                 None,
                 vec![(
                     global_id,
-                    CollectionDescription {
-                        desc: output_desc,
-                        data_source: DataSource::Other,
-                        since: Some(storage_as_of),
-                        status_collection_id: None,
-                        timeline: None,
-                    },
+                    CollectionDescription::for_other(output_desc, Some(storage_as_of)),
                 )],
             )
             .await
diff --git a/src/storage-client/src/controller.rs b/src/storage-client/src/controller.rs
index 03b9eaf0eb690..37e94d95371bb 100644
--- a/src/storage-client/src/controller.rs
+++ b/src/storage-client/src/controller.rs
@@ -124,12 +124,7 @@ pub enum DataSource<T> {
     /// Data comes from external HTTP requests pushed to Materialize.
     Webhook,
     /// The adapter layer appends timestamped data, i.e. it is a `TABLE`.
-    Table {
-        /// This table has had columns added or dropped to it, so we're now a
-        /// "view" over the "primary" Table/collection. Within the
-        /// `storage-controller` we the primary as a dependency.
-        primary: Option<GlobalId>,
-    },
+    Table,
     /// This source's data does not need to be managed by the storage
     /// controller, e.g. it's a materialized view or the catalog collection.
     Other,
@@ -151,6 +146,13 @@ pub struct CollectionDescription<T> {
     pub status_collection_id: Option<GlobalId>,
     /// The timeline of the source. Absent for materialized views, continual tasks, etc.
     pub timeline: Option<Timeline>,
+    /// The primary of this collection.
+    ///
+    /// Multiple storage collections can point to the same persist shard,
+    /// possibly with different schemas. In such a configuration, we select one
+    /// of the involved collections as the primary, which "owns" the persist
+    /// shard. All other involved collections have a dependency on the primary.
+    pub primary: Option<GlobalId>,
 }

 impl<T> CollectionDescription<T> {
@@ -162,17 +164,19 @@
             since,
             status_collection_id: None,
             timeline: None,
+            primary: None,
         }
     }

     /// Create a CollectionDescription for a table.
-    pub fn for_table(desc: RelationDesc, primary: Option<GlobalId>) -> Self {
+    pub fn for_table(desc: RelationDesc) -> Self {
         Self {
             desc,
-            data_source: DataSource::Table { primary },
+            data_source: DataSource::Table,
             since: None,
             status_collection_id: None,
             timeline: Some(Timeline::EpochMilliseconds),
+            primary: None,
         }
     }
 }
@@ -736,7 +740,7 @@ impl<T> DataSource<T> {
     /// source using txn-wal.
     pub fn in_txns(&self) -> bool {
         match self {
-            DataSource::Table { .. } => true,
+            DataSource::Table => true,
             DataSource::Other
             | DataSource::Ingestion(_)
             | DataSource::IngestionExport { .. }
diff --git a/src/storage-client/src/storage_collections.rs b/src/storage-client/src/storage_collections.rs
index aec2c3eccbbb1..d5858bd5b1497 100644
--- a/src/storage-client/src/storage_collections.rs
+++ b/src/storage-client/src/storage_collections.rs
@@ -902,24 +902,25 @@ where
         Ok(())
     }

-    /// Determine if this collection has another dependency.
-    ///
-    /// Currently, collections have either 0 or 1 dependencies.
+    /// Returns the given collection's dependencies.
     fn determine_collection_dependencies(
         &self,
         self_collections: &BTreeMap<GlobalId, CollectionState<T>>,
         source_id: GlobalId,
-        data_source: &DataSource<T>,
+        collection_desc: &CollectionDescription<T>,
     ) -> Result<Vec<GlobalId>, StorageError<T>> {
-        let dependencies = match &data_source {
+        let mut dependencies = Vec::new();
+
+        if let Some(id) = collection_desc.primary {
+            dependencies.push(id);
+        }
+
+        match &collection_desc.data_source {
             DataSource::Introspection(_)
             | DataSource::Webhook
-            | DataSource::Table { primary: None }
+            | DataSource::Table
             | DataSource::Progress
-            | DataSource::Other => Vec::new(),
-            DataSource::Table {
-                primary: Some(primary),
-            } => vec![*primary],
+            | DataSource::Other => (),
             DataSource::IngestionExport {
                 ingestion_id,
                 data_config,
@@ -935,20 +936,18 @@ where
                 };

                 match data_config.envelope {
-                    SourceEnvelope::CdcV2 => Vec::new(),
-                    _ => vec![ingestion.remap_collection_id],
+                    SourceEnvelope::CdcV2 => (),
+                    _ => dependencies.push(ingestion.remap_collection_id),
                 }
             }
             // Ingestions depend on their remap collection.
             DataSource::Ingestion(ingestion) => {
-                if ingestion.remap_collection_id == source_id {
-                    vec![]
-                } else {
-                    vec![ingestion.remap_collection_id]
+                if ingestion.remap_collection_id != source_id {
+                    dependencies.push(ingestion.remap_collection_id);
                 }
             }
-            DataSource::Sink { desc } => vec![desc.sink.from],
-        };
+            DataSource::Sink { desc } => dependencies.push(desc.sink.from),
+        }

         Ok(dependencies)
     }
@@ -1342,12 +1341,9 @@ where
         let mut persist_compaction_commands = Vec::with_capacity(collections_net.len());
         for (key, (mut changes, frontier)) in collections_net {
             if !changes.is_empty() {
-                // If the table has a "primary" collection, let that collection drive compaction.
+                // If the collection has a "primary" collection, let that primary drive compaction.
                 let collection = collections.get(&key).expect("must still exist");
-                let should_emit_persist_compaction = !matches!(
-                    collection.description.data_source,
-                    DataSource::Table { primary: Some(_) }
-                );
+                let should_emit_persist_compaction = collection.description.primary.is_none();

                 if frontier.is_empty() {
                     info!(id = %key, "removing collection state because the since advanced to []!");
@@ -1906,7 +1902,7 @@ where
                 | DataSource::Progress
                 | DataSource::Other => {}
                 DataSource::Sink { .. } => {}
-                DataSource::Table { .. } => {
+                DataSource::Table => {
                     let register_ts = register_ts.expect(
                         "caller should have provided a register_ts when creating a table",
                     );
@@ -1963,7 +1959,7 @@ where
             Sink(GlobalId),
         }
         to_register.sort_by_key(|(id, desc, ..)| match &desc.data_source {
-            DataSource::Table { .. } => DependencyOrder::Table(Reverse(*id)),
+            DataSource::Table => DependencyOrder::Table(Reverse(*id)),
             DataSource::Sink { .. } => DependencyOrder::Sink(*id),
             _ => DependencyOrder::Collection(*id),
         });
@@ -1977,11 +1973,8 @@ where
             let data_shard_since = since_handle.since().clone();

             // Determine if this collection has any dependencies.
-            let storage_dependencies = self.determine_collection_dependencies(
-                &*self_collections,
-                id,
-                &description.data_source,
-            )?;
+            let storage_dependencies =
+                self.determine_collection_dependencies(&*self_collections, id, &description)?;

             // Determine the initial since of the collection.
             let initial_since = match storage_dependencies
@@ -2088,7 +2081,7 @@ where
                     self_collections.insert(id, collection_state);
                 }
-                DataSource::Table { .. } => {
+                DataSource::Table => {
                     // See comment on self.initial_txn_upper on why we're doing
                     // this.
                     if is_in_txns(id, &metadata)
@@ -2265,7 +2258,7 @@ where
             .ok_or_else(|| StorageError::IdentifierMissing(existing_collection))?;

         // TODO(alter_table): Support changes to sources.
-        if !matches!(&existing.description.data_source, DataSource::Table { .. }) {
+        if existing.description.data_source != DataSource::Table {
             return Err(StorageError::IdentifierInvalid(existing_collection));
         }
@@ -2352,15 +2345,11 @@ where
             .expect("existing collection missing");

         // A higher level should already be asserting this, but let's make sure.
-        assert!(matches!(
-            existing.description.data_source,
-            DataSource::Table { primary: None }
-        ));
+        assert_eq!(existing.description.data_source, DataSource::Table);
+        assert_none!(existing.description.primary);

         // The existing version of the table will depend on the new version.
-        existing.description.data_source = DataSource::Table {
-            primary: Some(new_collection),
-        };
+        existing.description.primary = Some(new_collection);
         existing.storage_dependencies.push(new_collection);

         // Copy over the frontiers from the previous version.
@@ -2378,8 +2367,8 @@ where
         let mut changes = ChangeBatch::new();
         changes.extend(implied_capability.iter().map(|t| (t.clone(), 1)));

-        // Note: The new collection is now the "primary collection" so we specify `None` here.
-        let collection_desc = CollectionDescription::for_table(new_desc.clone(), None);
+        // Note: The new collection is now the "primary collection".
+        let collection_desc = CollectionDescription::for_table(new_desc.clone());
         let collection_meta = CollectionMetadata {
             persist_location: self.persist_location.clone(),
             relation_desc: collection_desc.desc.clone(),
diff --git a/src/storage-controller/src/lib.rs b/src/storage-controller/src/lib.rs
index 0776d4cb4ee31..5c736ed2d36f7 100644
--- a/src/storage-controller/src/lib.rs
+++ b/src/storage-controller/src/lib.rs
@@ -890,7 +890,7 @@ where
         // easier to reason about it this way.
         let (tables_to_register, collections_to_register): (Vec<_>, Vec<_>) = to_register
             .into_iter()
-            .partition(|(_id, desc, ..)| matches!(desc.data_source, DataSource::Table { .. }));
+            .partition(|(_id, desc, ..)| desc.data_source == DataSource::Table);

         let to_register = tables_to_register
             .into_iter()
             .rev()
@@ -909,15 +909,13 @@ where
                 && !(self.read_only && migrated_storage_collections.contains(&id))
         };

-        let data_source = description.data_source;
-
         to_execute.insert(id);
         new_collections.insert(id);

         let write_frontier = write.upper();

         // Determine if this collection has another dependency.
-        let storage_dependencies = self.determine_collection_dependencies(id, &data_source)?;
+        let storage_dependencies = self.determine_collection_dependencies(id, &description)?;

         let dependency_read_holds = self
             .storage_collections
@@ -929,6 +927,8 @@ where
             dependency_since.join_assign(read_hold.since());
         }

+        let data_source = description.data_source;
+
         // Assert some invariants.
         //
         // TODO(alter_table): Include Tables (is_in_txns) in this check. After
@@ -1081,7 +1081,7 @@ where
                 new_source_statistic_entries.insert(id);
             }
-            DataSource::Table { .. } => {
+            DataSource::Table => {
                 debug!(
                     ?data_source, meta = ?metadata,
                     "registering {id} with persist table worker",
                 );
@@ -1212,7 +1212,7 @@ where
             ),
             DataSource::Introspection(_)
             | DataSource::Webhook
-            | DataSource::Table { .. }
+            | DataSource::Table
             | DataSource::Progress
             | DataSource::Other => {}
             DataSource::Sink { .. } => {
@@ -1388,7 +1388,7 @@ where
         let existing = collections
             .get(&existing_collection)
             .ok_or(StorageError::IdentifierMissing(existing_collection))?;
-        if !matches!(existing.data_source, DataSource::Table { .. }) {
+        if existing.data_source != DataSource::Table {
             return Err(StorageError::IdentifierInvalid(existing_collection));
         }
@@ -1419,8 +1419,6 @@ where
             )
             .await;

-        // Note: The new collection is now the "primary collection" so we specify `None` here.
-        let collection_desc = CollectionDescription::<T>::for_table(new_desc.clone(), None);
         let collection_meta = CollectionMetadata {
             persist_location: self.persist_location.clone(),
             data_shard,
@@ -1431,7 +1429,7 @@ where
         // TODO(alter_table): Support schema evolution on sources.
         let wallclock_lag_metrics = self.metrics.wallclock_lag_metrics(new_collection, None);
         let collection_state = CollectionState::new(
-            collection_desc.data_source.clone(),
+            DataSource::Table,
             collection_meta,
             CollectionStateExtra::None,
             wallclock_lag_metrics,
         );
@@ -1440,17 +1438,6 @@ where
         // Great! We have successfully evolved the schema of our Table, now we need to update our
         // in-memory data structures.
         self.collections.insert(new_collection, collection_state);
-        let existing = self
-            .collections
-            .get_mut(&existing_collection)
-            .expect("missing existing collection");
-        assert!(matches!(
-            existing.data_source,
-            DataSource::Table { primary: None }
-        ));
-        existing.data_source = DataSource::Table {
-            primary: Some(new_collection),
-        };

         self.persist_table_worker
             .register(register_ts, vec![(new_collection, write_handle)])
@@ -1766,7 +1753,7 @@ where
         let (table_write_ids, data_source_ids): (Vec<_>, Vec<_>) = identifiers
             .into_iter()
             .partition(|id| match self.collections[id].data_source {
-                DataSource::Table { .. } => true,
+                DataSource::Table => true,
                 DataSource::IngestionExport { .. } | DataSource::Webhook => false,
                 _ => panic!("identifier is not a table: {}", id),
             });
@@ -1884,7 +1871,7 @@ where
                     ingestions_to_drop.insert(*id);
                     source_statistics_to_drop.push(*id);
                 }
-                DataSource::Progress | DataSource::Table { .. } | DataSource::Other => {
+                DataSource::Progress | DataSource::Table | DataSource::Other => {
                     collections_to_drop.push(*id);
                 }
                 DataSource::Introspection(_) | DataSource::Sink { .. } => {
@@ -3214,17 +3201,20 @@ where
     fn determine_collection_dependencies(
         &self,
         self_id: GlobalId,
-        data_source: &DataSource<T>,
+        collection_desc: &CollectionDescription<T>,
    ) -> Result<Vec<GlobalId>, StorageError<T>> {
-        let dependency = match &data_source {
+        let mut dependencies = Vec::new();
+
+        if let Some(id) = collection_desc.primary {
+            dependencies.push(id);
+        }
+
+        match &collection_desc.data_source {
             DataSource::Introspection(_)
             | DataSource::Webhook
-            | DataSource::Table { primary: None }
+            | DataSource::Table
             | DataSource::Progress
-            | DataSource::Other => vec![],
-            DataSource::Table {
-                primary: Some(primary),
-            } => vec![*primary],
+            | DataSource::Other => (),
             DataSource::IngestionExport { ingestion_id, .. } => {
                 // Ingestion exports depend on their primary source's remap
                 // collection.
@@ -3241,7 +3231,7 @@ where
                 // and, 2) that the remap shard's since stays one step behind
                 // their upper. Hence they track themselves and the remap shard
                 // as dependencies.
-                vec![self_id, ingestion_remap_collection_id]
+                dependencies.extend([self_id, ingestion_remap_collection_id]);
             }
             // Ingestions depend on their remap collection.
             DataSource::Ingestion(ingestion) => {
@@ -3249,19 +3239,18 @@ where
                 // since stays one step behind the upper, and, 2) that the remap
                 // shard's since stays one step behind their upper. Hence they
                 // track themselves and the remap shard as dependencies.
-                let mut dependencies = vec![self_id];
+                dependencies.push(self_id);
                 if self_id != ingestion.remap_collection_id {
                     dependencies.push(ingestion.remap_collection_id);
                 }
-                dependencies
             }
             DataSource::Sink { desc } => {
                 // Sinks hold back their own frontier and the frontier of their input.
-                vec![self_id, desc.sink.from]
+                dependencies.extend([self_id, desc.sink.from]);
             }
         };

-        Ok(dependency)
+        Ok(dependencies)
     }

     async fn read_handle_for_snapshot(
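To summarize the data-structure change: after this patch the `primary` link
lives on `CollectionDescription` itself, and both
`determine_collection_dependencies` implementations consult it up front,
before adding any data-source-specific dependencies. That is also why the
`DataSource::Other` arms above can be empty. The following condensed sketch
is not part of the patch; it uses hypothetical stand-in types and elides the
generics and most fields of the real definitions in
src/storage-client/src/controller.rs:

    // Hypothetical stand-ins for the real types, just to keep the sketch
    // self-contained and compilable on its own.
    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    struct GlobalId(u64);

    #[allow(dead_code)]
    #[derive(Debug, PartialEq, Eq)]
    enum DataSource {
        // No longer carries a `primary` field after this patch.
        Table,
        Other,
        // ...Ingestion, IngestionExport, Sink, Webhook, Introspection, Progress
    }

    #[allow(dead_code)]
    struct CollectionDescription {
        data_source: DataSource,
        // Moved here from `DataSource::Table`: several collections may point
        // at the same persist shard; the primary is the one that owns it.
        primary: Option<GlobalId>,
        // `desc`, `since`, `status_collection_id`, `timeline` elided.
    }

    impl CollectionDescription {
        // The data-source-independent part of dependency determination: a
        // collection with a primary depends on it, whatever its data source.
        fn primary_dependency(&self) -> Vec<GlobalId> {
            self.primary.into_iter().collect()
        }
    }

    fn main() {
        // A "view" collection over another collection's shard, now
        // expressible for non-table data sources such as `Other`.
        let view = CollectionDescription {
            data_source: DataSource::Other,
            primary: Some(GlobalId(42)),
        };
        assert_eq!(view.primary_dependency(), vec![GlobalId(42)]);
    }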