Skip to content

Commit

Permalink
feat(query): Support generate virtual columns (databendlabs#11590)
Browse files Browse the repository at this point in the history
* feat(query): Support generate virtual columns

* fix

* fix

* fix taplo fmt

* add tests

* fix

* fix

* add test

* fix

* add tests

* add alter virtual column

* fix tests

* add virtual block folder

* fix

* fix

* fix meta api

* fix

* fix

* fix

---------

Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
  • Loading branch information
2 people authored and andylokandy committed Nov 27, 2023
1 parent 36f0e80 commit 4260471
Show file tree
Hide file tree
Showing 81 changed files with 3,501 additions and 55 deletions.
20 changes: 20 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions src/common/exception/src/exception_code.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,8 @@ build_exceptions! {
LicenceDenied(1112),
UnknownDatamask(1113),
UnmatchColumnDataType(1114),
VirtualColumnNotFound(1115),
VirtualColumnAlreadyExists(1116),

// Data Related Errors

Expand Down
30 changes: 30 additions & 0 deletions src/meta/api/src/schema_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ use common_meta_app::schema::CreateTableLockRevReply;
use common_meta_app::schema::CreateTableLockRevReq;
use common_meta_app::schema::CreateTableReply;
use common_meta_app::schema::CreateTableReq;
use common_meta_app::schema::CreateVirtualColumnReply;
use common_meta_app::schema::CreateVirtualColumnReq;
use common_meta_app::schema::DatabaseInfo;
use common_meta_app::schema::DeleteTableLockRevReq;
use common_meta_app::schema::DropDatabaseReply;
Expand All @@ -32,6 +34,8 @@ use common_meta_app::schema::DropIndexReply;
use common_meta_app::schema::DropIndexReq;
use common_meta_app::schema::DropTableByIdReq;
use common_meta_app::schema::DropTableReply;
use common_meta_app::schema::DropVirtualColumnReply;
use common_meta_app::schema::DropVirtualColumnReq;
use common_meta_app::schema::ExtendTableLockRevReq;
use common_meta_app::schema::GetDatabaseReq;
use common_meta_app::schema::GetTableCopiedFileReply;
Expand All @@ -42,6 +46,7 @@ use common_meta_app::schema::ListDatabaseReq;
use common_meta_app::schema::ListIndexesReq;
use common_meta_app::schema::ListTableLockRevReq;
use common_meta_app::schema::ListTableReq;
use common_meta_app::schema::ListVirtualColumnsReq;
use common_meta_app::schema::RenameDatabaseReply;
use common_meta_app::schema::RenameDatabaseReq;
use common_meta_app::schema::RenameTableReply;
Expand All @@ -58,8 +63,11 @@ use common_meta_app::schema::UndropTableReply;
use common_meta_app::schema::UndropTableReq;
use common_meta_app::schema::UpdateTableMetaReply;
use common_meta_app::schema::UpdateTableMetaReq;
use common_meta_app::schema::UpdateVirtualColumnReply;
use common_meta_app::schema::UpdateVirtualColumnReq;
use common_meta_app::schema::UpsertTableOptionReply;
use common_meta_app::schema::UpsertTableOptionReq;
use common_meta_app::schema::VirtualColumnMeta;
use common_meta_types::GCDroppedDataReply;
use common_meta_types::GCDroppedDataReq;
use common_meta_types::MetaId;
Expand Down Expand Up @@ -111,6 +119,28 @@ pub trait SchemaApi: Send + Sync {
req: ListIndexesReq,
) -> Result<Vec<(u64, String, IndexMeta)>, KVAppError>;

// virtual column

async fn create_virtual_column(
&self,
req: CreateVirtualColumnReq,
) -> Result<CreateVirtualColumnReply, KVAppError>;

async fn update_virtual_column(
&self,
req: UpdateVirtualColumnReq,
) -> Result<UpdateVirtualColumnReply, KVAppError>;

async fn drop_virtual_column(
&self,
req: DropVirtualColumnReq,
) -> Result<DropVirtualColumnReply, KVAppError>;

async fn list_virtual_columns(
&self,
req: ListVirtualColumnsReq,
) -> Result<Vec<VirtualColumnMeta>, KVAppError>;

// table

async fn create_table(&self, req: CreateTableReq) -> Result<CreateTableReply, KVAppError>;
Expand Down
206 changes: 206 additions & 0 deletions src/meta/api/src/schema_api_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ use common_meta_app::app_error::UnknownDatabaseId;
use common_meta_app::app_error::UnknownIndex;
use common_meta_app::app_error::UnknownTable;
use common_meta_app::app_error::UnknownTableId;
use common_meta_app::app_error::VirtualColumnAlreadyExists;
use common_meta_app::app_error::WrongShare;
use common_meta_app::app_error::WrongShareObject;
use common_meta_app::schema::CountTablesKey;
Expand All @@ -54,6 +55,8 @@ use common_meta_app::schema::CreateTableLockRevReply;
use common_meta_app::schema::CreateTableLockRevReq;
use common_meta_app::schema::CreateTableReply;
use common_meta_app::schema::CreateTableReq;
use common_meta_app::schema::CreateVirtualColumnReply;
use common_meta_app::schema::CreateVirtualColumnReq;
use common_meta_app::schema::DBIdTableName;
use common_meta_app::schema::DatabaseId;
use common_meta_app::schema::DatabaseIdToName;
Expand All @@ -71,6 +74,8 @@ use common_meta_app::schema::DropIndexReply;
use common_meta_app::schema::DropIndexReq;
use common_meta_app::schema::DropTableByIdReq;
use common_meta_app::schema::DropTableReply;
use common_meta_app::schema::DropVirtualColumnReply;
use common_meta_app::schema::DropVirtualColumnReq;
use common_meta_app::schema::EmptyProto;
use common_meta_app::schema::ExtendTableLockRevReq;
use common_meta_app::schema::GetDatabaseReq;
Expand All @@ -85,6 +90,7 @@ use common_meta_app::schema::ListDatabaseReq;
use common_meta_app::schema::ListIndexesReq;
use common_meta_app::schema::ListTableLockRevReq;
use common_meta_app::schema::ListTableReq;
use common_meta_app::schema::ListVirtualColumnsReq;
use common_meta_app::schema::RenameDatabaseReply;
use common_meta_app::schema::RenameDatabaseReq;
use common_meta_app::schema::RenameTableReply;
Expand All @@ -108,9 +114,13 @@ use common_meta_app::schema::UndropTableReply;
use common_meta_app::schema::UndropTableReq;
use common_meta_app::schema::UpdateTableMetaReply;
use common_meta_app::schema::UpdateTableMetaReq;
use common_meta_app::schema::UpdateVirtualColumnReply;
use common_meta_app::schema::UpdateVirtualColumnReq;
use common_meta_app::schema::UpsertTableCopiedFileReq;
use common_meta_app::schema::UpsertTableOptionReply;
use common_meta_app::schema::UpsertTableOptionReq;
use common_meta_app::schema::VirtualColumnMeta;
use common_meta_app::schema::VirtualColumnNameIdent;
use common_meta_app::share::ShareGrantObject;
use common_meta_app::share::ShareId;
use common_meta_app::share::ShareNameIdent;
Expand Down Expand Up @@ -163,6 +173,7 @@ use crate::util::deserialize_u64;
use crate::util::get_index_metas_by_ids;
use crate::util::get_table_by_id_or_err;
use crate::util::get_table_names_by_ids;
use crate::util::get_virtual_column_by_id_or_err;
use crate::util::list_tables_from_share_db;
use crate::util::list_tables_from_unshare_db;
use crate::util::mget_pb_values;
Expand Down Expand Up @@ -1071,6 +1082,201 @@ impl<KV: kvapi::KVApi<Error = MetaError>> SchemaApi for KV {
Ok(index_metas)
}

// virtual column

async fn create_virtual_column(
&self,
req: CreateVirtualColumnReq,
) -> Result<CreateVirtualColumnReply, KVAppError> {
debug!(req = debug(&req), "SchemaApi: {}", func_name!());

let ctx = &func_name!();
let mut trials = txn_trials(None, ctx);
loop {
trials.next().unwrap()?;

let (_, old_virtual_column_opt): (_, Option<VirtualColumnMeta>) =
get_pb_value(self, &req.name_ident).await?;

if old_virtual_column_opt.is_some() {
return Err(KVAppError::AppError(AppError::VirtualColumnAlreadyExists(
VirtualColumnAlreadyExists::new(
req.name_ident.table_id,
format!(
"create virtual column with tenant: {} table_id: {}",
req.name_ident.tenant, req.name_ident.table_id
),
),
)));
}
let virtual_column_meta = VirtualColumnMeta {
table_id: req.name_ident.table_id,
virtual_columns: req.virtual_columns.clone(),
created_on: Utc::now(),
updated_on: None,
};

// Create virtual column by inserting this record:
// (tenant, table_id) -> virtual_column_meta
{
let condition = vec![txn_cond_seq(&req.name_ident, Eq, 0)];
let if_then = vec![txn_op_put(
&req.name_ident,
serialize_struct(&virtual_column_meta)?,
)];

let txn_req = TxnRequest {
condition,
if_then,
else_then: vec![],
};

let (succ, _responses) = send_txn(self, txn_req).await?;

debug!(
req.name_ident = debug(&virtual_column_meta),
succ = display(succ),
"create_virtual_column"
);

if succ {
break;
}
}
}

Ok(CreateVirtualColumnReply {})
}

async fn update_virtual_column(
&self,
req: UpdateVirtualColumnReq,
) -> Result<UpdateVirtualColumnReply, KVAppError> {
debug!(req = debug(&req), "SchemaApi: {}", func_name!());

let ctx = &func_name!();
let mut trials = txn_trials(None, ctx);
loop {
trials.next().unwrap()?;

let (seq, old_virtual_column_meta) =
get_virtual_column_by_id_or_err(self, &req.name_ident, ctx).await?;

let virtual_column_meta = VirtualColumnMeta {
table_id: req.name_ident.table_id,
virtual_columns: req.virtual_columns.clone(),
created_on: old_virtual_column_meta.created_on,
updated_on: Some(Utc::now()),
};

// Update virtual column by inserting this record:
// (tenant, table_id) -> virtual_column_meta
{
let condition = vec![txn_cond_seq(&req.name_ident, Eq, seq)];
let if_then = vec![txn_op_put(
&req.name_ident,
serialize_struct(&virtual_column_meta)?,
)];

let txn_req = TxnRequest {
condition,
if_then,
else_then: vec![],
};

let (succ, _responses) = send_txn(self, txn_req).await?;

debug!(
req.name_ident = debug(&virtual_column_meta),
succ = display(succ),
"update_virtual_column"
);

if succ {
break;
}
}
}

Ok(UpdateVirtualColumnReply {})
}

async fn drop_virtual_column(
&self,
req: DropVirtualColumnReq,
) -> Result<DropVirtualColumnReply, KVAppError> {
debug!(req = debug(&req), "SchemaApi: {}", func_name!());

let ctx = &func_name!();
let mut trials = txn_trials(None, ctx);
loop {
trials.next().unwrap()?;

let (_, _) = get_virtual_column_by_id_or_err(self, &req.name_ident, ctx).await?;

// Drop virtual column by deleting this record:
// (tenant, table_id) -> virtual_column_meta
{
let if_then = vec![txn_op_del(&req.name_ident)];
let txn_req = TxnRequest {
condition: vec![],
if_then,
else_then: vec![],
};

let (succ, _responses) = send_txn(self, txn_req).await?;

debug!(
req.name_ident = debug(&req.name_ident),
succ = display(succ),
"drop_virtual_column"
);

if succ {
break;
}
}
}

Ok(DropVirtualColumnReply {})
}

async fn list_virtual_columns(
&self,
req: ListVirtualColumnsReq,
) -> Result<Vec<VirtualColumnMeta>, KVAppError> {
debug!(req = debug(&req), "SchemaApi: {}", func_name!());

if let Some(table_id) = req.table_id {
let name_ident = VirtualColumnNameIdent {
tenant: req.tenant.clone(),
table_id,
};
let (_, virtual_column_opt): (_, Option<VirtualColumnMeta>) =
get_pb_value(self, &name_ident).await?;

if let Some(virtual_column) = virtual_column_opt {
return Ok(vec![virtual_column]);
} else {
return Ok(vec![]);
}
}

// Get virtual columns list by `prefix_list` "<prefix>/<tenant>"
let prefix_key = kvapi::KeyBuilder::new_prefixed(VirtualColumnNameIdent::PREFIX)
.push_str(&req.tenant)
.done();

let list = self.prefix_list_kv(&prefix_key).await?;
let mut virtual_column_list = Vec::with_capacity(list.len());
for (_, seq) in list.iter() {
let virtual_column_meta: VirtualColumnMeta = deserialize_struct(&seq.data)?;
virtual_column_list.push(virtual_column_meta);
}

Ok(virtual_column_list)
}

#[tracing::instrument(level = "debug", ret, err, skip_all)]
async fn create_table(&self, req: CreateTableReq) -> Result<CreateTableReply, KVAppError> {
debug!(req = debug(&req), "SchemaApi: {}", func_name!());
Expand Down
Loading

0 comments on commit 4260471

Please sign in to comment.