Skip to content

Commit

Permalink
feat: add table lock for compact (#11238)
Browse files Browse the repository at this point in the history
* evict segment cache in abort operation

* fix max_threads in mutations

* Add table mutation lock

* check lock during compact

* fix empty pipeline

* add check in compact segment

* Add mutation lock heartbeat

* Add mutation_lock_expire_secs in settings

* refactor mutation lock

* fix error

* fix error

* remove unused codes

* remove unused codes

* Add revision

* Add mutex

* move mutation_lock in catalog

* Add mutation lock in sink commit

* fix test

* remove unused codes

* add test

* rename to table lock

* move to ee

* fix test

* refactor codes

* fix test

* fix test

* fix review comment

* remove table lock in database trait

* rename TableCopiedFileLock to EmptyProto

* fix review comment

* fix review comment

* remove Revision

* make lint

---------

Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
  • Loading branch information
zhyass and mergify[bot] authored May 22, 2023
1 parent f019841 commit e9e4ec5
Show file tree
Hide file tree
Showing 52 changed files with 1,237 additions and 154 deletions.
22 changes: 22 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ members = [
"src/query/users",
"src/query/ee-features/vacuum-handler",
"src/query/ee-features/aggregating-index",
"src/query/ee-features/table-lock",
# databend-query
"src/query/service",
# enterprise
Expand Down
2 changes: 2 additions & 0 deletions src/common/exception/src/exception_code.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,8 @@ build_exceptions! {
TableNotWritable(2012),
TableHistoricalDataNotFound(2013),
DuplicatedUpsertFiles(2014),
TableAlreadyLocked(2015),
TableLockExpired(2016),

// User api error codes.
UnknownUser(2201),
Expand Down
16 changes: 16 additions & 0 deletions src/meta/api/src/id_generator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use common_meta_kvapi::kvapi;
use crate::schema_api_keys::ID_GEN_DATABASE;
use crate::schema_api_keys::ID_GEN_INDEX;
use crate::schema_api_keys::ID_GEN_TABLE;
use crate::schema_api_keys::ID_GEN_TABLE_LOCK;
use crate::share_api_keys::ID_GEN_SHARE;
use crate::share_api_keys::ID_GEN_SHARE_ENDPOINT;

Expand Down Expand Up @@ -65,6 +66,12 @@ impl IdGenerator {
resource: ID_GEN_INDEX.to_string(),
}
}

pub fn table_lock_id() -> Self {
Self {
resource: ID_GEN_TABLE_LOCK.to_string(),
}
}
}

impl kvapi::Key for IdGenerator {
Expand Down Expand Up @@ -146,6 +153,15 @@ mod t {
assert_eq!(g1, g2);
}

{
let g1 = IdGenerator::table_lock_id();
let k = g1.to_string_key();
assert_eq!("__fd_id_gen/table_lock_id", k);

let g2 = IdGenerator::from_str_key(&k)?;
assert_eq!(g1, g2);
}

Ok(())
}

Expand Down
16 changes: 16 additions & 0 deletions src/meta/api/src/schema_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,22 +20,27 @@ use common_meta_app::schema::CreateDatabaseReply;
use common_meta_app::schema::CreateDatabaseReq;
use common_meta_app::schema::CreateIndexReply;
use common_meta_app::schema::CreateIndexReq;
use common_meta_app::schema::CreateTableLockRevReply;
use common_meta_app::schema::CreateTableLockRevReq;
use common_meta_app::schema::CreateTableReply;
use common_meta_app::schema::CreateTableReq;
use common_meta_app::schema::DatabaseInfo;
use common_meta_app::schema::DeleteTableLockRevReq;
use common_meta_app::schema::DropDatabaseReply;
use common_meta_app::schema::DropDatabaseReq;
use common_meta_app::schema::DropIndexReply;
use common_meta_app::schema::DropIndexReq;
use common_meta_app::schema::DropTableByIdReq;
use common_meta_app::schema::DropTableReply;
use common_meta_app::schema::ExtendTableLockRevReq;
use common_meta_app::schema::GetDatabaseReq;
use common_meta_app::schema::GetTableCopiedFileReply;
use common_meta_app::schema::GetTableCopiedFileReq;
use common_meta_app::schema::GetTableReq;
use common_meta_app::schema::IndexMeta;
use common_meta_app::schema::ListDatabaseReq;
use common_meta_app::schema::ListIndexesReq;
use common_meta_app::schema::ListTableLockRevReq;
use common_meta_app::schema::ListTableReq;
use common_meta_app::schema::RenameDatabaseReply;
use common_meta_app::schema::RenameDatabaseReq;
Expand Down Expand Up @@ -162,5 +167,16 @@ pub trait SchemaApi: Send + Sync {

async fn count_tables(&self, req: CountTablesReq) -> Result<CountTablesReply, KVAppError>;

async fn list_table_lock_revs(&self, req: ListTableLockRevReq) -> Result<Vec<u64>, KVAppError>;

async fn create_table_lock_rev(
&self,
req: CreateTableLockRevReq,
) -> Result<CreateTableLockRevReply, KVAppError>;

async fn extend_table_lock_rev(&self, req: ExtendTableLockRevReq) -> Result<(), KVAppError>;

async fn delete_table_lock_rev(&self, req: DeleteTableLockRevReq) -> Result<(), KVAppError>;

fn name(&self) -> String;
}
177 changes: 177 additions & 0 deletions src/meta/api/src/schema_api_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ use common_meta_app::schema::CreateDatabaseReply;
use common_meta_app::schema::CreateDatabaseReq;
use common_meta_app::schema::CreateIndexReply;
use common_meta_app::schema::CreateIndexReq;
use common_meta_app::schema::CreateTableLockRevReply;
use common_meta_app::schema::CreateTableLockRevReq;
use common_meta_app::schema::CreateTableReply;
use common_meta_app::schema::CreateTableReq;
use common_meta_app::schema::DBIdTableName;
Expand All @@ -62,12 +64,15 @@ use common_meta_app::schema::DatabaseNameIdent;
use common_meta_app::schema::DatabaseType;
use common_meta_app::schema::DbIdList;
use common_meta_app::schema::DbIdListKey;
use common_meta_app::schema::DeleteTableLockRevReq;
use common_meta_app::schema::DropDatabaseReply;
use common_meta_app::schema::DropDatabaseReq;
use common_meta_app::schema::DropIndexReply;
use common_meta_app::schema::DropIndexReq;
use common_meta_app::schema::DropTableByIdReq;
use common_meta_app::schema::DropTableReply;
use common_meta_app::schema::EmptyProto;
use common_meta_app::schema::ExtendTableLockRevReq;
use common_meta_app::schema::GetDatabaseReq;
use common_meta_app::schema::GetTableCopiedFileReply;
use common_meta_app::schema::GetTableCopiedFileReq;
Expand All @@ -78,6 +83,7 @@ use common_meta_app::schema::IndexMeta;
use common_meta_app::schema::IndexNameIdent;
use common_meta_app::schema::ListDatabaseReq;
use common_meta_app::schema::ListIndexesReq;
use common_meta_app::schema::ListTableLockRevReq;
use common_meta_app::schema::ListTableReq;
use common_meta_app::schema::RenameDatabaseReply;
use common_meta_app::schema::RenameDatabaseReq;
Expand All @@ -91,6 +97,7 @@ use common_meta_app::schema::TableIdListKey;
use common_meta_app::schema::TableIdToName;
use common_meta_app::schema::TableIdent;
use common_meta_app::schema::TableInfo;
use common_meta_app::schema::TableLockKey;
use common_meta_app::schema::TableMeta;
use common_meta_app::schema::TableNameIdent;
use common_meta_app::schema::TruncateTableReply;
Expand Down Expand Up @@ -2509,6 +2516,176 @@ impl<KV: kvapi::KVApi<Error = MetaError>> SchemaApi for KV {
Ok(CountTablesReply { count })
}

async fn list_table_lock_revs(&self, req: ListTableLockRevReq) -> Result<Vec<u64>, KVAppError> {
let prefix = format!("{}/{}", TableLockKey::PREFIX, req.table_id);
let reply = self.prefix_list_kv(&prefix).await?;

let mut revisions = vec![];
for (k, _) in reply.into_iter() {
let lock_key = TableLockKey::from_str_key(&k).map_err(|e| {
let inv = InvalidReply::new("list_table_lock_revs", &e);
let meta_net_err = MetaNetworkError::InvalidReply(inv);
MetaError::NetworkError(meta_net_err)
})?;

revisions.push(lock_key.revision);
}
Ok(revisions)
}

async fn create_table_lock_rev(
&self,
req: CreateTableLockRevReq,
) -> Result<CreateTableLockRevReply, KVAppError> {
debug!(req = debug(&req), "SchemaApi: {}", func_name!());

let table_id = req.table_id;
let tbid = TableId { table_id };

let revision = fetch_id(self, IdGenerator::table_lock_id()).await?;

let ctx = &func_name!();
let mut trials = txn_trials(None, ctx);
loop {
trials.next().unwrap()?;

let (tb_meta_seq, _) = get_table_by_id_or_err(self, &tbid, ctx).await?;
let lock_key = TableLockKey { table_id, revision };

let lock = EmptyProto {};

let condition = vec![
// table is not changed
txn_cond_seq(&tbid, Eq, tb_meta_seq),
// assumes lock are absent.
txn_cond_seq(&lock_key, Eq, 0),
];

let if_then = vec![txn_op_put_with_expire(
&lock_key,
serialize_struct(&lock)?,
req.expire_at,
)];

let txn_req = TxnRequest {
condition,
if_then,
else_then: vec![],
};

let (succ, _responses) = send_txn(self, txn_req).await?;

debug!(
ident = display(&tbid),
succ = display(succ),
"create_table_lock_rev"
);

if succ {
break;
}
}

Ok(CreateTableLockRevReply { revision })
}

async fn extend_table_lock_rev(&self, req: ExtendTableLockRevReq) -> Result<(), KVAppError> {
debug!(req = debug(&req), "SchemaApi: {}", func_name!());

let table_id = req.table_id;
let revision = req.revision;

let ctx = &func_name!();
let mut trials = txn_trials(None, ctx);

loop {
trials.next().unwrap()?;

let tbid = TableId { table_id };
let (tb_meta_seq, _) = get_table_by_id_or_err(self, &tbid, ctx).await?;

let lock_key = TableLockKey { table_id, revision };
let (lock_key_seq, _): (_, Option<EmptyProto>) = get_pb_value(self, &lock_key).await?;

let lock = EmptyProto {};

let condition = vec![
// table is not changed
txn_cond_seq(&tbid, Eq, tb_meta_seq),
txn_cond_seq(&lock_key, Eq, lock_key_seq),
];

let if_then = vec![txn_op_put_with_expire(
&lock_key,
serialize_struct(&lock)?,
req.expire_at,
)];

let txn_req = TxnRequest {
condition,
if_then,
else_then: vec![],
};

let (succ, _responses) = send_txn(self, txn_req).await?;

debug!(
ident = display(&tbid),
succ = display(succ),
"extend_table_lock_rev"
);

if succ {
break;
}
}
Ok(())
}

async fn delete_table_lock_rev(&self, req: DeleteTableLockRevReq) -> Result<(), KVAppError> {
debug!(req = debug(&req), "SchemaApi: {}", func_name!());

let table_id = req.table_id;
let revision = req.revision;

let ctx = &func_name!();
let mut trials = txn_trials(None, ctx);

loop {
trials.next().unwrap()?;

let lock_key = TableLockKey { table_id, revision };
let (lock_key_seq, _): (_, Option<EmptyProto>) = get_pb_value(self, &lock_key).await?;
if lock_key_seq == 0 {
return Ok(());
}

let condition = vec![txn_cond_seq(&lock_key, Eq, lock_key_seq)];
let if_then = vec![txn_op_del(&lock_key)];

let txn_req = TxnRequest {
condition,
if_then,
else_then: vec![],
};

let (succ, _responses) = send_txn(self, txn_req).await?;

let tbid = TableId { table_id };
debug!(
ident = display(&tbid),
succ = display(succ),
"delete_table_lock_rev"
);

if succ {
break;
}
}

Ok(())
}

fn name(&self) -> String {
"SchemaApiImpl".to_string()
}
Expand Down
1 change: 1 addition & 0 deletions src/meta/api/src/schema_api_keys.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,5 @@
pub(crate) const ID_GEN_TABLE: &str = "table_id";
pub(crate) const ID_GEN_DATABASE: &str = "database_id";
pub(crate) const ID_GEN_TABLE_LOCK: &str = "table_lock_id";
pub(crate) const ID_GEN_INDEX: &str = "index_id";
Loading

1 comment on commit e9e4ec5

@vercel
Copy link

@vercel vercel bot commented on e9e4ec5 May 22, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Successfully deployed to the following URLs:

databend – ./

databend-databend.vercel.app
databend.rs
databend-git-main-databend.vercel.app
databend.vercel.app

Please sign in to comment.