Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion cloud/src/meta-service/http_encode_key.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,8 @@ static std::unordered_map<std::string_view,
{"TxnInfoKey", {{"instance_id", "db_id", "txn_id"}, [](param_type& p) { return txn_info_key(KeyInfoSetter<TxnInfoKeyInfo>{p}.get()); } , parse<TxnInfoPB>}} ,
{"TxnIndexKey", {{"instance_id", "txn_id"}, [](param_type& p) { return txn_index_key(KeyInfoSetter<TxnIndexKeyInfo>{p}.get()); } , parse<TxnIndexPB>}} ,
{"TxnRunningKey", {{"instance_id", "db_id", "txn_id"}, [](param_type& p) { return txn_running_key(KeyInfoSetter<TxnRunningKeyInfo>{p}.get()); } , parse<TxnRunningPB>}} ,
{"VersionKey", {{"instance_id", "db_id", "tbl_id", "partition_id"}, [](param_type& p) { return version_key(KeyInfoSetter<VersionKeyInfo>{p}.get()); } , parse<VersionPB>}} ,
{"PartitionVersionKey", {{"instance_id", "db_id", "tbl_id", "partition_id"}, [](param_type& p) { return partition_version_key(KeyInfoSetter<PartitionVersionKeyInfo>{p}.get()); } , parse<VersionPB>}} ,
{"TableVersionKey", {{"instance_id", "db_id", "tbl_id"}, [](param_type& p) { return table_version_key(KeyInfoSetter<TableVersionKeyInfo>{p}.get()); } , parse<VersionPB>}} ,
{"MetaRowsetKey", {{"instance_id", "tablet_id", "version"}, [](param_type& p) { return meta_rowset_key(KeyInfoSetter<MetaRowsetKeyInfo>{p}.get()); } , parse<doris::RowsetMetaCloudPB>}} ,
{"MetaRowsetTmpKey", {{"instance_id", "txn_id", "tablet_id"}, [](param_type& p) { return meta_rowset_tmp_key(KeyInfoSetter<MetaRowsetTmpKeyInfo>{p}.get()); } , parse<doris::RowsetMetaCloudPB>}} ,
{"MetaTabletKey", {{"instance_id", "table_id", "index_id", "part_id", "tablet_id"}, [](param_type& p) { return meta_tablet_key(KeyInfoSetter<MetaTabletKeyInfo>{p}.get()); } , parse<doris::TabletMetaCloudPB>}} ,
Expand Down
27 changes: 21 additions & 6 deletions cloud/src/meta-service/keys.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ namespace doris::cloud {
[[maybe_unused]] static const char* TXN_KEY_INFIX_INDEX = "txn_index";
[[maybe_unused]] static const char* TXN_KEY_INFIX_RUNNING = "txn_running";

[[maybe_unused]] static const char* VERSION_KEY_INFIX = "partition";
[[maybe_unused]] static const char* PARTITION_VERSION_KEY_INFIX = "partition";
[[maybe_unused]] static const char* TABLE_VERSION_KEY_INFIX = "table";

[[maybe_unused]] static const char* META_KEY_INFIX_ROWSET = "rowset";
[[maybe_unused]] static const char* META_KEY_INFIX_ROWSET_TMP = "rowset_tmp";
Expand Down Expand Up @@ -114,9 +115,9 @@ static void encode_prefix(const T& t, std::string* key) {
InstanceKeyInfo,
TxnLabelKeyInfo, TxnInfoKeyInfo, TxnIndexKeyInfo, TxnRunningKeyInfo,
MetaRowsetKeyInfo, MetaRowsetTmpKeyInfo, MetaTabletKeyInfo, MetaTabletIdxKeyInfo, MetaSchemaKeyInfo,
MetaDeleteBitmapInfo, MetaDeleteBitmapUpdateLockInfo, MetaPendingDeleteBitmapInfo, VersionKeyInfo,
MetaDeleteBitmapInfo, MetaDeleteBitmapUpdateLockInfo, MetaPendingDeleteBitmapInfo, PartitionVersionKeyInfo,
RecycleIndexKeyInfo, RecyclePartKeyInfo, RecycleRowsetKeyInfo, RecycleTxnKeyInfo, RecycleStageKeyInfo,
StatsTabletKeyInfo,
StatsTabletKeyInfo, TableVersionKeyInfo,
JobTabletKeyInfo, JobRecycleKeyInfo, RLJobProgressKeyInfo,
CopyJobKeyInfo, CopyFileKeyInfo, MetaRowsetSchemaKeyInfo, StorageVaultKeyInfo>);

Expand All @@ -139,7 +140,8 @@ static void encode_prefix(const T& t, std::string* key) {
|| std::is_same_v<T, MetaDeleteBitmapUpdateLockInfo>
|| std::is_same_v<T, MetaPendingDeleteBitmapInfo>) {
encode_bytes(META_KEY_PREFIX, key);
} else if constexpr (std::is_same_v<T, VersionKeyInfo>) {
} else if constexpr (std::is_same_v<T, PartitionVersionKeyInfo>
|| std::is_same_v<T, TableVersionKeyInfo>) {
encode_bytes(VERSION_KEY_PREFIX, key);
} else if constexpr (std::is_same_v<T, RecycleIndexKeyInfo>
|| std::is_same_v<T, RecyclePartKeyInfo>
Expand Down Expand Up @@ -217,9 +219,22 @@ void txn_running_key(const TxnRunningKeyInfo& in, std::string* out) {
// Version keys
//==============================================================================

void version_key(const VersionKeyInfo& in, std::string* out) {
std::string version_key_prefix(std::string_view instance_id) {
std::string out;
encode_prefix(TableVersionKeyInfo {instance_id, 0, 0}, &out);
return out;
}

void table_version_key(const TableVersionKeyInfo& in, std::string* out) {
encode_prefix(in, out); // 0x01 "version" ${instance_id}
encode_bytes(TABLE_VERSION_KEY_INFIX, out); // "table"
encode_int64(std::get<1>(in), out); // db_id
encode_int64(std::get<2>(in), out); // tbl_id
}

void partition_version_key(const PartitionVersionKeyInfo& in, std::string* out) {
encode_prefix(in, out); // 0x01 "version" ${instance_id}
encode_bytes(VERSION_KEY_INFIX, out); // "partition"
encode_bytes(PARTITION_VERSION_KEY_INFIX, out); // "partition"
encode_int64(std::get<1>(in), out); // db_id
encode_int64(std::get<2>(in), out); // tbl_id
encode_int64(std::get<3>(in), out); // partition_id
Expand Down
13 changes: 10 additions & 3 deletions cloud/src/meta-service/keys.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
// 0x01 "txn" ${instance_id} "txn_running" ${db_id} ${txn_id} -> TxnRunningPB
//
// 0x01 "version" ${instance_id} "partition" ${db_id} ${tbl_id} ${partition_id} -> VersionPB
// 0x01 "version" ${instance_id} "table" ${db_id} ${tbl_id} -> int64
//
// 0x01 "meta" ${instance_id} "rowset" ${tablet_id} ${version} -> RowsetMetaCloudPB
// 0x01 "meta" ${instance_id} "rowset_tmp" ${txn_id} ${tablet_id} -> RowsetMetaCloudPB
Expand Down Expand Up @@ -115,7 +116,7 @@ using TxnIndexKeyInfo = BasicKeyInfo<3 , std::tuple<std::string, int64_t>>
using TxnRunningKeyInfo = BasicKeyInfo<5 , std::tuple<std::string, int64_t, int64_t>>;

// 0:instance_id 1:db_id 2:tbl_id 3:partition_id
using VersionKeyInfo = BasicKeyInfo<6 , std::tuple<std::string, int64_t, int64_t, int64_t>>;
using PartitionVersionKeyInfo = BasicKeyInfo<6 , std::tuple<std::string, int64_t, int64_t, int64_t>>;

// 0:instance_id 1:tablet_id 2:version
using MetaRowsetKeyInfo = BasicKeyInfo<7 , std::tuple<std::string, int64_t, int64_t>>;
Expand Down Expand Up @@ -181,6 +182,9 @@ using RLJobProgressKeyInfo = BasicKeyInfo<25, std::tuple<std::string, int64_t, i
// 0:instance_id 1:vault_id
using StorageVaultKeyInfo = BasicKeyInfo<26, std::tuple<std::string, std::string>>;

// 0:instance_id 1:db_id 2:table_id
using TableVersionKeyInfo = BasicKeyInfo<27, std::tuple<std::string, int64_t, int64_t>>;

void instance_key(const InstanceKeyInfo& in, std::string* out);
static inline std::string instance_key(const InstanceKeyInfo& in) { std::string s; instance_key(in, &s); return s; }

Expand All @@ -197,8 +201,11 @@ static inline std::string txn_info_key(const TxnInfoKeyInfo& in) { std::string s
static inline std::string txn_index_key(const TxnIndexKeyInfo& in) { std::string s; txn_index_key(in, &s); return s; }
static inline std::string txn_running_key(const TxnRunningKeyInfo& in) { std::string s; txn_running_key(in, &s); return s; }

void version_key(const VersionKeyInfo& in, std::string* out);
static inline std::string version_key(const VersionKeyInfo& in) { std::string s; version_key(in, &s); return s; }
std::string version_key_prefix(std::string_view instance_id);
void partition_version_key(const PartitionVersionKeyInfo& in, std::string* out);
static inline std::string partition_version_key(const PartitionVersionKeyInfo& in) { std::string s; partition_version_key(in, &s); return s; }
void table_version_key(const TableVersionKeyInfo& in, std::string* out);
static inline std::string table_version_key(const TableVersionKeyInfo& in) { std::string s; table_version_key(in, &s); return s; }

std::string meta_key_prefix(std::string_view instance_id);
void meta_rowset_key(const MetaRowsetKeyInfo& in, std::string* out);
Expand Down
70 changes: 50 additions & 20 deletions cloud/src/meta-service/meta_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -204,13 +204,19 @@ void MetaServiceImpl::get_version(::google::protobuf::RpcController* controller,
cloud_unique_id = request->cloud_unique_id();
}

bool is_table_version = false;
if (request->has_is_table_version()) {
is_table_version = request->is_table_version();
}

int64_t db_id = request->has_db_id() ? request->db_id() : -1;
int64_t table_id = request->has_table_id() ? request->table_id() : -1;
int64_t partition_id = request->has_partition_id() ? request->partition_id() : -1;
if (db_id == -1 || table_id == -1 || partition_id == -1) {
if (db_id == -1 || table_id == -1 || (!is_table_version && partition_id == -1)) {
msg = "params error, db_id=" + std::to_string(db_id) +
" table_id=" + std::to_string(table_id) +
" partition_id=" + std::to_string(partition_id);
" partition_id=" + std::to_string(partition_id) +
" is_table_version=" + std::to_string(is_table_version);
code = MetaServiceCode::INVALID_ARGUMENT;
LOG(WARNING) << msg;
return;
Expand All @@ -224,9 +230,12 @@ void MetaServiceImpl::get_version(::google::protobuf::RpcController* controller,
return;
}
RPC_RATE_LIMIT(get_version)
VersionKeyInfo ver_key_info {instance_id, db_id, table_id, partition_id};
std::string ver_key;
version_key(ver_key_info, &ver_key);
if (is_table_version) {
table_version_key({instance_id, db_id, table_id}, &ver_key);
} else {
partition_version_key({instance_id, db_id, table_id, partition_id}, &ver_key);
}

std::unique_ptr<Transaction> txn;
TxnErrorCode err = txn_kv_->create_txn(&txn);
Expand All @@ -237,17 +246,22 @@ void MetaServiceImpl::get_version(::google::protobuf::RpcController* controller,
}

std::string ver_val;
VersionPB version_pb;
// 0 for success get a key, 1 for key not found, negative for error
err = txn->get(ver_key, &ver_val);
VLOG_DEBUG << "xxx get version_key=" << hex(ver_key);
if (err == TxnErrorCode::TXN_OK) {
if (!version_pb.ParseFromString(ver_val)) {
code = MetaServiceCode::PROTOBUF_PARSE_ERR;
msg = "malformed version value";
return;
if (is_table_version) {
int64_t version = *reinterpret_cast<const int64_t*>(ver_val.data());
response->set_version(version);
} else {
VersionPB version_pb;
if (!version_pb.ParseFromString(ver_val)) {
code = MetaServiceCode::PROTOBUF_PARSE_ERR;
msg = "malformed version value";
return;
}
response->set_version(version_pb.version());
}
response->set_version(version_pb.version());
{ TEST_SYNC_POINT_CALLBACK("get_version_code", &code); }
return;
} else if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
Expand All @@ -270,12 +284,18 @@ void MetaServiceImpl::batch_get_version(::google::protobuf::RpcController* contr
cloud_unique_id = request->cloud_unique_id();
}

bool is_table_version = false;
if (request->has_is_table_version()) {
is_table_version = request->is_table_version();
}

if (request->db_ids_size() == 0 || request->table_ids_size() == 0 ||
request->table_ids_size() != request->partition_ids_size() ||
request->db_ids_size() != request->partition_ids_size()) {
(!is_table_version && request->table_ids_size() != request->partition_ids_size()) ||
(!is_table_version && request->db_ids_size() != request->partition_ids_size())) {
msg = "param error, num db_ids=" + std::to_string(request->db_ids_size()) +
" num table_ids=" + std::to_string(request->table_ids_size()) +
" num partition_ids=" + std::to_string(request->partition_ids_size());
" num partition_ids=" + std::to_string(request->partition_ids_size()) +
" is_table_version=" + std::to_string(request->is_table_version());
code = MetaServiceCode::INVALID_ARGUMENT;
LOG(WARNING) << msg;
return;
Expand Down Expand Up @@ -308,21 +328,31 @@ void MetaServiceImpl::batch_get_version(::google::protobuf::RpcController* contr
int64_t db_id = request->db_ids(i);
int64_t table_id = request->table_ids(i);
int64_t partition_id = request->partition_ids(i);
std::string ver_key = version_key({instance_id, db_id, table_id, partition_id});
std::string ver_key;
if (is_table_version) {
table_version_key({instance_id, db_id, table_id}, &ver_key);
} else {
partition_version_key({instance_id, db_id, table_id, partition_id}, &ver_key);
}

// TODO(walter) support batch get.
std::string ver_val;
err = txn->get(ver_key, &ver_val, true);
TEST_SYNC_POINT_CALLBACK("batch_get_version_err", &err);
VLOG_DEBUG << "xxx get version_key=" << hex(ver_key);
if (err == TxnErrorCode::TXN_OK) {
VersionPB version_pb;
if (!version_pb.ParseFromString(ver_val)) {
code = MetaServiceCode::PROTOBUF_PARSE_ERR;
msg = "malformed version value";
break;
if (is_table_version) {
int64_t version = *reinterpret_cast<const int64_t*>(ver_val.data());
response->add_versions(version);
} else {
VersionPB version_pb;
if (!version_pb.ParseFromString(ver_val)) {
code = MetaServiceCode::PROTOBUF_PARSE_ERR;
msg = "malformed version value";
break;
}
response->add_versions(version_pb.version());
}
response->add_versions(version_pb.version());
} else if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
// return -1 if the target version is not exists.
response->add_versions(-1);
Expand Down
29 changes: 27 additions & 2 deletions cloud/src/meta-service/meta_service_partition.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,16 @@ void MetaServiceImpl::commit_index(::google::protobuf::RpcController* controller
LOG_INFO("remove recycle index").tag("key", hex(key));
txn->remove(key);
}

if (request->has_db_id() && request->has_is_new_table() && request->is_new_table()) {
// init table version, for create and truncate table
std::string key = table_version_key({instance_id, request->db_id(), request->table_id()});
std::string val(sizeof(int64_t), 0);
*reinterpret_cast<int64_t*>(val.data()) = (int64_t)1;
txn->put(key, val);
LOG_INFO("put table version").tag("key", hex(key));
}

err = txn->commit();
if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::COMMIT>(err);
Expand Down Expand Up @@ -493,6 +503,14 @@ void MetaServiceImpl::commit_partition(::google::protobuf::RpcController* contro
LOG_INFO("remove recycle partition").tag("key", hex(key));
txn->remove(key);
}

// update table versions
if (request->has_db_id()) {
std::string ver_key = table_version_key({instance_id, request->db_id(), request->table_id()});
txn->atomic_add(ver_key, 1);
LOG_INFO("update table version").tag("ver_key", hex(ver_key));
}

err = txn->commit();
if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::COMMIT>(err);
Expand All @@ -513,8 +531,7 @@ void MetaServiceImpl::drop_partition(::google::protobuf::RpcController* controll
}
RPC_RATE_LIMIT(drop_partition)

if (request->partition_ids().empty() || request->index_ids().empty() ||
!request->has_table_id()) {
if (request->partition_ids().empty() || request->index_ids().empty() || !request->has_table_id()) {
code = MetaServiceCode::INVALID_ARGUMENT;
msg = "empty partition_ids or index_ids or table_id";
return;
Expand Down Expand Up @@ -579,6 +596,14 @@ void MetaServiceImpl::drop_partition(::google::protobuf::RpcController* controll
}
}
if (!need_commit) return;

// update table versions
if (request->has_db_id()) {
std::string ver_key = table_version_key({instance_id, request->db_id(), request->table_id()});
txn->atomic_add(ver_key, 1);
LOG_INFO("update table version").tag("ver_key", hex(ver_key));
}

err = txn->commit();
if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::COMMIT>(err);
Expand Down
15 changes: 12 additions & 3 deletions cloud/src/meta-service/meta_service_txn.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -864,7 +864,7 @@ void MetaServiceImpl::commit_txn(::google::protobuf::RpcController* controller,
int64_t table_id = tablet_ids[tablet_id].table_id();
int64_t partition_id = i.partition_id();

std::string ver_key = version_key({instance_id, db_id, table_id, partition_id});
std::string ver_key = partition_version_key({instance_id, db_id, table_id, partition_id});
int64_t version = -1;
std::string ver_val_str;
int64_t new_version = -1;
Expand Down Expand Up @@ -983,11 +983,11 @@ void MetaServiceImpl::commit_txn(::google::protobuf::RpcController* controller,

txn->put(i.first, ver_val);
put_size += i.first.size() + ver_val.size();
LOG(INFO) << "xxx put version_key=" << hex(i.first) << " version:" << i.second
LOG(INFO) << "xxx put partition_version_key=" << hex(i.first) << " version:" << i.second
<< " txn_id=" << txn_id;

std::string_view ver_key = i.first;
//VersionKeyInfo {instance_id, db_id, table_id, partition_id}
//PartitionVersionKeyInfo {instance_id, db_id, table_id, partition_id}
ver_key.remove_prefix(1); // Remove key space
std::vector<std::tuple<std::variant<int64_t, std::string>, int, int>> out;
int ret = decode_key(&ver_key, &out);
Expand All @@ -1009,6 +1009,15 @@ void MetaServiceImpl::commit_txn(::google::protobuf::RpcController* controller,
response->add_versions(i.second);
}

// Save table versions
num_put_keys += table_id_tablet_ids.size();
for (auto& i : table_id_tablet_ids) {
std::string ver_key = table_version_key({instance_id, db_id, i.first});
txn->atomic_add(ver_key, 1);
put_size += ver_key.size();
LOG(INFO) << "xxx atomic add table_version_key=" << hex(ver_key) << " txn_id=" << txn_id;
}

LOG(INFO) << " before update txn_info=" << txn_info.ShortDebugString();

// Update txn_info
Expand Down
Loading