Skip to content

Commit

Permalink
fix(meta): fix alter table add/drop column with indexes (risingwavela…
Browse files Browse the repository at this point in the history
  • Loading branch information
chenzl25 authored Mar 21, 2023
1 parent 59a0947 commit d557a6c
Show file tree
Hide file tree
Showing 6 changed files with 128 additions and 5 deletions.
32 changes: 32 additions & 0 deletions e2e_test/ddl/alter_table_column.slt
Original file line number Diff line number Diff line change
Expand Up @@ -209,3 +209,35 @@ drop materialized view mv;

statement ok
drop table t;

# Test the consistency of tables and indexes #https://github.com/risingwavelabs/risingwave/issues/8649
statement ok
create table t(id int primary key, a int, b varchar);

statement ok
create index idx on t(a);

statement ok
alter table t add column c int;

query IITI rowsort
select * from t;
----

statement ok
drop table t;

statement ok
create table t(id int primary key, a int, b varchar);

statement ok
create index idx on t(b) include(b);

statement ok
alter table t drop column a;

query II rowsort
select * from t where b = '1';

statement ok
drop table t;
8 changes: 8 additions & 0 deletions src/frontend/src/catalog/root_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,14 @@ impl Catalog {
.update_table(proto);
}

pub fn update_index(&mut self, proto: &PbIndex) {
self.get_database_mut(proto.database_id)
.unwrap()
.get_schema_mut(proto.schema_id)
.unwrap()
.update_index(proto);
}

pub fn drop_source(&mut self, db_id: DatabaseId, schema_id: SchemaId, source_id: SourceId) {
self.get_database_mut(db_id)
.unwrap()
Expand Down
28 changes: 28 additions & 0 deletions src/frontend/src/catalog/schema_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,34 @@ impl SchemaCatalog {
self.table_by_id.insert(id, table_ref);
}

pub fn update_index(&mut self, prost: &PbIndex) {
let name = prost.name.clone();
let id = prost.id.into();
let index_table = self.get_table_by_id(&prost.index_table_id.into()).unwrap();
let primary_table = self
.get_table_by_id(&prost.primary_table_id.into())
.unwrap();
let index: IndexCatalog = IndexCatalog::build_from(prost, index_table, primary_table);
let index_ref = Arc::new(index);

self.index_by_name.insert(name, index_ref.clone());
self.index_by_id.insert(id, index_ref.clone());

match self.indexes_by_table_id.entry(index_ref.primary_table.id) {
Occupied(mut entry) => {
let pos = entry
.get()
.iter()
.position(|x| x.id == index_ref.id)
.unwrap();
*entry.get_mut().get_mut(pos).unwrap() = index_ref;
}
Vacant(_entry) => {
unreachable!()
}
};
}

pub fn drop_table(&mut self, id: TableId) {
let table_ref = self.table_by_id.remove(&id).unwrap();
self.table_by_name.remove(&table_ref.name).unwrap();
Expand Down
1 change: 1 addition & 0 deletions src/frontend/src/observer/observer_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,7 @@ impl FrontendObserverNode {
Operation::Delete => {
catalog_guard.drop_index(index.database_id, index.schema_id, index.id.into())
}
Operation::Update => catalog_guard.update_index(index),
_ => panic!("receive an unsupported notify {:?}", resp),
},
Info::View(view) => match resp.operation() {
Expand Down
49 changes: 47 additions & 2 deletions src/meta/src/manager/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ macro_rules! commit_meta {
};
}
pub(crate) use commit_meta;
use risingwave_common::util::column_index_mapping::ColIndexMapping;
use risingwave_pb::expr::expr_node::RexNode;
use risingwave_pb::meta::CreatingJobInfo;

pub type CatalogManagerRef<S> = Arc<CatalogManager<S>>;
Expand Down Expand Up @@ -1529,27 +1531,70 @@ where
pub async fn finish_replace_table_procedure(
&self,
table: &Table,
table_col_index_mapping: ColIndexMapping,
) -> MetaResult<NotificationVersion> {
let core = &mut *self.core.lock().await;
let database_core = &mut core.database;
let mut tables = BTreeMapTransaction::new(&mut database_core.tables);
let mut indexes = BTreeMapTransaction::new(&mut database_core.indexes);
let key = (table.database_id, table.schema_id, table.name.clone());
assert!(
tables.contains_key(&table.id)
&& database_core.in_progress_creation_tracker.contains(&key),
"table must exist and be in altering procedure"
);

let index_ids: Vec<_> = indexes
.tree_ref()
.iter()
.filter(|(_, index)| index.primary_table_id == table.id)
.map(|(index_id, _index)| *index_id)
.collect_vec();

let mut updated_indexes = vec![];

for index_id in &index_ids {
let mut index = indexes.get_mut(*index_id).unwrap();
index
.index_item
.iter_mut()
.for_each(|x| match x.rex_node.as_mut().unwrap() {
RexNode::InputRef(input_col_idx) => {
*input_col_idx =
table_col_index_mapping.map(*input_col_idx as usize) as u32;
assert_eq!(
x.return_type,
table.columns[*input_col_idx as usize]
.column_desc
.clone()
.unwrap()
.column_type
);
}
RexNode::FuncCall(_) => unimplemented!(),
_ => unreachable!(),
});

updated_indexes.push(indexes.get(index_id).cloned().unwrap());
}

// TODO: Here we reuse the `creation` tracker for `alter` procedure, as an `alter` must
database_core.in_progress_creation_tracker.remove(&key);

tables.insert(table.id, table.clone());
commit_meta!(self, tables)?;
commit_meta!(self, tables, indexes)?;

let version = self
// TODO: support group notification.
let mut version = self
.notify_frontend(Operation::Update, Info::Table(table.to_owned()))
.await;

for index in updated_indexes {
version = self
.notify_frontend(Operation::Update, Info::Index(index))
.await;
}

Ok(version)
}

Expand Down
15 changes: 12 additions & 3 deletions src/meta/src/rpc/ddl_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -550,7 +550,12 @@ where

let result = try {
let (ctx, table_fragments) = self
.build_replace_table(env, &stream_job, fragment_graph, table_col_index_mapping)
.build_replace_table(
env,
&stream_job,
fragment_graph,
table_col_index_mapping.clone(),
)
.await?;

self.stream_manager
Expand All @@ -559,7 +564,10 @@ where
};

match result {
Ok(_) => self.finish_replace_table(&stream_job).await,
Ok(_) => {
self.finish_replace_table(&stream_job, table_col_index_mapping)
.await
}
Err(err) => {
self.cancel_replace_table(&stream_job).await?;
Err(err)
Expand Down Expand Up @@ -687,13 +695,14 @@ where
async fn finish_replace_table(
&self,
stream_job: &StreamingJob,
table_col_index_mapping: ColIndexMapping,
) -> MetaResult<NotificationVersion> {
let StreamingJob::Table(None, table) = stream_job else {
unreachable!("unexpected job: {stream_job:?}")
};

self.catalog_manager
.finish_replace_table_procedure(table)
.finish_replace_table_procedure(table, table_col_index_mapping)
.await
}

Expand Down

0 comments on commit d557a6c

Please sign in to comment.