fix(meta): fix alter table add/drop column with indexes #8664

Merged
merged 2 commits on Mar 21, 2023
32 changes: 32 additions & 0 deletions e2e_test/ddl/alter_table_column.slt
@@ -209,3 +209,35 @@ drop materialized view mv;

statement ok
drop table t;

# Test the consistency of tables and indexes: https://github.com/risingwavelabs/risingwave/issues/8649
statement ok
create table t(id int primary key, a int, b varchar);

statement ok
create index idx on t(a);

statement ok
alter table t add column c int;

query IITI rowsort
select * from t;
----

statement ok
drop table t;

statement ok
create table t(id int primary key, a int, b varchar);

statement ok
create index idx on t(b) include(b);

statement ok
alter table t drop column a;

query II rowsort
select * from t where b = '1';

statement ok
drop table t;
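
A note on what these tests guard against (editorial aside, not part of the diff): index expressions refer to the primary table's columns by position, so adding or dropping a column shifts those positions and can leave an index pointing at the wrong, or a nonexistent, column (issue #8649). Below is a minimal, hypothetical Rust sketch of the shift exercised by the second test; names and types are illustrative only.

// Hypothetical sketch, not RisingWave code: why stored column references go stale
// when `ALTER TABLE t DROP COLUMN a` turns (id, a, b) into (id, b).
fn main() {
    let old_columns = ["id", "a", "b"];
    let new_columns = ["id", "b"];

    // An index on `b` records the column by its position in the primary table.
    let old_pos = old_columns.iter().position(|c| *c == "b").unwrap(); // 2
    let new_pos = new_columns.iter().position(|c| *c == "b").unwrap(); // 1

    // Without rewriting the stored reference, the index keeps pointing at
    // position 2, which no longer exists in the two-column table.
    assert_ne!(old_pos, new_pos);
    println!("`b` moved from column {old_pos} to column {new_pos}");
}
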
8 changes: 8 additions & 0 deletions src/frontend/src/catalog/root_catalog.rs
@@ -222,6 +222,14 @@ impl Catalog {
.update_table(proto);
}

pub fn update_index(&mut self, proto: &PbIndex) {
self.get_database_mut(proto.database_id)
.unwrap()
.get_schema_mut(proto.schema_id)
.unwrap()
.update_index(proto);
}

pub fn drop_source(&mut self, db_id: DatabaseId, schema_id: SchemaId, source_id: SourceId) {
self.get_database_mut(db_id)
.unwrap()
28 changes: 28 additions & 0 deletions src/frontend/src/catalog/schema_catalog.rs
@@ -85,6 +85,34 @@ impl SchemaCatalog {
self.table_by_id.insert(id, table_ref);
}

pub fn update_index(&mut self, prost: &PbIndex) {
let name = prost.name.clone();
let id = prost.id.into();
let index_table = self.get_table_by_id(&prost.index_table_id.into()).unwrap();
let primary_table = self
.get_table_by_id(&prost.primary_table_id.into())
.unwrap();
let index: IndexCatalog = IndexCatalog::build_from(prost, index_table, primary_table);
let index_ref = Arc::new(index);

self.index_by_name.insert(name, index_ref.clone());
self.index_by_id.insert(id, index_ref.clone());

match self.indexes_by_table_id.entry(index_ref.primary_table.id) {
Occupied(mut entry) => {
let pos = entry
.get()
.iter()
.position(|x| x.id == index_ref.id)
.unwrap();
*entry.get_mut().get_mut(pos).unwrap() = index_ref;
}
Vacant(_entry) => {
unreachable!()
}
};
}

pub fn drop_table(&mut self, id: TableId) {
let table_ref = self.table_by_id.remove(&id).unwrap();
self.table_by_name.remove(&table_ref.name).unwrap();
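
A rough, self-contained sketch of the update-in-place pattern that update_index follows (assumed names and a stripped-down Index type, not the real SchemaCatalog API): build a fresh Arc and overwrite every map that holds the old entry, so all lookup paths observe the same version.

use std::collections::HashMap;
use std::sync::Arc;

// Stripped-down stand-in for IndexCatalog; the real type carries much more state.
struct Index {
    id: u32,
    name: String,
    primary_table_id: u32,
}

#[derive(Default)]
struct Schema {
    index_by_name: HashMap<String, Arc<Index>>,
    index_by_id: HashMap<u32, Arc<Index>>,
    indexes_by_table_id: HashMap<u32, Vec<Arc<Index>>>,
}

impl Schema {
    // Replace an existing index in every map, mirroring the shape of update_index.
    fn update_index(&mut self, updated: Index) {
        let updated = Arc::new(updated);
        self.index_by_name.insert(updated.name.clone(), updated.clone());
        self.index_by_id.insert(updated.id, updated.clone());

        // The per-table list must already contain the index; swap it in place.
        let list = self
            .indexes_by_table_id
            .get_mut(&updated.primary_table_id)
            .expect("index list for the primary table exists");
        let pos = list
            .iter()
            .position(|i| i.id == updated.id)
            .expect("index is present in the per-table list");
        list[pos] = updated;
    }
}

fn main() {
    let mut schema = Schema::default();
    let idx = Arc::new(Index { id: 7, name: "idx".into(), primary_table_id: 1 });
    schema.index_by_name.insert(idx.name.clone(), idx.clone());
    schema.index_by_id.insert(idx.id, idx.clone());
    schema.indexes_by_table_id.insert(1, vec![idx]);

    // Simulate the frontend receiving an Update notification for the same index.
    schema.update_index(Index { id: 7, name: "idx".into(), primary_table_id: 1 });
    assert_eq!(schema.indexes_by_table_id[&1].len(), 1);
}
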
1 change: 1 addition & 0 deletions src/frontend/src/observer/observer_manager.rs
@@ -220,6 +220,7 @@ impl FrontendObserverNode {
Operation::Delete => {
catalog_guard.drop_index(index.database_id, index.schema_id, index.id.into())
}
Operation::Update => catalog_guard.update_index(index),
_ => panic!("receive an unsupported notify {:?}", resp),
},
Info::View(view) => match resp.operation() {
49 changes: 47 additions & 2 deletions src/meta/src/manager/catalog/mod.rs
@@ -88,6 +88,8 @@ macro_rules! commit_meta {
};
}
pub(crate) use commit_meta;
use risingwave_common::util::column_index_mapping::ColIndexMapping;
use risingwave_pb::expr::expr_node::RexNode;
use risingwave_pb::meta::CreatingJobInfo;

pub type CatalogManagerRef<S> = Arc<CatalogManager<S>>;
@@ -1529,27 +1531,70 @@
pub async fn finish_replace_table_procedure(
&self,
table: &Table,
table_col_index_mapping: ColIndexMapping,
) -> MetaResult<NotificationVersion> {
let core = &mut *self.core.lock().await;
let database_core = &mut core.database;
let mut tables = BTreeMapTransaction::new(&mut database_core.tables);
let mut indexes = BTreeMapTransaction::new(&mut database_core.indexes);
let key = (table.database_id, table.schema_id, table.name.clone());
assert!(
tables.contains_key(&table.id)
&& database_core.in_progress_creation_tracker.contains(&key),
"table must exist and be in altering procedure"
);

let index_ids: Vec<_> = indexes
.tree_ref()
.iter()
.filter(|(_, index)| index.primary_table_id == table.id)
.map(|(index_id, _index)| *index_id)
.collect_vec();

let mut updated_indexes = vec![];

for index_id in &index_ids {
let mut index = indexes.get_mut(*index_id).unwrap();
index
.index_item
.iter_mut()
.for_each(|x| match x.rex_node.as_mut().unwrap() {
RexNode::InputRef(input_col_idx) => {
*input_col_idx =
table_col_index_mapping.map(*input_col_idx as usize) as u32;
assert_eq!(
x.return_type,
table.columns[*input_col_idx as usize]
.column_desc
.clone()
.unwrap()
.column_type
);
}
RexNode::FuncCall(_) => unimplemented!(),
_ => unreachable!(),
});

updated_indexes.push(indexes.get(index_id).cloned().unwrap());
}

// TODO: Here we reuse the `creation` tracker for `alter` procedure, as an `alter` must
database_core.in_progress_creation_tracker.remove(&key);

tables.insert(table.id, table.clone());
commit_meta!(self, tables)?;
commit_meta!(self, tables, indexes)?;

let version = self
// TODO: support group notification.
let mut version = self
.notify_frontend(Operation::Update, Info::Table(table.to_owned()))
.await;

for index in updated_indexes {
version = self
.notify_frontend(Operation::Update, Info::Index(index))
.await;
}

Ok(version)
}

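
The hunk above is the core of the fix: every index whose primary_table_id matches the altered table has its index_item expressions rewritten through the old-to-new column mapping before the transaction commits. Below is a simplified sketch of that rewrite, using hypothetical Expr and DataType stand-ins rather than the actual protobuf ExprNode/RexNode types.

// Simplified stand-ins for the protobuf expression nodes; assumed types, not the
// actual risingwave_pb definitions.
#[derive(Clone, Debug, PartialEq)]
enum DataType {
    Int32,
    Varchar,
}

#[derive(Debug)]
enum Expr {
    // References a primary-table column by position, like RexNode::InputRef.
    InputRef { col_idx: usize, return_type: DataType },
}

// Old column position -> new column position; None means the column was dropped.
// This plays the role of ColIndexMapping.
struct ColIndexMapping(Vec<Option<usize>>);

impl ColIndexMapping {
    fn map(&self, old: usize) -> usize {
        self.0[old].expect("an index must not reference a dropped column")
    }
}

// Rewrite an index's expressions after the primary table's columns changed,
// checking (as the PR does) that each referenced column keeps its type.
fn remap_index_items(items: &mut [Expr], mapping: &ColIndexMapping, new_columns: &[DataType]) {
    for item in items {
        match item {
            Expr::InputRef { col_idx, return_type } => {
                *col_idx = mapping.map(*col_idx);
                assert_eq!(*return_type, new_columns[*col_idx]);
            }
        }
    }
}

fn main() {
    // Table (id INT, a INT, b VARCHAR) with an index on `b`, stored as InputRef(2).
    let mut items = vec![Expr::InputRef { col_idx: 2, return_type: DataType::Varchar }];

    // ALTER TABLE t DROP COLUMN a: id stays at 0, a disappears, b moves to 1.
    let mapping = ColIndexMapping(vec![Some(0), None, Some(1)]);
    let new_columns = [DataType::Int32, DataType::Varchar];

    remap_index_items(&mut items, &mapping, &new_columns);
    println!("{items:?}"); // [InputRef { col_idx: 1, return_type: Varchar }]
}
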
15 changes: 12 additions & 3 deletions src/meta/src/rpc/ddl_controller.rs
@@ -550,7 +550,12 @@ where

let result = try {
let (ctx, table_fragments) = self
.build_replace_table(env, &stream_job, fragment_graph, table_col_index_mapping)
.build_replace_table(
env,
&stream_job,
fragment_graph,
table_col_index_mapping.clone(),
)
.await?;

self.stream_manager
@@ -559,7 +564,10 @@
};

match result {
Ok(_) => self.finish_replace_table(&stream_job).await,
Ok(_) => {
self.finish_replace_table(&stream_job, table_col_index_mapping)
.await
}
Err(err) => {
self.cancel_replace_table(&stream_job).await?;
Err(err)
@@ -687,13 +695,14 @@ where
async fn finish_replace_table(
&self,
stream_job: &StreamingJob,
table_col_index_mapping: ColIndexMapping,
) -> MetaResult<NotificationVersion> {
let StreamingJob::Table(None, table) = stream_job else {
unreachable!("unexpected job: {stream_job:?}")
};

self.catalog_manager
.finish_replace_table_procedure(table)
.finish_replace_table_procedure(table, table_col_index_mapping)
.await
}

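
For orientation, a condensed, hypothetical sketch of the control flow this file wires up (the real DdlController methods are async and have different signatures): build and swap the new table fragments, then on success finish the catalog update with the column mapping so index expressions can be rewritten, otherwise cancel.

// Hypothetical, condensed stand-ins; the real methods are async and take
// StreamingJob / fragment-graph arguments.
struct ColIndexMapping;
struct Version(u64);
type MetaResult<T> = Result<T, String>;

struct Controller;

impl Controller {
    // build_replace_table + stream_manager.replace_table in the real code.
    fn build_and_swap_fragments(&self, _mapping: &ColIndexMapping) -> MetaResult<()> {
        Ok(())
    }
    // Commits tables + remapped indexes, then notifies the frontend.
    fn finish_replace_table(&self, _mapping: ColIndexMapping) -> MetaResult<Version> {
        Ok(Version(1))
    }
    fn cancel_replace_table(&self) -> MetaResult<()> {
        Ok(())
    }

    // The shape of the change: the column mapping is borrowed for the build step
    // and then handed to the finish step (the real code clones it instead), so the
    // catalog manager can rewrite index expressions at commit time.
    fn replace_table(&self, mapping: ColIndexMapping) -> MetaResult<Version> {
        match self.build_and_swap_fragments(&mapping) {
            Ok(()) => self.finish_replace_table(mapping),
            Err(err) => {
                self.cancel_replace_table()?;
                Err(err)
            }
        }
    }
}

fn main() {
    let version = Controller.replace_table(ColIndexMapping).unwrap();
    println!("catalog version after replace: {}", version.0);
}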