Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(query): create drop inverted index #14859

Merged
merged 12 commits into from
Mar 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions src/common/license/src/license.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ pub enum Feature {
BackgroundService,
DataMask,
AggregateIndex,
InvertedIndex,
ComputedColumn,
StorageEncryption,
Stream,
Expand Down Expand Up @@ -59,6 +60,9 @@ impl Display for Feature {
Feature::AggregateIndex => {
write!(f, "aggregate_index")
}
Feature::InvertedIndex => {
write!(f, "inverted_index")
}
Feature::ComputedColumn => {
write!(f, "computed_column")
}
Expand Down
14 changes: 14 additions & 0 deletions src/meta/api/src/schema_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ use databend_common_meta_app::schema::CreateIndexReply;
use databend_common_meta_app::schema::CreateIndexReq;
use databend_common_meta_app::schema::CreateLockRevReply;
use databend_common_meta_app::schema::CreateLockRevReq;
use databend_common_meta_app::schema::CreateTableIndexReply;
use databend_common_meta_app::schema::CreateTableIndexReq;
use databend_common_meta_app::schema::CreateTableReply;
use databend_common_meta_app::schema::CreateTableReq;
use databend_common_meta_app::schema::CreateVirtualColumnReply;
Expand All @@ -38,6 +40,8 @@ use databend_common_meta_app::schema::DropDatabaseReq;
use databend_common_meta_app::schema::DropIndexReply;
use databend_common_meta_app::schema::DropIndexReq;
use databend_common_meta_app::schema::DropTableByIdReq;
use databend_common_meta_app::schema::DropTableIndexReply;
use databend_common_meta_app::schema::DropTableIndexReq;
use databend_common_meta_app::schema::DropTableReply;
use databend_common_meta_app::schema::DropVirtualColumnReply;
use databend_common_meta_app::schema::DropVirtualColumnReq;
Expand Down Expand Up @@ -239,6 +243,16 @@ pub trait SchemaApi: Send + Sync {
req: SetTableColumnMaskPolicyReq,
) -> Result<SetTableColumnMaskPolicyReply, KVAppError>;

async fn create_table_index(
&self,
req: CreateTableIndexReq,
) -> Result<CreateTableIndexReply, KVAppError>;

async fn drop_table_index(
&self,
req: DropTableIndexReq,
) -> Result<DropTableIndexReply, KVAppError>;

async fn get_drop_table_infos(
&self,
req: ListDroppedTableReq,
Expand Down
152 changes: 152 additions & 0 deletions src/meta/api/src/schema_api_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ use databend_common_meta_app::schema::CreateIndexReq;
use databend_common_meta_app::schema::CreateLockRevReply;
use databend_common_meta_app::schema::CreateLockRevReq;
use databend_common_meta_app::schema::CreateOption;
use databend_common_meta_app::schema::CreateTableIndexReply;
use databend_common_meta_app::schema::CreateTableIndexReq;
use databend_common_meta_app::schema::CreateTableReply;
use databend_common_meta_app::schema::CreateTableReq;
use databend_common_meta_app::schema::CreateVirtualColumnReply;
Expand All @@ -98,6 +100,8 @@ use databend_common_meta_app::schema::DropDatabaseReq;
use databend_common_meta_app::schema::DropIndexReply;
use databend_common_meta_app::schema::DropIndexReq;
use databend_common_meta_app::schema::DropTableByIdReq;
use databend_common_meta_app::schema::DropTableIndexReply;
use databend_common_meta_app::schema::DropTableIndexReq;
use databend_common_meta_app::schema::DropTableReply;
use databend_common_meta_app::schema::DropVirtualColumnReply;
use databend_common_meta_app::schema::DropVirtualColumnReq;
Expand Down Expand Up @@ -148,6 +152,7 @@ use databend_common_meta_app::schema::TableIdList;
use databend_common_meta_app::schema::TableIdListKey;
use databend_common_meta_app::schema::TableIdToName;
use databend_common_meta_app::schema::TableIdent;
use databend_common_meta_app::schema::TableIndex;
use databend_common_meta_app::schema::TableInfo;
use databend_common_meta_app::schema::TableInfoFilter;
use databend_common_meta_app::schema::TableMeta;
Expand Down Expand Up @@ -3131,6 +3136,153 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
}
}

#[logcall::logcall("debug")]
#[minitrace::trace]
async fn create_table_index(
&self,
req: CreateTableIndexReq,
) -> Result<CreateTableIndexReply, KVAppError> {
debug!(req :? =(&req); "SchemaApi: {}", func_name!());

let tbid = TableId {
table_id: req.table_id,
};

let mut trials = txn_backoff(None, func_name!());
loop {
trials.next().unwrap()?.await;

let (tb_meta_seq, table_meta): (_, Option<TableMeta>) =
get_pb_value(self, &tbid).await?;

debug!(ident :% =(&tbid); "create_table_index");

if tb_meta_seq == 0 || table_meta.is_none() {
return Err(KVAppError::AppError(AppError::UnknownTableId(
UnknownTableId::new(req.table_id, "create_table_index"),
)));
}

let mut table_meta = table_meta.unwrap();
// update table indexes
let indexes = &mut table_meta.indexes;
if indexes.contains_key(&req.name) {
match req.create_option {
CreateOption::None => {
return Err(KVAppError::AppError(AppError::IndexAlreadyExists(
IndexAlreadyExists::new(&req.name, "create table index".to_string()),
)));
}
CreateOption::CreateIfNotExists => {
return Ok(CreateTableIndexReply {});
}
CreateOption::CreateOrReplace => {}
}
}
// check the index column id exists
for column_id in &req.column_ids {
if table_meta.schema.is_column_deleted(*column_id) {
return Err(KVAppError::AppError(AppError::UnknownIndex(
UnknownIndex::new(
&req.name,
format!("table index column id {} is not exist", column_id),
),
)));
}
}
let index = TableIndex {
name: req.name.clone(),
column_ids: req.column_ids.clone(),
};
indexes.insert(req.name.clone(), index);

let txn_req = TxnRequest {
condition: vec![
// table is not changed
txn_cond_seq(&tbid, Eq, tb_meta_seq),
],
if_then: vec![
txn_op_put(&tbid, serialize_struct(&table_meta)?), // tb_id -> tb_meta
],
else_then: vec![],
};

let (succ, _responses) = send_txn(self, txn_req).await?;

debug!(
id :? =(&tbid),
succ = succ;
"create_table_index"
);

if succ {
return Ok(CreateTableIndexReply {});
}
}
}

#[logcall::logcall("debug")]
#[minitrace::trace]
async fn drop_table_index(
&self,
req: DropTableIndexReq,
) -> Result<DropTableIndexReply, KVAppError> {
debug!(req :? =(&req); "SchemaApi: {}", func_name!());

let tbid = TableId {
table_id: req.table_id,
};

let mut trials = txn_backoff(None, func_name!());
loop {
trials.next().unwrap()?.await;

let (tb_meta_seq, table_meta): (_, Option<TableMeta>) =
get_pb_value(self, &tbid).await?;

debug!(ident :% =(&tbid); "drop_table_index");

if tb_meta_seq == 0 || table_meta.is_none() {
return Err(KVAppError::AppError(AppError::UnknownTableId(
UnknownTableId::new(req.table_id, "drop_table_index"),
)));
}

let mut table_meta = table_meta.unwrap();
// update table indexes
let indexes = &mut table_meta.indexes;
if !indexes.contains_key(&req.name) && !req.if_exists {
return Err(KVAppError::AppError(AppError::UnknownIndex(
UnknownIndex::new(&req.name, "drop table index".to_string()),
)));
}
indexes.remove(&req.name);

let txn_req = TxnRequest {
condition: vec![
// table is not changed
txn_cond_seq(&tbid, Eq, tb_meta_seq),
],
if_then: vec![
txn_op_put(&tbid, serialize_struct(&table_meta)?), // tb_id -> tb_meta
],
else_then: vec![],
};

let (succ, _responses) = send_txn(self, txn_req).await?;

debug!(
id :? =(&tbid),
succ = succ;
"drop_table_index"
);

if succ {
return Ok(DropTableIndexReply {});
}
}
}

#[logcall::logcall("debug")]
#[minitrace::trace]
async fn get_drop_table_infos(
Expand Down
Loading
Loading