Skip to content

Commit 2a1883f

Browse files
committed
storage: support "view" collections for other data sources
This commit changes the storage controller's data structures to support specifying a "view" storage collection, i.e., a storage collection that points to the same persist shard as another, for data sources other than `Table`. The intent is specifically to get support for `DataSource::Other`, in preparation for `ALTER MATERIALIZED VIEW`. Rather than adding a `primary` field to the `DataSource::Other` variant, this commit instead moves the `primary` field out of `DataSource::Table` and into `CollectionDescription`. This reflects the fact that the "primary" concept is independent of the data source, even though it's currently only used for tables. It also removes some visual clutter.
1 parent b6ee316 commit 2a1883f

File tree

8 files changed

+84
-114
lines changed

8 files changed

+84
-114
lines changed

src/adapter/src/coord.rs

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2736,6 +2736,7 @@ impl Coordinator {
27362736
since: None,
27372737
status_collection_id,
27382738
timeline: Some(timeline.clone()),
2739+
primary: None,
27392740
}
27402741
};
27412742

@@ -2766,10 +2767,9 @@ impl Coordinator {
27662767
let next_version = version.bump();
27672768
let primary_collection =
27682769
versions.get(&next_version).map(|(gid, _desc)| gid).copied();
2769-
let collection_desc = CollectionDescription::for_table(
2770-
desc.clone(),
2771-
primary_collection,
2772-
);
2770+
let mut collection_desc =
2771+
CollectionDescription::for_table(desc.clone());
2772+
collection_desc.primary = primary_collection;
27732773

27742774
(*gid, collection_desc)
27752775
});
@@ -2808,13 +2808,8 @@ impl Coordinator {
28082808
compute_collections.push((mv.global_id_writes(), mv.desc.latest()));
28092809
}
28102810
CatalogItem::ContinualTask(ct) => {
2811-
let collection_desc = CollectionDescription {
2812-
desc: ct.desc.clone(),
2813-
data_source: DataSource::Other,
2814-
since: ct.initial_as_of.clone(),
2815-
status_collection_id: None,
2816-
timeline: None,
2817-
};
2811+
let collection_desc =
2812+
CollectionDescription::for_other(ct.desc.clone(), ct.initial_as_of.clone());
28182813
if ct.global_id().is_system() && collection_desc.since.is_none() {
28192814
// We need a non-0 since to make as_of selection work. Fill it in below with
28202815
// the `bootstrap_builtin_continual_tasks` call, which can only be run after
@@ -2859,6 +2854,7 @@ impl Coordinator {
28592854
since: None,
28602855
status_collection_id: None,
28612856
timeline: None,
2857+
primary: None,
28622858
};
28632859
collections.push((sink.global_id, collection_desc));
28642860
}

src/adapter/src/coord/ddl.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1308,6 +1308,7 @@ impl Coordinator {
13081308
since: None,
13091309
status_collection_id: None,
13101310
timeline: None,
1311+
primary: None,
13111312
};
13121313
let collections = vec![(id, collection_desc)];
13131314

src/adapter/src/coord/sequencer/inner.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -772,6 +772,7 @@ impl Coordinator {
772772
timeline: Some(source.timeline),
773773
since: None,
774774
status_collection_id,
775+
primary: None,
775776
},
776777
));
777778
}
@@ -1229,8 +1230,7 @@ impl Coordinator {
12291230
.desc
12301231
.at_version(RelationVersionSelector::Specific(relation_version));
12311232
// We assert above we have a single version, and thus we are the primary.
1232-
let collection_desc =
1233-
CollectionDescription::for_table(relation_desc, None);
1233+
let collection_desc = CollectionDescription::for_table(relation_desc);
12341234
let collections = vec![(global_id, collection_desc)];
12351235

12361236
let compaction_window = table
@@ -1283,6 +1283,7 @@ impl Coordinator {
12831283
since: None,
12841284
status_collection_id,
12851285
timeline: Some(timeline.clone()),
1286+
primary: None,
12861287
};
12871288

12881289
let collections = vec![(global_id, collection_desc)];
@@ -1319,6 +1320,7 @@ impl Coordinator {
13191320
since: None,
13201321
status_collection_id: None,
13211322
timeline: Some(timeline.clone()),
1323+
primary: None,
13221324
};
13231325
let collections = vec![(global_id, collection_desc)];
13241326
let read_policies = coord
@@ -4492,6 +4494,7 @@ impl Coordinator {
44924494
since: None,
44934495
status_collection_id,
44944496
timeline: Some(source.timeline.clone()),
4497+
primary: None,
44954498
},
44964499
));
44974500

src/adapter/src/coord/sequencer/inner/create_continual_task.rs

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ use mz_sql::plan;
3333
use mz_sql::session::metadata::SessionMetadata;
3434
use mz_sql_parser::ast::Statement;
3535
use mz_sql_parser::ast::display::AstDisplay;
36-
use mz_storage_client::controller::{CollectionDescription, DataSource};
36+
use mz_storage_client::controller::CollectionDescription;
3737
use mz_transform::dataflow::DataflowMetainfo;
3838
use mz_transform::notice::OptimizerNotice;
3939

@@ -148,13 +148,7 @@ impl Coordinator {
148148
None,
149149
vec![(
150150
global_id,
151-
CollectionDescription {
152-
desc,
153-
data_source: DataSource::Other,
154-
since: Some(as_of),
155-
status_collection_id: None,
156-
timeline: None,
157-
},
151+
CollectionDescription::for_other(desc, Some(as_of)),
158152
)],
159153
)
160154
.await

src/adapter/src/coord/sequencer/inner/create_materialized_view.rs

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ use mz_sql::plan;
2929
use mz_sql::session::metadata::SessionMetadata;
3030
use mz_sql_parser::ast;
3131
use mz_sql_parser::ast::display::AstDisplay;
32-
use mz_storage_client::controller::{CollectionDescription, DataSource};
32+
use mz_storage_client::controller::CollectionDescription;
3333
use std::collections::BTreeMap;
3434
use timely::progress::Antichain;
3535
use tracing::Span;
@@ -696,13 +696,7 @@ impl Coordinator {
696696
None,
697697
vec![(
698698
global_id,
699-
CollectionDescription {
700-
desc: output_desc,
701-
data_source: DataSource::Other,
702-
since: Some(storage_as_of),
703-
status_collection_id: None,
704-
timeline: None,
705-
},
699+
CollectionDescription::for_other(output_desc, Some(storage_as_of)),
706700
)],
707701
)
708702
.await

src/storage-client/src/controller.rs

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -124,12 +124,7 @@ pub enum DataSource<T> {
124124
/// Data comes from external HTTP requests pushed to Materialize.
125125
Webhook,
126126
/// The adapter layer appends timestamped data, i.e. it is a `TABLE`.
127-
Table {
128-
/// This table has had columns added or dropped to it, so we're now a
129-
/// "view" over the "primary" Table/collection. Within the
130-
/// `storage-controller` we the primary as a dependency.
131-
primary: Option<GlobalId>,
132-
},
127+
Table,
133128
/// This source's data does not need to be managed by the storage
134129
/// controller, e.g. it's a materialized view or the catalog collection.
135130
Other,
@@ -151,6 +146,13 @@ pub struct CollectionDescription<T> {
151146
pub status_collection_id: Option<GlobalId>,
152147
/// The timeline of the source. Absent for materialized views, continual tasks, etc.
153148
pub timeline: Option<Timeline>,
149+
/// The primary of this collection.
150+
///
151+
/// Multiple storage collections can point to the same persist shard,
152+
/// possibly with different schemas. In such a configuration, we select one
153+
/// of the involved collections as the primary, who "owns" the persist
154+
/// shard. All other involved collections have a dependency on the primary.
155+
pub primary: Option<GlobalId>,
154156
}
155157

156158
impl<T> CollectionDescription<T> {
@@ -162,17 +164,19 @@ impl<T> CollectionDescription<T> {
162164
since,
163165
status_collection_id: None,
164166
timeline: None,
167+
primary: None,
165168
}
166169
}
167170

168171
/// Create a CollectionDescription for a table.
169-
pub fn for_table(desc: RelationDesc, primary: Option<GlobalId>) -> Self {
172+
pub fn for_table(desc: RelationDesc) -> Self {
170173
Self {
171174
desc,
172-
data_source: DataSource::Table { primary },
175+
data_source: DataSource::Table,
173176
since: None,
174177
status_collection_id: None,
175178
timeline: Some(Timeline::EpochMilliseconds),
179+
primary: None,
176180
}
177181
}
178182
}
@@ -736,7 +740,7 @@ impl<T> DataSource<T> {
736740
/// source using txn-wal.
737741
pub fn in_txns(&self) -> bool {
738742
match self {
739-
DataSource::Table { .. } => true,
743+
DataSource::Table => true,
740744
DataSource::Other
741745
| DataSource::Ingestion(_)
742746
| DataSource::IngestionExport { .. }

src/storage-client/src/storage_collections.rs

Lines changed: 30 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -902,24 +902,25 @@ where
902902
Ok(())
903903
}
904904

905-
/// Determine if this collection has another dependency.
906-
///
907-
/// Currently, collections have either 0 or 1 dependencies.
905+
/// Returns the given collection's dependencies.
908906
fn determine_collection_dependencies(
909907
&self,
910908
self_collections: &BTreeMap<GlobalId, CollectionState<T>>,
911909
source_id: GlobalId,
912-
data_source: &DataSource<T>,
910+
collection_desc: &CollectionDescription<T>,
913911
) -> Result<Vec<GlobalId>, StorageError<T>> {
914-
let dependencies = match &data_source {
912+
let mut dependencies = Vec::new();
913+
914+
if let Some(id) = collection_desc.primary {
915+
dependencies.push(id);
916+
}
917+
918+
match &collection_desc.data_source {
915919
DataSource::Introspection(_)
916920
| DataSource::Webhook
917-
| DataSource::Table { primary: None }
921+
| DataSource::Table
918922
| DataSource::Progress
919-
| DataSource::Other => Vec::new(),
920-
DataSource::Table {
921-
primary: Some(primary),
922-
} => vec![*primary],
923+
| DataSource::Other => (),
923924
DataSource::IngestionExport {
924925
ingestion_id,
925926
data_config,
@@ -935,20 +936,18 @@ where
935936
};
936937

937938
match data_config.envelope {
938-
SourceEnvelope::CdcV2 => Vec::new(),
939-
_ => vec![ingestion.remap_collection_id],
939+
SourceEnvelope::CdcV2 => (),
940+
_ => dependencies.push(ingestion.remap_collection_id),
940941
}
941942
}
942943
// Ingestions depend on their remap collection.
943944
DataSource::Ingestion(ingestion) => {
944-
if ingestion.remap_collection_id == source_id {
945-
vec![]
946-
} else {
947-
vec![ingestion.remap_collection_id]
945+
if ingestion.remap_collection_id != source_id {
946+
dependencies.push(ingestion.remap_collection_id);
948947
}
949948
}
950-
DataSource::Sink { desc } => vec![desc.sink.from],
951-
};
949+
DataSource::Sink { desc } => dependencies.push(desc.sink.from),
950+
}
952951

953952
Ok(dependencies)
954953
}
@@ -1342,12 +1341,9 @@ where
13421341
let mut persist_compaction_commands = Vec::with_capacity(collections_net.len());
13431342
for (key, (mut changes, frontier)) in collections_net {
13441343
if !changes.is_empty() {
1345-
// If the table has a "primary" collection, let that collection drive compaction.
1344+
// If the collection has a "primary" collection, let that primary drive compaction.
13461345
let collection = collections.get(&key).expect("must still exist");
1347-
let should_emit_persist_compaction = !matches!(
1348-
collection.description.data_source,
1349-
DataSource::Table { primary: Some(_) }
1350-
);
1346+
let should_emit_persist_compaction = collection.description.primary.is_none();
13511347

13521348
if frontier.is_empty() {
13531349
info!(id = %key, "removing collection state because the since advanced to []!");
@@ -1906,7 +1902,7 @@ where
19061902
| DataSource::Progress
19071903
| DataSource::Other => {}
19081904
DataSource::Sink { .. } => {}
1909-
DataSource::Table { .. } => {
1905+
DataSource::Table => {
19101906
let register_ts = register_ts.expect(
19111907
"caller should have provided a register_ts when creating a table",
19121908
);
@@ -1963,7 +1959,7 @@ where
19631959
Sink(GlobalId),
19641960
}
19651961
to_register.sort_by_key(|(id, desc, ..)| match &desc.data_source {
1966-
DataSource::Table { .. } => DependencyOrder::Table(Reverse(*id)),
1962+
DataSource::Table => DependencyOrder::Table(Reverse(*id)),
19671963
DataSource::Sink { .. } => DependencyOrder::Sink(*id),
19681964
_ => DependencyOrder::Collection(*id),
19691965
});
@@ -1977,11 +1973,8 @@ where
19771973
let data_shard_since = since_handle.since().clone();
19781974

19791975
// Determine if this collection has any dependencies.
1980-
let storage_dependencies = self.determine_collection_dependencies(
1981-
&*self_collections,
1982-
id,
1983-
&description.data_source,
1984-
)?;
1976+
let storage_dependencies =
1977+
self.determine_collection_dependencies(&*self_collections, id, &description)?;
19851978

19861979
// Determine the initial since of the collection.
19871980
let initial_since = match storage_dependencies
@@ -2088,7 +2081,7 @@ where
20882081

20892082
self_collections.insert(id, collection_state);
20902083
}
2091-
DataSource::Table { .. } => {
2084+
DataSource::Table => {
20922085
// See comment on self.initial_txn_upper on why we're doing
20932086
// this.
20942087
if is_in_txns(id, &metadata)
@@ -2265,7 +2258,7 @@ where
22652258
.ok_or_else(|| StorageError::IdentifierMissing(existing_collection))?;
22662259

22672260
// TODO(alter_table): Support changes to sources.
2268-
if !matches!(&existing.description.data_source, DataSource::Table { .. }) {
2261+
if existing.description.data_source != DataSource::Table {
22692262
return Err(StorageError::IdentifierInvalid(existing_collection));
22702263
}
22712264

@@ -2352,15 +2345,11 @@ where
23522345
.expect("existing collection missing");
23532346

23542347
// A higher level should already be asserting this, but let's make sure.
2355-
assert!(matches!(
2356-
existing.description.data_source,
2357-
DataSource::Table { primary: None }
2358-
));
2348+
assert_eq!(existing.description.data_source, DataSource::Table);
2349+
assert_none!(existing.description.primary);
23592350

23602351
// The existing version of the table will depend on the new version.
2361-
existing.description.data_source = DataSource::Table {
2362-
primary: Some(new_collection),
2363-
};
2352+
existing.description.primary = Some(new_collection);
23642353
existing.storage_dependencies.push(new_collection);
23652354

23662355
// Copy over the frontiers from the previous version.
@@ -2378,8 +2367,8 @@ where
23782367
let mut changes = ChangeBatch::new();
23792368
changes.extend(implied_capability.iter().map(|t| (t.clone(), 1)));
23802369

2381-
// Note: The new collection is now the "primary collection" so we specify `None` here.
2382-
let collection_desc = CollectionDescription::for_table(new_desc.clone(), None);
2370+
// Note: The new collection is now the "primary collection".
2371+
let collection_desc = CollectionDescription::for_table(new_desc.clone());
23832372
let collection_meta = CollectionMetadata {
23842373
persist_location: self.persist_location.clone(),
23852374
relation_desc: collection_desc.desc.clone(),

0 commit comments

Comments
 (0)