refactor(catalog): Rework how CatalogOps update the DatabaseSchema #25642
Conversation
Force-pushed from 4fc5548 to c407a69
Really nice refactor; the method is much easier to follow now. There is a lint error to fix, but I'll approve it anyway.
let mut deleted_table_defn = None;
let mut schema_name = Arc::clone(&db_schema.name);

let mut schema = Cow::Borrowed(db_schema);
This is a neat idea!
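For context on why this reads so nicely: Cow::Borrowed lets the update path hand back the original schema untouched when an op turns out to be a no-op, and only pays for a clone via to_mut() the first time something actually changes. A minimal, self-contained sketch of that pattern, using a toy Schema type rather than the real DatabaseSchema:

use std::borrow::Cow;

#[derive(Clone)]
struct Schema {
    name: String,
    deleted: bool,
}

// Marks the schema deleted, cloning it only if a change is actually needed.
fn mark_deleted(mut schema: Cow<'_, Schema>) -> Cow<'_, Schema> {
    if !schema.deleted {
        // to_mut() clones the borrowed schema on first mutation.
        schema.to_mut().deleted = true;
    }
    schema
}

fn main() {
    let already_deleted = Schema { name: "foo".to_string(), deleted: true };
    // No-op case: no clone happens, we still hold a borrow.
    let unchanged = mark_deleted(Cow::Borrowed(&already_deleted));
    assert!(matches!(unchanged, Cow::Borrowed(_)));

    let live = Schema { name: "bar".to_string(), deleted: false };
    // A real change: the Cow becomes an owned, modified copy.
    let updated = mark_deleted(Cow::Borrowed(&live));
    assert!(matches!(updated, Cow::Owned(_)));
    assert!(updated.deleted);
}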
influxdb3_catalog/src/catalog.rs (Outdated)
    &self,
    mut schema: Cow<'a, DatabaseSchema>,
) -> Result<Cow<'a, DatabaseSchema>> {
    // TODO: check if we want to re-delete an already deleted DB. That is current behavior.
I think it's possible at the moment if you know the new db name (the name changes when we delete it).
Yeah, when we delete a DB, its name gets changed. That way you can immediately create a new DB with the same name and it will pass uniqueness validation. If you then issue another delete, you're deleting the new DB, not the old one.
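For illustration, a minimal sketch of that rename-on-delete behaviour. The helper name here is made up, and it assumes the deletion time is a Unix timestamp in seconds and uses chrono for the formatting; the resulting suffix matches the boo-19700101T000000 assertion in the test quoted below.

use chrono::DateTime;

// Hypothetical helper: append the deletion time to the name so the original
// name is immediately free for a newly created database or table.
fn deleted_name(name: &str, deletion_time_secs: i64) -> String {
    let ts = DateTime::from_timestamp(deletion_time_secs, 0)
        .expect("valid unix timestamp")
        .format("%Y%m%dT%H%M%S");
    format!("{name}-{ts}")
}

fn main() {
    // A deletion at the Unix epoch yields the suffix seen in the test below.
    assert_eq!(deleted_name("boo", 0), "boo-19700101T000000");
}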
Agreed with Praveen, this is a great refactor and much cleaner. I had one concern though, noted in a comment below.
influxdb3_catalog/src/catalog.rs (Outdated)
#[test]
fn test_check_and_mark_table_as_deleted() {
    let db_id = DbId::new();
    let deleted_table_id = TableId::new();
    let table_name = Arc::from("boo");
    let deleted_table_defn = DeleteTableDefinition {
        database_id: db_id,
        database_name: Arc::from("foo"),
        table_id: deleted_table_id,
        table_name: Arc::clone(&table_name),
        deletion_time: 0,
    };
    let mut map = IndexMap::new();
    let table_defn = Arc::new(
        TableDefinition::new(
            deleted_table_id,
            Arc::clone(&table_name),
            vec![
                (ColumnId::from(0), "tag_1".into(), InfluxColumnType::Tag),
                (ColumnId::from(1), "tag_2".into(), InfluxColumnType::Tag),
                (ColumnId::from(2), "tag_3".into(), InfluxColumnType::Tag),
                (
                    ColumnId::from(3),
                    "time".into(),
                    InfluxColumnType::Timestamp,
                ),
                (
                    ColumnId::from(4),
                    "field".into(),
                    InfluxColumnType::Field(InfluxFieldType::String),
                ),
            ],
            vec![ColumnId::from(0), ColumnId::from(1), ColumnId::from(2)],
        )
        .unwrap(),
    );
    map.insert(deleted_table_id, table_defn);
    let mut updated_or_new_tables = SerdeVecMap::from(map);

    check_and_mark_table_as_deleted(Some(&deleted_table_defn), &mut updated_or_new_tables);

    let deleted_table = updated_or_new_tables.get(&deleted_table_id).unwrap();
    assert_eq!(&*deleted_table.table_name, "boo-19700101T000000");
    assert!(deleted_table.deleted);
    assert!(!deleted_table.series_key.is_empty());
}
There should be a new test added in this one's place, unless you had a justification for removing it, which isn't clear to me. I think you can replicate it with the new APIs you've added. That is my only recommendation for this PR.
I initially deleted it as it was testing a function that I'd removed, check_and_mark_table_as_deleted. I'll add an equivalent test that validates the CatalogOp does the correct thing.
Updated the commit
Looks good!
Force-pushed from 31a7edf to 465885b
Ah - looks like a test in CI is failing @jacksonrnewhouse - probably not hard to fix?
Force-pushed from 465885b to 3542579
Force-pushed from 3542579 to d66b873
In adding additional CatalogOps for Processing Engine Plugins and Triggers, I had to work through new_if_updated_batch(). The intention is reasonable: process the CatalogOps and, if they do update the DatabaseSchema, return a new copy that can then be inserted. However, this is done by mutating a bunch of state inside the method as the ops are iterated. That ends up relying on brittle assumptions, mainly that there won't be many different operations in a single CatalogBatch, for instance multiple deletes, or a delete followed by a table creation.

I pulled the logic out into a couple of traits, one for DB-level changes and another for table updates. Among other things this will make #25639 cleaner. A sketch of the new shape follows.
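To give a rough feel for the trait-based approach: each op implements an update method that takes and returns a Cow of the schema, so a whole batch can be folded over without shared mutable state, and the caller only gets an owned copy back if something changed. The names and types below are illustrative only, not the exact ones in the PR:

use std::borrow::Cow;

#[derive(Clone)]
struct DatabaseSchema {
    name: String,
    deleted: bool,
}

// Stand-in for the catalog's real error handling; kept trivial here.
type Result<T> = std::result::Result<T, String>;

// Illustrative trait: each database-level op knows how to apply itself,
// cloning the schema only when it actually changes something.
trait UpdateDatabaseSchema {
    fn update_schema<'a>(
        &self,
        schema: Cow<'a, DatabaseSchema>,
    ) -> Result<Cow<'a, DatabaseSchema>>;
}

struct SoftDeleteDatabase {
    deletion_time: i64,
}

impl UpdateDatabaseSchema for SoftDeleteDatabase {
    fn update_schema<'a>(
        &self,
        mut schema: Cow<'a, DatabaseSchema>,
    ) -> Result<Cow<'a, DatabaseSchema>> {
        if !schema.deleted {
            // First real change: Cow clones the schema here.
            let s = schema.to_mut();
            s.deleted = true;
            s.name = format!("{}-{}", s.name, self.deletion_time);
        }
        Ok(schema)
    }
}

// A batch of ops is applied by folding over the Cow, with no extra
// mutable bookkeeping alongside the iteration.
fn apply_ops<'a>(
    schema: &'a DatabaseSchema,
    ops: &[Box<dyn UpdateDatabaseSchema>],
) -> Result<Cow<'a, DatabaseSchema>> {
    let mut schema = Cow::Borrowed(schema);
    for op in ops {
        schema = op.update_schema(schema)?;
    }
    Ok(schema)
}

fn main() -> Result<()> {
    let db = DatabaseSchema { name: "foo".to_string(), deleted: false };
    let ops: Vec<Box<dyn UpdateDatabaseSchema>> =
        vec![Box::new(SoftDeleteDatabase { deletion_time: 0 })];

    let updated = apply_ops(&db, &ops)?;
    assert!(updated.deleted);
    assert_eq!(updated.name, "foo-0");
    Ok(())
}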