Skip to content

Commit

Permalink
Introduce the Source References internal table, durably stored in the…
Browse files Browse the repository at this point in the history
… catalog
  • Loading branch information
rjobanp committed Aug 27, 2024
1 parent 8041e66 commit a055e35
Show file tree
Hide file tree
Showing 31 changed files with 1,384 additions and 20 deletions.
2 changes: 2 additions & 0 deletions doc/user/content/sql/system-catalog/mz_internal.md
Original file line number Diff line number Diff line change
Expand Up @@ -923,6 +923,8 @@ The `mz_source_statistics` view contains statistics about each source.

<!-- RELATION_SPEC_UNDOCUMENTED mz_internal.mz_source_statistics_with_history -->

<!-- RELATION_SPEC_UNDOCUMENTED mz_internal.mz_source_references -->

### Counters
`messages_received`, `messages_staged`, `updates_staged`, and `updates_committed` are all counters that monotonically increase. They are _only
useful for calculating rates_, to understand the general performance of your source.
Expand Down
24 changes: 24 additions & 0 deletions src/adapter/src/catalog/apply.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,9 @@ impl CatalogState {
StateUpdateKind::Comment(comment) => {
self.apply_comment_update(comment, diff, retractions);
}
StateUpdateKind::SourceReferences(source_reference) => {
self.apply_source_references_update(source_reference, diff, retractions);
}
StateUpdateKind::AuditLog(_audit_log) => {
// Audit logs are not stored in-memory.
}
Expand Down Expand Up @@ -935,6 +938,23 @@ impl CatalogState {
}
}

#[instrument(level = "debug")]
fn apply_source_references_update(
&mut self,
_source_references: mz_catalog::durable::SourceReferences,
diff: StateDiff,
_retractions: &mut InProgressRetractions,
) {
match diff {
StateDiff::Addition => {
unimplemented!("source references are not yet implemented");
}
StateDiff::Retraction => {
unimplemented!("source references are not yet implemented");
}
}
}

#[instrument(level = "debug")]
fn apply_storage_collection_metadata_update(
&mut self,
Expand Down Expand Up @@ -1061,6 +1081,9 @@ impl CatalogState {
&comment.comment,
diff,
)],
StateUpdateKind::SourceReferences(source_references) => {
self.pack_source_references_update(&source_references, diff)
}
StateUpdateKind::AuditLog(audit_log) => {
vec![self
.pack_audit_log_update(&audit_log.event, diff)
Expand Down Expand Up @@ -1562,6 +1585,7 @@ fn sort_updates_inner(updates: Vec<StateUpdate>) -> Vec<StateUpdate> {
&mut item_additions,
),
StateUpdateKind::Comment(_)
| StateUpdateKind::SourceReferences(_)
| StateUpdateKind::AuditLog(_)
| StateUpdateKind::StorageCollectionMetadata(_)
| StateUpdateKind::UnfinalizedShard(_) => push_update(
Expand Down
49 changes: 46 additions & 3 deletions src/adapter/src/catalog/builtin_table_updates.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,12 @@ use mz_catalog::builtin::{
MZ_MATERIALIZED_VIEWS, MZ_MATERIALIZED_VIEW_REFRESH_STRATEGIES, MZ_MYSQL_SOURCE_TABLES,
MZ_OBJECT_DEPENDENCIES, MZ_OPERATORS, MZ_PENDING_CLUSTER_REPLICAS, MZ_POSTGRES_SOURCES,
MZ_POSTGRES_SOURCE_TABLES, MZ_PSEUDO_TYPES, MZ_ROLES, MZ_ROLE_MEMBERS, MZ_ROLE_PARAMETERS,
MZ_SCHEMAS, MZ_SECRETS, MZ_SESSIONS, MZ_SINKS, MZ_SOURCES, MZ_SSH_TUNNEL_CONNECTIONS,
MZ_STORAGE_USAGE_BY_SHARD, MZ_SUBSCRIPTIONS, MZ_SYSTEM_PRIVILEGES, MZ_TABLES, MZ_TYPES,
MZ_TYPE_PG_METADATA, MZ_VIEWS, MZ_WEBHOOKS_SOURCES,
MZ_SCHEMAS, MZ_SECRETS, MZ_SESSIONS, MZ_SINKS, MZ_SOURCES, MZ_SOURCE_REFERENCES,
MZ_SSH_TUNNEL_CONNECTIONS, MZ_STORAGE_USAGE_BY_SHARD, MZ_SUBSCRIPTIONS, MZ_SYSTEM_PRIVILEGES,
MZ_TABLES, MZ_TYPES, MZ_TYPE_PG_METADATA, MZ_VIEWS, MZ_WEBHOOKS_SOURCES,
};
use mz_catalog::config::AwsPrincipalContext;
use mz_catalog::durable::SourceReferences;
use mz_catalog::memory::error::{Error, ErrorKind};
use mz_catalog::memory::objects::{
CatalogItem, ClusterReplicaProcessStatus, ClusterVariant, Connection, DataSourceDesc, Func,
Expand Down Expand Up @@ -1993,4 +1994,46 @@ impl CatalogState {
diff,
}
}

pub fn pack_source_references_update(
&self,
source_references: &SourceReferences,
diff: Diff,
) -> Vec<BuiltinTableUpdate<&'static BuiltinTable>> {
let source_id = source_references.source_id.to_string();
let updated_at = &source_references.updated_at;
source_references
.references
.iter()
.map(|reference| {
let mut row = Row::default();
let mut packer = row.packer();
packer.extend([
Datum::String(&source_id),
reference
.namespace
.as_ref()
.map(|s| Datum::String(s))
.unwrap_or(Datum::Null),
Datum::String(&reference.name),
Datum::TimestampTz(
mz_ore::now::to_datetime(*updated_at)
.try_into()
.expect("must fit"),
),
]);
if reference.columns.len() > 0 {
packer.push_list(reference.columns.iter().map(|col| Datum::String(col)));
} else {
packer.push(Datum::Null);
}

BuiltinTableUpdate {
id: &*MZ_SOURCE_REFERENCES,
row,
diff,
}
})
.collect()
}
}
1 change: 1 addition & 0 deletions src/adapter/src/catalog/open.rs
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,7 @@ impl Catalog {
BootstrapStateUpdateKind::Comment(_)
| BootstrapStateUpdateKind::AuditLog(_)
| BootstrapStateUpdateKind::StorageCollectionMetadata(_)
| BootstrapStateUpdateKind::SourceReferences(_)
| BootstrapStateUpdateKind::UnfinalizedShard(_) => {
post_item_updates.push(StateUpdate {
kind: kind.into(),
Expand Down
2 changes: 2 additions & 0 deletions src/buf.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ breaking:
# reason: does currently not require backward-compatibility
- catalog/protos/objects_v62.proto
# reason: does currently not require backward-compatibility
- catalog/protos/objects_v63.proto
# reason: does currently not require backward-compatibility
- cluster-client/src/client.proto
# reason: does currently not require backward-compatibility
- compute-client/src/logging.proto
Expand Down
11 changes: 10 additions & 1 deletion src/catalog-debug/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use mz_catalog::durable::debug::{
ClusterReplicaCollection, Collection, CollectionTrace, CollectionType, CommentCollection,
ConfigCollection, DatabaseCollection, DebugCatalogState, DefaultPrivilegeCollection,
IdAllocatorCollection, ItemCollection, RoleCollection, SchemaCollection, SettingCollection,
StorageCollectionMetadataCollection, SystemConfigurationCollection,
SourceReferencesCollection, StorageCollectionMetadataCollection, SystemConfigurationCollection,
SystemItemMappingCollection, SystemPrivilegeCollection, Trace, TxnWalShardCollection,
UnfinalizedShardsCollection,
};
Expand Down Expand Up @@ -257,6 +257,7 @@ macro_rules! for_collection {
CollectionType::Role => $fn::<RoleCollection>($($arg),*).await?,
CollectionType::Schema => $fn::<SchemaCollection>($($arg),*).await?,
CollectionType::Setting => $fn::<SettingCollection>($($arg),*).await?,
CollectionType::SourceReferences => $fn::<SourceReferencesCollection>($($arg),*).await?,
CollectionType::SystemConfiguration => $fn::<SystemConfigurationCollection>($($arg),*).await?,
CollectionType::SystemGidMapping => $fn::<SystemItemMappingCollection>($($arg),*).await?,
CollectionType::SystemPrivileges => $fn::<SystemPrivilegeCollection>($($arg),*).await?,
Expand Down Expand Up @@ -392,6 +393,7 @@ async fn dump(
roles,
schemas,
settings,
source_references,
system_object_mappings,
system_configurations,
system_privileges,
Expand Down Expand Up @@ -437,6 +439,13 @@ async fn dump(
dump_col(&mut data, roles, &ignore, stats_only, consolidate);
dump_col(&mut data, schemas, &ignore, stats_only, consolidate);
dump_col(&mut data, settings, &ignore, stats_only, consolidate);
dump_col(
&mut data,
source_references,
&ignore,
stats_only,
consolidate,
);
dump_col(
&mut data,
system_configurations,
Expand Down
6 changes: 5 additions & 1 deletion src/catalog/protos/hashes.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[
{
"name": "objects.proto",
"md5": "e3f60c6fc3338952772fdb5f1778afee"
"md5": "5fb24f4ff7e8c4d96910a52cfb6cce4c"
},
{
"name": "objects_v60.proto",
Expand All @@ -14,5 +14,9 @@
{
"name": "objects_v62.proto",
"md5": "287d592a5e3f2e4d0f06663880a10031"
},
{
"name": "objects_v63.proto",
"md5": "5539fa20fcf47ac0e9ca9053427ebdde"
}
]
21 changes: 21 additions & 0 deletions src/catalog/protos/objects.proto
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,21 @@ message CommentValue {
string comment = 1;
}

message SourceReferencesKey {
GlobalId source = 1;
}

message SourceReferencesValue {
repeated SourceReference references = 1;
EpochMillis updated_at = 2;
}

message SourceReference {
string name = 1;
optional string namespace = 2;
repeated string columns = 3;
}

message StorageCollectionMetadataKey {
GlobalId id = 1;
}
Expand Down Expand Up @@ -838,6 +853,11 @@ message StateUpdateKind {
ServerConfigurationValue value = 2;
}

message SourceReferences {
SourceReferencesKey key = 1;
SourceReferencesValue value = 2;
}

message GidMapping {
GidMappingKey key = 1;
GidMappingValue value = 2;
Expand Down Expand Up @@ -889,5 +909,6 @@ message StateUpdateKind {
StorageCollectionMetadata storage_collection_metadata = 20;
UnfinalizedShard unfinalized_shard = 21;
TxnWalShard txn_wal_shard = 23;
SourceReferences source_references = 24;
}
}
Loading

0 comments on commit a055e35

Please sign in to comment.