From 35425794e76a63a1d59c145fcaa806258fa7b284 Mon Sep 17 00:00:00 2001
From: Jackson Newhouse
Date: Tue, 10 Dec 2024 17:56:17 -0800
Subject: [PATCH] refactor(catalog): Rework how CatalogOps update the
 DatabaseSchema

---
 influxdb3_catalog/src/catalog.rs | 517 +++++++++++++++----------------
 1 file changed, 258 insertions(+), 259 deletions(-)

diff --git a/influxdb3_catalog/src/catalog.rs b/influxdb3_catalog/src/catalog.rs
index c62f57cd9ca..46ddf099d16 100644
--- a/influxdb3_catalog/src/catalog.rs
+++ b/influxdb3_catalog/src/catalog.rs
@@ -6,8 +6,8 @@ use hashbrown::HashMap;
 use indexmap::IndexMap;
 use influxdb3_id::{ColumnId, DbId, SerdeVecMap, TableId};
 use influxdb3_wal::{
-    CatalogBatch, CatalogOp, DeleteTableDefinition, FieldAdditions, LastCacheDefinition,
-    LastCacheDelete, MetaCacheDefinition, MetaCacheDelete,
+    CatalogBatch, CatalogOp, DeleteDatabaseDefinition, DeleteTableDefinition, FieldAdditions,
+    FieldDefinition, LastCacheDefinition, LastCacheDelete, MetaCacheDefinition, MetaCacheDelete,
 };
 use influxdb_line_protocol::FieldValue;
 use iox_time::Time;
@@ -15,6 +15,7 @@ use observability_deps::tracing::{debug, info};
 use parking_lot::RwLock;
 use schema::{InfluxColumnType, InfluxFieldType, Schema, SchemaBuilder};
 use serde::{Deserialize, Serialize, Serializer};
+use std::borrow::Cow;
 use std::collections::BTreeMap;
 use std::sync::Arc;
 use thiserror::Error;
@@ -502,147 +503,17 @@ impl DatabaseSchema {
         catalog_batch: &CatalogBatch,
     ) -> Result<Option<Self>> {
         debug!(name = ?db_schema.name, deleted = ?db_schema.deleted, full_batch = ?catalog_batch, "Updating / adding to catalog");
-        let mut updated_or_new_tables = SerdeVecMap::new();
-        let mut schema_deleted = false;
-        let mut table_deleted = false;
-        let mut deleted_table_defn = None;
-        let mut schema_name = Arc::clone(&db_schema.name);
+
+        let mut schema = Cow::Borrowed(db_schema);

         for catalog_op in &catalog_batch.ops {
-            match catalog_op {
-                CatalogOp::CreateDatabase(_) => (),
-                CatalogOp::CreateTable(table_definition) => {
-                    let new_or_existing_table = updated_or_new_tables
-                        .get(&table_definition.table_id)
-                        .or_else(|| db_schema.tables.get(&table_definition.table_id));
-                    if let Some(existing_table) = new_or_existing_table {
-                        if let Some(new_table) =
-                            existing_table.new_if_definition_adds_new_fields(table_definition)?
-                        {
-                            updated_or_new_tables.insert(new_table.table_id, Arc::new(new_table));
-                        }
-                    } else {
-                        let new_table = TableDefinition::new_from_op(table_definition);
-                        updated_or_new_tables.insert(new_table.table_id, Arc::new(new_table));
-                    }
-                }
-                CatalogOp::AddFields(field_additions) => {
-                    let Some(new_or_existing_table) = updated_or_new_tables
-                        .get(&field_additions.table_id)
-                        .or_else(|| db_schema.tables.get(&field_additions.table_id))
-                    else {
-                        return Err(Error::TableNotFound {
-                            db_name: Arc::clone(&field_additions.database_name),
-                            table_name: Arc::clone(&field_additions.table_name),
-                        });
-                    };
-                    if let Some(new_table) =
-                        new_or_existing_table.new_if_field_additions_add_fields(field_additions)?
-                    {
-                        updated_or_new_tables.insert(new_table.table_id, Arc::new(new_table));
-                    }
-                }
-                CatalogOp::CreateMetaCache(meta_cache_definition) => {
-                    let table = updated_or_new_tables
-                        .get(&meta_cache_definition.table_id)
-                        .cloned()
-                        .or_else(|| {
-                            db_schema.table_definition_by_id(&meta_cache_definition.table_id)
-                        })
-                        .ok_or(TableNotFound {
-                            db_name: Arc::clone(&db_schema.name),
-                            table_name: Arc::clone(&meta_cache_definition.table_name),
-                        })?;
-                    if let Some(new_table) =
-                        table.new_if_meta_cache_definition_is_new(meta_cache_definition)
-                    {
-                        updated_or_new_tables.insert(new_table.table_id, Arc::new(new_table));
-                    }
-                }
-                CatalogOp::DeleteMetaCache(delete_meta_cache) => {
-                    let table = updated_or_new_tables
-                        .get(&delete_meta_cache.table_id)
-                        .cloned()
-                        .or_else(|| db_schema.table_definition_by_id(&delete_meta_cache.table_id))
-                        .ok_or(TableNotFound {
-                            db_name: Arc::clone(&db_schema.name),
-                            table_name: Arc::clone(&delete_meta_cache.table_name),
-                        })?;
-                    if let Some(new_table) =
-                        table.new_if_meta_cache_deletes_existing(delete_meta_cache)
-                    {
-                        updated_or_new_tables.insert(new_table.table_id, Arc::new(new_table));
-                    }
-                }
-                CatalogOp::CreateLastCache(last_cache_definition) => {
-                    let new_or_existing_table = updated_or_new_tables
-                        .get(&last_cache_definition.table_id)
-                        .or_else(|| db_schema.tables.get(&last_cache_definition.table_id));
-
-                    let table = new_or_existing_table.ok_or(TableNotFound {
-                        db_name: Arc::clone(&db_schema.name),
-                        table_name: Arc::clone(&last_cache_definition.table),
-                    })?;
-
-                    if let Some(new_table) =
-                        table.new_if_last_cache_definition_is_new(last_cache_definition)
-                    {
-                        updated_or_new_tables.insert(new_table.table_id, Arc::new(new_table));
-                    }
-                }
-                CatalogOp::DeleteLastCache(last_cache_deletion) => {
-                    let new_or_existing_table = updated_or_new_tables
-                        .get(&last_cache_deletion.table_id)
-                        .or_else(|| db_schema.tables.get(&last_cache_deletion.table_id));
-
-                    let table = new_or_existing_table.ok_or(TableNotFound {
-                        db_name: Arc::clone(&db_schema.name),
-                        table_name: Arc::clone(&last_cache_deletion.table_name),
-                    })?;
-
-                    if let Some(new_table) =
-                        table.new_if_last_cache_deletes_existing(last_cache_deletion)
-                    {
-                        updated_or_new_tables.insert(new_table.table_id, Arc::new(new_table));
-                    }
-                }
-                CatalogOp::DeleteDatabase(params) => {
-                    schema_deleted = true;
-                    let deletion_time = Time::from_timestamp_nanos(params.deletion_time);
-                    schema_name =
-                        make_new_name_using_deleted_time(&params.database_name, deletion_time);
-                }
-                CatalogOp::DeleteTable(table_definition) => {
-                    table_deleted = true;
-                    deleted_table_defn = Some(table_definition);
-                }
-            }
+            schema = catalog_op.update_schema(schema)?;
         }
-
-        if updated_or_new_tables.is_empty() && !schema_deleted && !table_deleted {
-            Ok(None)
+        // If there were updates then it will have become owned, so we should return the new schema.
+        if let Cow::Owned(schema) = schema {
+            Ok(Some(schema))
         } else {
-            for (table_id, table_def) in &db_schema.tables {
-                if !updated_or_new_tables.contains_key(table_id) {
-                    updated_or_new_tables.insert(*table_id, Arc::clone(table_def));
-                }
-            }
-
-            check_and_mark_table_as_deleted(deleted_table_defn, &mut updated_or_new_tables);
-
-            // With the final list of updated/new tables update the current mapping
-            let new_table_maps = updated_or_new_tables
-                .iter()
-                .map(|(table_id, table_def)| (*table_id, Arc::clone(&table_def.table_name)))
-                .collect();
-
-            Ok(Some(Self {
-                id: db_schema.id,
-                name: schema_name,
-                tables: updated_or_new_tables,
-                table_map: new_table_maps,
-                deleted: schema_deleted,
-            }))
+            Ok(None)
         }
     }

@@ -743,19 +614,98 @@ impl DatabaseSchema {
     }
 }

-fn check_and_mark_table_as_deleted(
-    deleted_table_defn: Option<&DeleteTableDefinition>,
-    updated_or_new_tables: &mut SerdeVecMap<TableId, Arc<TableDefinition>>,
-) {
-    if let Some(deleted_table_defn) = deleted_table_defn {
-        if let Some(deleted_table) = updated_or_new_tables.get_mut(&deleted_table_defn.table_id) {
-            let deletion_time = Time::from_timestamp_nanos(deleted_table_defn.deletion_time);
-            let table_name =
-                make_new_name_using_deleted_time(&deleted_table_defn.table_name, deletion_time);
+trait UpdateDatabaseSchema {
+    fn update_schema<'a>(&self, schema: Cow<'a, DatabaseSchema>)
+        -> Result<Cow<'a, DatabaseSchema>>;
+}
+
+impl UpdateDatabaseSchema for CatalogOp {
+    fn update_schema<'a>(
+        &self,
+        schema: Cow<'a, DatabaseSchema>,
+    ) -> Result<Cow<'a, DatabaseSchema>> {
+        match &self {
+            CatalogOp::CreateDatabase(_) => Ok(schema),
+            CatalogOp::CreateTable(create_table) => create_table.update_schema(schema),
+            CatalogOp::AddFields(field_additions) => field_additions.update_schema(schema),
+            CatalogOp::CreateMetaCache(meta_cache_definition) => {
+                meta_cache_definition.update_schema(schema)
+            }
+            CatalogOp::DeleteMetaCache(delete_meta_cache) => {
+                delete_meta_cache.update_schema(schema)
+            }
+            CatalogOp::CreateLastCache(create_last_cache) => {
+                create_last_cache.update_schema(schema)
+            }
+            CatalogOp::DeleteLastCache(delete_last_cache) => {
+                delete_last_cache.update_schema(schema)
+            }
+            CatalogOp::DeleteDatabase(delete_database) => delete_database.update_schema(schema),
+            CatalogOp::DeleteTable(delete_table) => delete_table.update_schema(schema),
+        }
+    }
+}
+
+impl UpdateDatabaseSchema for influxdb3_wal::TableDefinition {
+    fn update_schema<'a>(
+        &self,
+        mut database_schema: Cow<'a, DatabaseSchema>,
+    ) -> Result<Cow<'a, DatabaseSchema>> {
+        match database_schema.tables.get(&self.table_id) {
+            Some(existing_table) => {
+                if let Cow::Owned(updated_table) = existing_table.check_and_add_new_fields(self)? {
+                    database_schema
+                        .to_mut()
+                        .insert_table(self.table_id, Arc::new(updated_table));
+                }
+            }
+            None => {
+                let new_table = TableDefinition::new_from_op(self);
+                database_schema
+                    .to_mut()
+                    .insert_table(new_table.table_id, Arc::new(new_table));
+            }
+        }
+        Ok(database_schema)
+    }
+}
+
+impl UpdateDatabaseSchema for DeleteDatabaseDefinition {
+    fn update_schema<'a>(
+        &self,
+        mut schema: Cow<'a, DatabaseSchema>,
+    ) -> Result<Cow<'a, DatabaseSchema>> {
+        // TODO: check if we want to re-delete an already deleted DB. That is current behavior.
+        let deletion_time = Time::from_timestamp_nanos(self.deletion_time);
+        let owned = schema.to_mut();
+        owned.name = make_new_name_using_deleted_time(&self.database_name, deletion_time);
+        owned.deleted = true;
+        Ok(schema)
+    }
+}
+
+impl UpdateDatabaseSchema for DeleteTableDefinition {
+    fn update_schema<'a>(
+        &self,
+        mut schema: Cow<'a, DatabaseSchema>,
+    ) -> Result<Cow<'a, DatabaseSchema>> {
+        // unlike other table ops, this is not an error.
+        if !schema.tables.contains_key(&self.table_id) {
+            return Ok(schema);
+        }
+        let mut_schema = schema.to_mut();
+        if let Some(deleted_table) = mut_schema.tables.get_mut(&self.table_id) {
+            let deletion_time = Time::from_timestamp_nanos(self.deletion_time);
+            let table_name = make_new_name_using_deleted_time(&self.table_name, deletion_time);
             let new_table_def = Arc::make_mut(deleted_table);
             new_table_def.deleted = true;
             new_table_def.table_name = table_name;
+            mut_schema.table_map.insert(
+                new_table_def.table_id,
+                Arc::clone(&new_table_def.table_name),
+            );
         }
+        Ok(schema)
     }
 }

@@ -854,12 +804,10 @@ impl TableDefinition {
             .expect("tables defined from ops should not exceed column limits")
     }

-    /// Validates that the `influxdb3_wal::TableDefinition` is compatible with existing and returns a new
-    /// `TableDefinition` if new definition adds new fields.
-    pub(crate) fn new_if_definition_adds_new_fields(
-        &self,
+    pub(crate) fn check_and_add_new_fields<'a>(
+        &'a self,
         table_definition: &influxdb3_wal::TableDefinition,
-    ) -> Result<Option<Self>> {
+    ) -> Result<Cow<'a, Self>> {
         // validate the series key is the same
         if table_definition.key != self.series_key {
             return Err(Error::SeriesKeyMismatch {
@@ -867,49 +815,20 @@ impl TableDefinition {
                 existing: self.schema.series_key().unwrap_or_default().join("/"),
             });
         }
-        let mut new_fields: Vec<(ColumnId, Arc<str>, InfluxColumnType)> =
-            Vec::with_capacity(table_definition.field_definitions.len());
-
-        for field_def in &table_definition.field_definitions {
-            if let Some(existing_type) = self.columns.get(&field_def.id).map(|def| def.data_type) {
-                if existing_type != field_def.data_type.into() {
-                    return Err(Error::FieldTypeMismatch {
-                        table_name: self.table_name.to_string(),
-                        column_name: field_def.name.to_string(),
-                        existing: existing_type,
-                        attempted: field_def.data_type.into(),
-                    });
-                }
-            } else {
-                new_fields.push((
-                    field_def.id,
-                    Arc::clone(&field_def.name),
-                    field_def.data_type.into(),
-                ));
-            }
-        }
-
-        if new_fields.is_empty() {
-            Ok(None)
-        } else {
-            let mut new_table = self.clone();
-            new_table.add_columns(new_fields)?;
-            Ok(Some(new_table))
-        }
+        Self::add_fields(Cow::Borrowed(self), &table_definition.field_definitions)
     }

-    /// Validates that the `TableDefinition` is compatible with existing and returns a new
-    /// `TableDefinition` if new definition adds new fields.
-    pub(crate) fn new_if_field_additions_add_fields(
-        &self,
-        field_additions: &FieldAdditions,
-    ) -> Result<Option<Self>> {
-        let mut new_fields = Vec::with_capacity(field_additions.field_definitions.len());
-        for field_def in &field_additions.field_definitions {
-            if let Some(existing_type) = self.columns.get(&field_def.id).map(|def| def.data_type) {
+    pub(crate) fn add_fields<'a>(
+        mut table: Cow<'a, Self>,
+        fields: &Vec<FieldDefinition>,
+    ) -> Result<Cow<'a, Self>> {
+        let mut new_fields: Vec<(ColumnId, Arc<str>, InfluxColumnType)> =
+            Vec::with_capacity(fields.len());
+        for field_def in fields {
+            if let Some(existing_type) = table.columns.get(&field_def.id).map(|def| def.data_type) {
                 if existing_type != field_def.data_type.into() {
                     return Err(Error::FieldTypeMismatch {
-                        table_name: self.table_name.to_string(),
+                        table_name: table.table_name.to_string(),
                         column_name: field_def.name.to_string(),
                         existing: existing_type,
                         attempted: field_def.data_type.into(),
@@ -924,68 +843,10 @@ impl TableDefinition {
             }
         }

-        if new_fields.is_empty() {
-            Ok(None)
-        } else {
-            let mut new_table = self.clone();
-            new_table.add_columns(new_fields)?;
-            Ok(Some(new_table))
-        }
-    }
-
-    pub(crate) fn new_if_meta_cache_definition_is_new(
-        &self,
-        meta_cache_definition: &MetaCacheDefinition,
-    ) -> Option<Self> {
-        if self
-            .meta_caches
-            .contains_key(&meta_cache_definition.cache_name)
-        {
-            None
-        } else {
-            let mut new_table = self.clone();
-            new_table.add_meta_cache(meta_cache_definition.clone());
-            Some(new_table)
-        }
-    }
-
-    pub(crate) fn new_if_meta_cache_deletes_existing(
-        &self,
-        meta_cache_delete: &MetaCacheDelete,
-    ) -> Option<Self> {
-        if self.meta_caches.contains_key(&meta_cache_delete.cache_name) {
-            let mut new_table = self.clone();
-            new_table.remove_meta_cache(&meta_cache_delete.cache_name);
-            Some(new_table)
-        } else {
-            None
-        }
-    }
-
-    pub(crate) fn new_if_last_cache_definition_is_new(
-        &self,
-        last_cache_definition: &LastCacheDefinition,
-    ) -> Option<Self> {
-        if self.last_caches.contains_key(&last_cache_definition.name) {
-            None
-        } else {
-            let mut new_table = self.clone();
-            new_table.add_last_cache(last_cache_definition.clone());
-            Some(new_table)
-        }
-    }
-
-    pub(crate) fn new_if_last_cache_deletes_existing(
-        &self,
-        last_cache_delete: &LastCacheDelete,
-    ) -> Option<Self> {
-        if self.last_caches.contains_key(&last_cache_delete.name) {
-            let mut new_table = self.clone();
-            new_table.remove_last_cache(&last_cache_delete.name);
-            Some(new_table)
-        } else {
-            None
+        if !new_fields.is_empty() {
+            table.to_mut().add_columns(new_fields)?;
         }
+        Ok(table)
     }

     /// Check if the column exists in the [`TableDefinition`]
@@ -1152,6 +1013,123 @@ impl TableDefinition {
     }
 }

+trait TableUpdate {
+    fn table_id(&self) -> TableId;
+    fn table_name(&self) -> Arc<str>;
+    fn update_table<'a>(&self, table: Cow<'a, TableDefinition>)
+        -> Result<Cow<'a, TableDefinition>>;
+}
+
+impl<T: TableUpdate> UpdateDatabaseSchema for T {
+    fn update_schema<'a>(
+        &self,
+        mut schema: Cow<'a, DatabaseSchema>,
+    ) -> Result<Cow<'a, DatabaseSchema>> {
+        let Some(table) = schema.tables.get(&self.table_id()) else {
+            return Err(TableNotFound {
+                db_name: Arc::clone(&schema.name),
+                table_name: Arc::clone(&self.table_name()),
+            });
+        };
+        if let Cow::Owned(new_table) = self.update_table(Cow::Borrowed(table.as_ref()))? {
+            schema
+                .to_mut()
+                .insert_table(new_table.table_id, Arc::new(new_table));
+        }
+        Ok(schema)
+    }
+}
+
+impl TableUpdate for FieldAdditions {
+    fn table_id(&self) -> TableId {
+        self.table_id
+    }
+    fn table_name(&self) -> Arc<str> {
+        Arc::clone(&self.table_name)
+    }
+    fn update_table<'a>(
+        &self,
+        table: Cow<'a, TableDefinition>,
+    ) -> Result<Cow<'a, TableDefinition>> {
+        TableDefinition::add_fields(table, &self.field_definitions)
+    }
+}
+
+impl TableUpdate for MetaCacheDefinition {
+    fn table_id(&self) -> TableId {
+        self.table_id
+    }
+    fn table_name(&self) -> Arc<str> {
+        Arc::clone(&self.table_name)
+    }
+    fn update_table<'a>(
+        &self,
+        mut table: Cow<'a, TableDefinition>,
+    ) -> Result<Cow<'a, TableDefinition>> {
+        if !table.meta_caches.contains_key(&self.cache_name) {
+            table.to_mut().add_meta_cache(self.clone());
+        }
+        Ok(table)
+    }
+}
+
+impl TableUpdate for MetaCacheDelete {
+    fn table_id(&self) -> TableId {
+        self.table_id
+    }
+    fn table_name(&self) -> Arc<str> {
+        Arc::clone(&self.table_name)
+    }
+    fn update_table<'a>(
+        &self,
+        mut table: Cow<'a, TableDefinition>,
+    ) -> Result<Cow<'a, TableDefinition>> {
+        if table.meta_caches.contains_key(&self.cache_name) {
+            table.to_mut().meta_caches.remove(&self.cache_name);
+        }
+        Ok(table)
+    }
+}
+
+impl TableUpdate for LastCacheDefinition {
+    fn table_id(&self) -> TableId {
+        self.table_id
+    }
+
+    fn table_name(&self) -> Arc<str> {
+        Arc::clone(&self.table)
+    }
+
+    fn update_table<'a>(
+        &self,
+        mut table: Cow<'a, TableDefinition>,
+    ) -> Result<Cow<'a, TableDefinition>> {
+        if !table.last_caches.contains_key(&self.name) {
+            table.to_mut().add_last_cache(self.clone());
+        }
+        Ok(table)
+    }
+}
+
+impl TableUpdate for LastCacheDelete {
+    fn table_id(&self) -> TableId {
+        self.table_id
+    }
+    fn table_name(&self) -> Arc<str> {
+        Arc::clone(&self.table_name)
+    }
+
+    fn update_table<'a>(
+        &self,
+        mut table: Cow<'a, TableDefinition>,
+    ) -> Result<Cow<'a, TableDefinition>> {
+        if table.last_caches.contains_key(&self.name) {
+            table.to_mut().last_caches.remove(&self.name);
+        }
+        Ok(table)
+    }
+}
+
 #[derive(Debug, Eq, PartialEq, Clone)]
 pub struct ColumnDefinition {
     pub id: ColumnId,
@@ -1623,7 +1601,15 @@ mod tests {

     #[test]
     fn test_check_and_mark_table_as_deleted() {
-        let db_id = DbId::new();
+        let db_id = DbId::from(0);
+
+        let mut database = DatabaseSchema {
+            id: db_id,
+            name: "test".into(),
+            tables: SerdeVecMap::new(),
+            table_map: BiHashMap::new(),
+            deleted: false,
+        };
         let deleted_table_id = TableId::new();
         let table_name = Arc::from("boo");
         let deleted_table_defn = DeleteTableDefinition {
@@ -1633,7 +1619,6 @@ mod tests {
             table_name: Arc::clone(&table_name),
             deletion_time: 0,
         };
-        let mut map = IndexMap::new();
         let table_defn = Arc::new(
            TableDefinition::new(
                deleted_table_id,
@@ -1657,12 +1642,26 @@ mod tests {
             )
             .unwrap(),
         );
-        map.insert(deleted_table_id, table_defn);
-        let mut updated_or_new_tables = SerdeVecMap::from(map);
-
-        check_and_mark_table_as_deleted(Some(&deleted_table_defn), &mut updated_or_new_tables);
+        database.insert_table(deleted_table_id, table_defn);
+
+        let new_db = DatabaseSchema::new_if_updated_from_batch(
+            &database,
+            &CatalogBatch {
+                database_id: database.id,
+                database_name: Arc::clone(&database.name),
+                time_ns: 0,
+                ops: vec![CatalogOp::DeleteTable(deleted_table_defn)],
+            },
+        )
+        .unwrap()
+        .expect("should mutate db");
+
+        // check that table is not under the old name
+        assert!(new_db.table_definition(table_name).is_none());
+
+        let deleted_table = new_db.table_definition("boo-19700101T000000").unwrap();

-        let deleted_table = updated_or_new_tables.get(&deleted_table_id).unwrap();
+        assert_eq!(deleted_table.table_id, deleted_table_id);
         assert_eq!(&*deleted_table.table_name, "boo-19700101T000000");
         assert!(deleted_table.deleted);
         assert!(!deleted_table.series_key.is_empty());
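
Reviewer note (not part of the patch): the sketch below is a minimal,
self-contained illustration of the copy-on-write pattern this refactor adopts.
The types are toy stand-ins invented for the example (`Schema`, `Rename`,
`apply`), not the real catalog API. The pattern is the same, though: every op
takes and returns a `Cow`, the first real mutation promotes it to `Cow::Owned`
via `to_mut()`, and the caller recovers the old "did anything change?" answer
by checking which variant survived the fold.

    use std::borrow::Cow;

    #[derive(Clone, Debug)]
    struct Schema {
        name: String,
    }

    trait UpdateSchema {
        fn update<'a>(&self, schema: Cow<'a, Schema>) -> Cow<'a, Schema>;
    }

    /// Renames the schema; the first actual change promotes Borrowed -> Owned.
    struct Rename(String);

    impl UpdateSchema for Rename {
        fn update<'a>(&self, mut schema: Cow<'a, Schema>) -> Cow<'a, Schema> {
            if schema.name != self.0 {
                // to_mut() clones the borrowed value exactly once, then mutates it.
                schema.to_mut().name = self.0.clone();
            }
            schema
        }
    }

    /// Applies ops in order; returns Some only if at least one op mutated.
    fn apply(schema: &Schema, ops: &[&dyn UpdateSchema]) -> Option<Schema> {
        let mut cow = Cow::Borrowed(schema);
        for op in ops {
            cow = op.update(cow);
        }
        match cow {
            Cow::Owned(updated) => Some(updated),
            Cow::Borrowed(_) => None,
        }
    }

    fn main() {
        let schema = Schema { name: "db".into() };
        // No-op: nothing is cloned and the caller sees None.
        assert!(apply(&schema, &[&Rename("db".into())]).is_none());
        // A real change clones once and yields the new schema.
        let renamed = apply(&schema, &[&Rename("db2".into())]).unwrap();
        assert_eq!(renamed.name, "db2");
    }

This is why new_if_updated_from_batch() shrinks to a single fold over the ops:
the Cow variant replaces the updated_or_new_tables / schema_deleted /
table_deleted bookkeeping that previously tracked whether anything changed.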
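A second sketch, same caveat (toy types and names invented for illustration):
the blanket impl in the patch, `impl<T: TableUpdate> UpdateDatabaseSchema for T`,
lifts any per-table op into a schema-level op, centralizing the table lookup,
the TableNotFound error, and a write-back that clones the schema only when the
table itself became owned.

    use std::borrow::Cow;
    use std::collections::BTreeMap;

    #[derive(Clone)]
    struct Table {
        rows: u32,
    }

    #[derive(Clone)]
    struct Schema {
        tables: BTreeMap<u32, Table>,
    }

    trait UpdateSchema {
        fn update_schema<'a>(&self, schema: Cow<'a, Schema>) -> Result<Cow<'a, Schema>, String>;
    }

    trait TableUpdate {
        fn table_id(&self) -> u32;
        fn update_table<'a>(&self, table: Cow<'a, Table>) -> Result<Cow<'a, Table>, String>;
    }

    // The blanket impl: look the table up, run the table-level op, and write
    // back (cloning the schema at most once) only if the table was mutated.
    impl<T: TableUpdate> UpdateSchema for T {
        fn update_schema<'a>(&self, mut schema: Cow<'a, Schema>) -> Result<Cow<'a, Schema>, String> {
            let Some(table) = schema.tables.get(&self.table_id()) else {
                return Err(format!("table {} not found", self.table_id()));
            };
            if let Cow::Owned(new_table) = self.update_table(Cow::Borrowed(table))? {
                schema.to_mut().tables.insert(self.table_id(), new_table);
            }
            Ok(schema)
        }
    }

    struct AddRow {
        table_id: u32,
    }

    impl TableUpdate for AddRow {
        fn table_id(&self) -> u32 {
            self.table_id
        }
        fn update_table<'a>(&self, mut table: Cow<'a, Table>) -> Result<Cow<'a, Table>, String> {
            table.to_mut().rows += 1;
            Ok(table)
        }
    }

    fn main() -> Result<(), String> {
        let schema = Schema {
            tables: BTreeMap::from([(1, Table { rows: 0 })]),
        };
        let updated = AddRow { table_id: 1 }.update_schema(Cow::Borrowed(&schema))?;
        assert!(matches!(updated, Cow::Owned(_)));
        Ok(())
    }

This is the design choice that lets each of FieldAdditions, MetaCacheDefinition,
MetaCacheDelete, LastCacheDefinition, and LastCacheDelete implement only
table_id()/table_name()/update_table(), while DeleteTableDefinition keeps its
own UpdateDatabaseSchema impl because a missing table is deliberately not an
error there.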