diff --git a/cloud/src/meta-service/http_encode_key.cpp b/cloud/src/meta-service/http_encode_key.cpp index aa0c3c4dac95fc..f70f2accb500bd 100644 --- a/cloud/src/meta-service/http_encode_key.cpp +++ b/cloud/src/meta-service/http_encode_key.cpp @@ -140,7 +140,8 @@ static std::unordered_map{p}.get()); } , parse}} , {"TxnIndexKey", {{"instance_id", "txn_id"}, [](param_type& p) { return txn_index_key(KeyInfoSetter{p}.get()); } , parse}} , {"TxnRunningKey", {{"instance_id", "db_id", "txn_id"}, [](param_type& p) { return txn_running_key(KeyInfoSetter{p}.get()); } , parse}} , - {"VersionKey", {{"instance_id", "db_id", "tbl_id", "partition_id"}, [](param_type& p) { return version_key(KeyInfoSetter{p}.get()); } , parse}} , + {"PartitionVersionKey", {{"instance_id", "db_id", "tbl_id", "partition_id"}, [](param_type& p) { return partition_version_key(KeyInfoSetter{p}.get()); } , parse}} , + {"TableVersionKey", {{"instance_id", "db_id", "tbl_id"}, [](param_type& p) { return table_version_key(KeyInfoSetter{p}.get()); } , parse}} , {"MetaRowsetKey", {{"instance_id", "tablet_id", "version"}, [](param_type& p) { return meta_rowset_key(KeyInfoSetter{p}.get()); } , parse}} , {"MetaRowsetTmpKey", {{"instance_id", "txn_id", "tablet_id"}, [](param_type& p) { return meta_rowset_tmp_key(KeyInfoSetter{p}.get()); } , parse}} , {"MetaTabletKey", {{"instance_id", "table_id", "index_id", "part_id", "tablet_id"}, [](param_type& p) { return meta_tablet_key(KeyInfoSetter{p}.get()); } , parse}} , diff --git a/cloud/src/meta-service/keys.cpp b/cloud/src/meta-service/keys.cpp index f3e89dfbc66d1f..6e883681696a0d 100644 --- a/cloud/src/meta-service/keys.cpp +++ b/cloud/src/meta-service/keys.cpp @@ -43,7 +43,8 @@ namespace doris::cloud { [[maybe_unused]] static const char* TXN_KEY_INFIX_INDEX = "txn_index"; [[maybe_unused]] static const char* TXN_KEY_INFIX_RUNNING = "txn_running"; -[[maybe_unused]] static const char* VERSION_KEY_INFIX = "partition"; +[[maybe_unused]] static const char* 
PARTITION_VERSION_KEY_INFIX = "partition"; +[[maybe_unused]] static const char* TABLE_VERSION_KEY_INFIX = "table"; [[maybe_unused]] static const char* META_KEY_INFIX_ROWSET = "rowset"; [[maybe_unused]] static const char* META_KEY_INFIX_ROWSET_TMP = "rowset_tmp"; @@ -114,9 +115,9 @@ static void encode_prefix(const T& t, std::string* key) { InstanceKeyInfo, TxnLabelKeyInfo, TxnInfoKeyInfo, TxnIndexKeyInfo, TxnRunningKeyInfo, MetaRowsetKeyInfo, MetaRowsetTmpKeyInfo, MetaTabletKeyInfo, MetaTabletIdxKeyInfo, MetaSchemaKeyInfo, - MetaDeleteBitmapInfo, MetaDeleteBitmapUpdateLockInfo, MetaPendingDeleteBitmapInfo, VersionKeyInfo, + MetaDeleteBitmapInfo, MetaDeleteBitmapUpdateLockInfo, MetaPendingDeleteBitmapInfo, PartitionVersionKeyInfo, RecycleIndexKeyInfo, RecyclePartKeyInfo, RecycleRowsetKeyInfo, RecycleTxnKeyInfo, RecycleStageKeyInfo, - StatsTabletKeyInfo, + StatsTabletKeyInfo, TableVersionKeyInfo, JobTabletKeyInfo, JobRecycleKeyInfo, RLJobProgressKeyInfo, CopyJobKeyInfo, CopyFileKeyInfo, MetaRowsetSchemaKeyInfo, StorageVaultKeyInfo>); @@ -139,7 +140,8 @@ static void encode_prefix(const T& t, std::string* key) { || std::is_same_v || std::is_same_v) { encode_bytes(META_KEY_PREFIX, key); - } else if constexpr (std::is_same_v) { + } else if constexpr (std::is_same_v + || std::is_same_v) { encode_bytes(VERSION_KEY_PREFIX, key); } else if constexpr (std::is_same_v || std::is_same_v @@ -217,9 +219,22 @@ void txn_running_key(const TxnRunningKeyInfo& in, std::string* out) { // Version keys //============================================================================== -void version_key(const VersionKeyInfo& in, std::string* out) { +std::string version_key_prefix(std::string_view instance_id) { + std::string out; + encode_prefix(TableVersionKeyInfo {instance_id, 0, 0}, &out); + return out; +} + +void table_version_key(const TableVersionKeyInfo& in, std::string* out) { + encode_prefix(in, out); // 0x01 "version" ${instance_id} + encode_bytes(TABLE_VERSION_KEY_INFIX, out); // 
"table" + encode_int64(std::get<1>(in), out); // db_id + encode_int64(std::get<2>(in), out); // tbl_id +} + +void partition_version_key(const PartitionVersionKeyInfo& in, std::string* out) { encode_prefix(in, out); // 0x01 "version" ${instance_id} - encode_bytes(VERSION_KEY_INFIX, out); // "partition" + encode_bytes(PARTITION_VERSION_KEY_INFIX, out); // "partition" encode_int64(std::get<1>(in), out); // db_id encode_int64(std::get<2>(in), out); // tbl_id encode_int64(std::get<3>(in), out); // partition_id diff --git a/cloud/src/meta-service/keys.h b/cloud/src/meta-service/keys.h index edc9afcf4e8a78..483332a133c0b1 100644 --- a/cloud/src/meta-service/keys.h +++ b/cloud/src/meta-service/keys.h @@ -34,6 +34,7 @@ // 0x01 "txn" ${instance_id} "txn_running" ${db_id} ${txn_id} -> TxnRunningPB // // 0x01 "version" ${instance_id} "partition" ${db_id} ${tbl_id} ${partition_id} -> VersionPB +// 0x01 "version" ${instance_id} "table" ${db_id} ${tbl_id} -> int64 // // 0x01 "meta" ${instance_id} "rowset" ${tablet_id} ${version} -> RowsetMetaCloudPB // 0x01 "meta" ${instance_id} "rowset_tmp" ${txn_id} ${tablet_id} -> RowsetMetaCloudPB @@ -115,7 +116,7 @@ using TxnIndexKeyInfo = BasicKeyInfo<3 , std::tuple> using TxnRunningKeyInfo = BasicKeyInfo<5 , std::tuple>; // 0:instance_id 1:db_id 2:tbl_id 3:partition_id -using VersionKeyInfo = BasicKeyInfo<6 , std::tuple>; +using PartitionVersionKeyInfo = BasicKeyInfo<6 , std::tuple>; // 0:instance_id 1:tablet_id 2:version using MetaRowsetKeyInfo = BasicKeyInfo<7 , std::tuple>; @@ -181,6 +182,9 @@ using RLJobProgressKeyInfo = BasicKeyInfo<25, std::tuple>; +// 0:instance_id 1:db_id 2:table_id +using TableVersionKeyInfo = BasicKeyInfo<27, std::tuple>; + void instance_key(const InstanceKeyInfo& in, std::string* out); static inline std::string instance_key(const InstanceKeyInfo& in) { std::string s; instance_key(in, &s); return s; } @@ -197,8 +201,11 @@ static inline std::string txn_info_key(const TxnInfoKeyInfo& in) { std::string s static 
inline std::string txn_index_key(const TxnIndexKeyInfo& in) { std::string s; txn_index_key(in, &s); return s; } static inline std::string txn_running_key(const TxnRunningKeyInfo& in) { std::string s; txn_running_key(in, &s); return s; } -void version_key(const VersionKeyInfo& in, std::string* out); -static inline std::string version_key(const VersionKeyInfo& in) { std::string s; version_key(in, &s); return s; } +std::string version_key_prefix(std::string_view instance_id); +void partition_version_key(const PartitionVersionKeyInfo& in, std::string* out); +static inline std::string partition_version_key(const PartitionVersionKeyInfo& in) { std::string s; partition_version_key(in, &s); return s; } +void table_version_key(const TableVersionKeyInfo& in, std::string* out); +static inline std::string table_version_key(const TableVersionKeyInfo& in) { std::string s; table_version_key(in, &s); return s; } std::string meta_key_prefix(std::string_view instance_id); void meta_rowset_key(const MetaRowsetKeyInfo& in, std::string* out); diff --git a/cloud/src/meta-service/meta_service.cpp b/cloud/src/meta-service/meta_service.cpp index 5f2a0d39066673..d9c9048793cb71 100644 --- a/cloud/src/meta-service/meta_service.cpp +++ b/cloud/src/meta-service/meta_service.cpp @@ -204,13 +204,19 @@ void MetaServiceImpl::get_version(::google::protobuf::RpcController* controller, cloud_unique_id = request->cloud_unique_id(); } + bool is_table_version = false; + if (request->has_is_table_version()) { + is_table_version = request->is_table_version(); + } + int64_t db_id = request->has_db_id() ? request->db_id() : -1; int64_t table_id = request->has_table_id() ? request->table_id() : -1; int64_t partition_id = request->has_partition_id() ? 
request->partition_id() : -1; - if (db_id == -1 || table_id == -1 || partition_id == -1) { + if (db_id == -1 || table_id == -1 || (!is_table_version && partition_id == -1)) { msg = "params error, db_id=" + std::to_string(db_id) + " table_id=" + std::to_string(table_id) + - " partition_id=" + std::to_string(partition_id); + " partition_id=" + std::to_string(partition_id) + + " is_table_version=" + std::to_string(is_table_version); code = MetaServiceCode::INVALID_ARGUMENT; LOG(WARNING) << msg; return; @@ -224,9 +230,12 @@ void MetaServiceImpl::get_version(::google::protobuf::RpcController* controller, return; } RPC_RATE_LIMIT(get_version) - VersionKeyInfo ver_key_info {instance_id, db_id, table_id, partition_id}; std::string ver_key; - version_key(ver_key_info, &ver_key); + if (is_table_version) { + table_version_key({instance_id, db_id, table_id}, &ver_key); + } else { + partition_version_key({instance_id, db_id, table_id, partition_id}, &ver_key); + } std::unique_ptr txn; TxnErrorCode err = txn_kv_->create_txn(&txn); @@ -237,17 +246,22 @@ void MetaServiceImpl::get_version(::google::protobuf::RpcController* controller, } std::string ver_val; - VersionPB version_pb; // 0 for success get a key, 1 for key not found, negative for error err = txn->get(ver_key, &ver_val); VLOG_DEBUG << "xxx get version_key=" << hex(ver_key); if (err == TxnErrorCode::TXN_OK) { - if (!version_pb.ParseFromString(ver_val)) { - code = MetaServiceCode::PROTOBUF_PARSE_ERR; - msg = "malformed version value"; - return; + if (is_table_version) { + int64_t version = *reinterpret_cast(ver_val.data()); + response->set_version(version); + } else { + VersionPB version_pb; + if (!version_pb.ParseFromString(ver_val)) { + code = MetaServiceCode::PROTOBUF_PARSE_ERR; + msg = "malformed version value"; + return; + } + response->set_version(version_pb.version()); } - response->set_version(version_pb.version()); { TEST_SYNC_POINT_CALLBACK("get_version_code", &code); } return; } else if (err == 
TxnErrorCode::TXN_KEY_NOT_FOUND) { @@ -270,12 +284,18 @@ void MetaServiceImpl::batch_get_version(::google::protobuf::RpcController* contr cloud_unique_id = request->cloud_unique_id(); } + bool is_table_version = false; + if (request->has_is_table_version()) { + is_table_version = request->is_table_version(); + } + if (request->db_ids_size() == 0 || request->table_ids_size() == 0 || - request->table_ids_size() != request->partition_ids_size() || - request->db_ids_size() != request->partition_ids_size()) { + (!is_table_version && request->table_ids_size() != request->partition_ids_size()) || + (!is_table_version && request->db_ids_size() != request->partition_ids_size())) { msg = "param error, num db_ids=" + std::to_string(request->db_ids_size()) + " num table_ids=" + std::to_string(request->table_ids_size()) + - " num partition_ids=" + std::to_string(request->partition_ids_size()); + " num partition_ids=" + std::to_string(request->partition_ids_size()) + + " is_table_version=" + std::to_string(request->is_table_version()); code = MetaServiceCode::INVALID_ARGUMENT; LOG(WARNING) << msg; return; @@ -308,7 +328,12 @@ void MetaServiceImpl::batch_get_version(::google::protobuf::RpcController* contr int64_t db_id = request->db_ids(i); int64_t table_id = request->table_ids(i); int64_t partition_id = request->partition_ids(i); - std::string ver_key = version_key({instance_id, db_id, table_id, partition_id}); + std::string ver_key; + if (is_table_version) { + table_version_key({instance_id, db_id, table_id}, &ver_key); + } else { + partition_version_key({instance_id, db_id, table_id, partition_id}, &ver_key); + } // TODO(walter) support batch get. 
std::string ver_val; @@ -316,13 +341,18 @@ void MetaServiceImpl::batch_get_version(::google::protobuf::RpcController* contr TEST_SYNC_POINT_CALLBACK("batch_get_version_err", &err); VLOG_DEBUG << "xxx get version_key=" << hex(ver_key); if (err == TxnErrorCode::TXN_OK) { - VersionPB version_pb; - if (!version_pb.ParseFromString(ver_val)) { - code = MetaServiceCode::PROTOBUF_PARSE_ERR; - msg = "malformed version value"; - break; + if (is_table_version) { + int64_t version = *reinterpret_cast(ver_val.data()); + response->add_versions(version); + } else { + VersionPB version_pb; + if (!version_pb.ParseFromString(ver_val)) { + code = MetaServiceCode::PROTOBUF_PARSE_ERR; + msg = "malformed version value"; + break; + } + response->add_versions(version_pb.version()); } - response->add_versions(version_pb.version()); } else if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) { // return -1 if the target version is not exists. response->add_versions(-1); diff --git a/cloud/src/meta-service/meta_service_partition.cpp b/cloud/src/meta-service/meta_service_partition.cpp index 17c67fb22ffd0c..c5a7db7d9028b1 100644 --- a/cloud/src/meta-service/meta_service_partition.cpp +++ b/cloud/src/meta-service/meta_service_partition.cpp @@ -222,6 +222,16 @@ void MetaServiceImpl::commit_index(::google::protobuf::RpcController* controller LOG_INFO("remove recycle index").tag("key", hex(key)); txn->remove(key); } + + if (request->has_db_id() && request->has_is_new_table() && request->is_new_table()) { + // init table version, for create and truncate table + std::string key = table_version_key({instance_id, request->db_id(), request->table_id()}); + std::string val(sizeof(int64_t), 0); + *reinterpret_cast(val.data()) = (int64_t)1; + txn->put(key, val); + LOG_INFO("put table version").tag("key", hex(key)); + } + err = txn->commit(); if (err != TxnErrorCode::TXN_OK) { code = cast_as(err); @@ -493,6 +503,14 @@ void MetaServiceImpl::commit_partition(::google::protobuf::RpcController* contro LOG_INFO("remove 
recycle partition").tag("key", hex(key)); txn->remove(key); } + + // update table versions + if (request->has_db_id()) { + std::string ver_key = table_version_key({instance_id, request->db_id(), request->table_id()}); + txn->atomic_add(ver_key, 1); + LOG_INFO("update table version").tag("ver_key", hex(ver_key)); + } + err = txn->commit(); if (err != TxnErrorCode::TXN_OK) { code = cast_as(err); @@ -513,8 +531,7 @@ void MetaServiceImpl::drop_partition(::google::protobuf::RpcController* controll } RPC_RATE_LIMIT(drop_partition) - if (request->partition_ids().empty() || request->index_ids().empty() || - !request->has_table_id()) { + if (request->partition_ids().empty() || request->index_ids().empty() || !request->has_table_id()) { code = MetaServiceCode::INVALID_ARGUMENT; msg = "empty partition_ids or index_ids or table_id"; return; @@ -579,6 +596,14 @@ void MetaServiceImpl::drop_partition(::google::protobuf::RpcController* controll } } if (!need_commit) return; + + // update table versions + if (request->has_db_id()) { + std::string ver_key = table_version_key({instance_id, request->db_id(), request->table_id()}); + txn->atomic_add(ver_key, 1); + LOG_INFO("update table version").tag("ver_key", hex(ver_key)); + } + err = txn->commit(); if (err != TxnErrorCode::TXN_OK) { code = cast_as(err); diff --git a/cloud/src/meta-service/meta_service_txn.cpp b/cloud/src/meta-service/meta_service_txn.cpp index 03251ee0f071a7..0c7f57385b3d7f 100644 --- a/cloud/src/meta-service/meta_service_txn.cpp +++ b/cloud/src/meta-service/meta_service_txn.cpp @@ -864,7 +864,7 @@ void MetaServiceImpl::commit_txn(::google::protobuf::RpcController* controller, int64_t table_id = tablet_ids[tablet_id].table_id(); int64_t partition_id = i.partition_id(); - std::string ver_key = version_key({instance_id, db_id, table_id, partition_id}); + std::string ver_key = partition_version_key({instance_id, db_id, table_id, partition_id}); int64_t version = -1; std::string ver_val_str; int64_t new_version = -1; 
@@ -983,11 +983,11 @@ void MetaServiceImpl::commit_txn(::google::protobuf::RpcController* controller, txn->put(i.first, ver_val); put_size += i.first.size() + ver_val.size(); - LOG(INFO) << "xxx put version_key=" << hex(i.first) << " version:" << i.second + LOG(INFO) << "xxx put partition_version_key=" << hex(i.first) << " version:" << i.second << " txn_id=" << txn_id; std::string_view ver_key = i.first; - //VersionKeyInfo {instance_id, db_id, table_id, partition_id} + //PartitionVersionKeyInfo {instance_id, db_id, table_id, partition_id} ver_key.remove_prefix(1); // Remove key space std::vector, int, int>> out; int ret = decode_key(&ver_key, &out); @@ -1009,6 +1009,15 @@ void MetaServiceImpl::commit_txn(::google::protobuf::RpcController* controller, response->add_versions(i.second); } + // Save table versions + num_put_keys += table_id_tablet_ids.size(); + for (auto& i : table_id_tablet_ids) { + std::string ver_key = table_version_key({instance_id, db_id, i.first}); + txn->atomic_add(ver_key, 1); + put_size += ver_key.size(); + LOG(INFO) << "xxx atomic add table_version_key=" << hex(ver_key) << " txn_id=" << txn_id; + } + LOG(INFO) << " before update txn_info=" << txn_info.ShortDebugString(); // Update txn_info diff --git a/cloud/src/recycler/recycler.cpp b/cloud/src/recycler/recycler.cpp index 1ca1e05f741e57..cafcd2131d879e 100644 --- a/cloud/src/recycler/recycler.cpp +++ b/cloud/src/recycler/recycler.cpp @@ -527,9 +527,8 @@ int InstanceRecycler::recycle_deleted_instance() { std::string start_txn_key = txn_key_prefix(instance_id_); std::string end_txn_key = txn_key_prefix(instance_id_ + '\x00'); txn->remove(start_txn_key, end_txn_key); - // 0:instance_id 1:db_id 2:tbl_id 3:partition_id - std::string start_version_key = version_key({instance_id_, 0, 0, 0}); - std::string end_version_key = version_key({instance_id_, INT64_MAX, 0, 0}); + std::string start_version_key = version_key_prefix(instance_id_); + std::string end_version_key = version_key_prefix(instance_id_ 
+ '\x00'); txn->remove(start_version_key, end_version_key); std::string start_meta_key = meta_key_prefix(instance_id_); std::string end_meta_key = meta_key_prefix(instance_id_ + '\x00'); @@ -758,7 +757,7 @@ int InstanceRecycler::recycle_partitions() { // Elements in `partition_keys` has the same lifetime as `it` in `scan_and_recycle` std::vector partition_keys; - std::vector version_keys; + std::vector partition_version_keys; auto recycle_func = [&, this](std::string_view k, std::string_view v) -> int { ++num_scanned; RecyclePartitionPB part_pb; @@ -831,18 +830,18 @@ int InstanceRecycler::recycle_partitions() { check_recycle_task(instance_id_, task_name, num_scanned, num_recycled, start_time); partition_keys.push_back(k); if (part_pb.db_id() > 0) { - version_keys.push_back(version_key( + partition_version_keys.push_back(partition_version_key( {instance_id_, part_pb.db_id(), part_pb.table_id(), partition_id})); } } return ret; }; - auto loop_done = [&partition_keys, &version_keys, this]() -> int { + auto loop_done = [&partition_keys, &partition_version_keys, this]() -> int { if (partition_keys.empty()) return 0; std::unique_ptr> defer((int*)0x01, [&](int*) { partition_keys.clear(); - version_keys.clear(); + partition_version_keys.clear(); }); std::unique_ptr txn; TxnErrorCode err = txn_kv_->create_txn(&txn); @@ -853,7 +852,7 @@ int InstanceRecycler::recycle_partitions() { for (auto& k : partition_keys) { txn->remove(k); } - for (auto& k : version_keys) { + for (auto& k : partition_version_keys) { txn->remove(k); } err = txn->commit(); @@ -872,24 +871,24 @@ int InstanceRecycler::recycle_versions() { int num_scanned = 0; int num_recycled = 0; - LOG_INFO("begin to recycle partition versions").tag("instance_id", instance_id_); + LOG_INFO("begin to recycle table and partition versions").tag("instance_id", instance_id_); auto start_time = steady_clock::now(); std::unique_ptr> defer_log_statistics((int*)0x01, [&](int*) { auto cost = duration(steady_clock::now() - 
start_time).count(); - LOG_INFO("recycle partition versions finished, cost={}s", cost) + LOG_INFO("recycle table and partition versions finished, cost={}s", cost) .tag("instance_id", instance_id_) .tag("num_scanned", num_scanned) .tag("num_recycled", num_recycled); }); - auto version_key_begin = version_key({instance_id_, 0, 0, 0}); - auto version_key_end = version_key({instance_id_, INT64_MAX, 0, 0}); + auto version_key_begin = partition_version_key({instance_id_, 0, 0, 0}); + auto version_key_end = partition_version_key({instance_id_, INT64_MAX, 0, 0}); int64_t last_scanned_table_id = 0; bool is_recycled = false; // Is last scanned kv recycled auto recycle_func = [&num_scanned, &num_recycled, &last_scanned_table_id, &is_recycled, this]( - std::string_view k, std::string_view) { + std::string_view k, std::string_view) { ++num_scanned; auto k1 = k; k1.remove_prefix(1); @@ -916,16 +915,20 @@ int InstanceRecycler::recycle_versions() { if (err != TxnErrorCode::TXN_OK) { return -1; } - if (iter->has_next()) { // Table is useful, should not recycle partiton versions + if (iter->has_next()) { // Table is useful, should not recycle table and partition versions return 0; } - // Remove all version kvs of this table auto db_id = std::get(std::get<0>(out[3])); - auto table_version_key_begin = version_key({instance_id_, db_id, table_id, 0}); - auto table_version_key_end = version_key({instance_id_, db_id, table_id, INT64_MAX}); - txn->remove(table_version_key_begin, table_version_key_end); - LOG(WARNING) << "remove version kv, begin=" << hex(table_version_key_begin) - << " end=" << hex(table_version_key_end); + // 1. 
Remove all partition version kvs of this table + auto partition_version_key_begin = partition_version_key({instance_id_, db_id, table_id, 0}); + auto partition_version_key_end = partition_version_key({instance_id_, db_id, table_id, INT64_MAX}); + txn->remove(partition_version_key_begin, partition_version_key_end); + LOG(WARNING) << "remove partition version kv, begin=" << hex(partition_version_key_begin) + << " end=" << hex(partition_version_key_end); + // 2. Remove the table version kv of this table + auto tbl_version_key = table_version_key({instance_id_, db_id, table_id}); + txn->remove(tbl_version_key); + LOG(WARNING) << "remove table version kv " << hex(tbl_version_key); err = txn->commit(); if (err != TxnErrorCode::TXN_OK) { return -1; diff --git a/cloud/test/http_encode_key_test.cpp b/cloud/test/http_encode_key_test.cpp index ac5eade9735859..25182d54489f71 100644 --- a/cloud/test/http_encode_key_test.cpp +++ b/cloud/test/http_encode_key_test.cpp @@ -168,7 +168,7 @@ txn_id=126419752960)", R"({"table_ids":["10001"]})", }, Input { - "VersionKey", + "PartitionVersionKey", "instance_id=gavin-instance&db_id=10086&tbl_id=10010&partition_id=10000", "011076657273696f6e000110676176696e2d696e7374616e6365000110706172746974696f6e000112000000000000276612000000000000271a120000000000002710", []() -> std::string { @@ -178,6 +178,17 @@ txn_id=126419752960)", }, R"({"version":"10"})", }, + Input { + "TableVersionKey", + "instance_id=gavin-instance&db_id=10086&tbl_id=10010", + "011076657273696f6e000110676176696e2d696e7374616e63650001107461626c65000112000000000000276612000000000000271a", + []() -> std::string { + VersionPB pb; + pb.set_version(10); + return pb.SerializeAsString(); + }, + R"({"version":"10"})", + }, Input { "MetaRowsetKey", "instance_id=gavin-instance&tablet_id=10086&version=10010", diff --git a/cloud/test/keys_test.cpp b/cloud/test/keys_test.cpp index 614a17374d0bf5..ce7cbb0551e1bd 100644 --- a/cloud/test/keys_test.cpp +++ b/cloud/test/keys_test.cpp @@ -298,14 
+298,14 @@ TEST(KeysTest, VersionKeyTest) { using namespace doris::cloud; std::string instance_id = "instance_id_deadbeef"; - // 0x01 "version" ${instance_id} "version_id" ${db_id} ${tbl_id} ${partition_id} -> ${version} + // 0x01 "version" ${instance_id} "partition" ${db_id} ${tbl_id} ${partition_id} -> ${version} { int64_t db_id = 11111; - int64_t tablet_id = 10086; + int64_t table_id = 10086; int64_t partition_id = 9998; - VersionKeyInfo v_key {instance_id, db_id, tablet_id, partition_id}; + PartitionVersionKeyInfo v_key {instance_id, db_id, table_id, partition_id}; std::string encoded_version_key0; - version_key(v_key, &encoded_version_key0); + partition_version_key(v_key, &encoded_version_key0); std::cout << "version key after encode: " << hex(encoded_version_key0) << std::endl; std::string dec_instance_id; @@ -329,12 +329,50 @@ TEST(KeysTest, VersionKeyTest) { EXPECT_EQ("partition", dec_version_infix); EXPECT_EQ(instance_id, dec_instance_id); EXPECT_EQ(db_id, dec_db_id); - EXPECT_EQ(tablet_id, dec_table_id); + EXPECT_EQ(table_id, dec_table_id); EXPECT_EQ(partition_id, dec_partition_id); std::get<3>(v_key) = partition_id + 1; std::string encoded_version_key1; - version_key(v_key, &encoded_version_key1); + partition_version_key(v_key, &encoded_version_key1); + std::cout << "version key after encode: " << hex(encoded_version_key1) << std::endl; + + ASSERT_GT(encoded_version_key1, encoded_version_key0); + } + + // 0x01 "version" ${instance_id} "table" ${db_id} ${tbl_id} -> ${version} + { + int64_t db_id = 11111; + int64_t table_id = 10010; + TableVersionKeyInfo v_key {instance_id, db_id, table_id}; + std::string encoded_version_key0; + table_version_key(v_key, &encoded_version_key0); + std::cout << "version key after encode: " << hex(encoded_version_key0) << std::endl; + + std::string dec_instance_id; + int64_t dec_db_id = 0; + int64_t dec_table_id = 0; + + std::string_view key_sv(encoded_version_key0); + std::string dec_version_prefix; + std::string 
dec_version_infix; + remove_user_space_prefix(&key_sv); + ASSERT_EQ(decode_bytes(&key_sv, &dec_version_prefix), 0); + ASSERT_EQ(decode_bytes(&key_sv, &dec_instance_id), 0); + ASSERT_EQ(decode_bytes(&key_sv, &dec_version_infix), 0); + ASSERT_EQ(decode_int64(&key_sv, &dec_db_id), 0) << hex(key_sv); + ASSERT_EQ(decode_int64(&key_sv, &dec_table_id), 0); + ASSERT_TRUE(key_sv.empty()); + + EXPECT_EQ("version", dec_version_prefix); + EXPECT_EQ("table", dec_version_infix); + EXPECT_EQ(instance_id, dec_instance_id); + EXPECT_EQ(db_id, dec_db_id); + EXPECT_EQ(table_id, dec_table_id); + + std::get<2>(v_key) = table_id + 1; + std::string encoded_version_key1; + table_version_key(v_key, &encoded_version_key1); std::cout << "version key after encode: " << hex(encoded_version_key1) << std::endl; ASSERT_GT(encoded_version_key1, encoded_version_key0); diff --git a/cloud/test/meta_service_test.cpp b/cloud/test/meta_service_test.cpp index 2d8e0c89a5c406..a497cb566f4974 100644 --- a/cloud/test/meta_service_test.cpp +++ b/cloud/test/meta_service_test.cpp @@ -4247,6 +4247,8 @@ TEST(MetaServiceTest, IndexRequest) { auto tablet_val = tablet_pb.SerializeAsString(); RecycleIndexPB index_pb; auto index_key = recycle_index_key({instance_id, index_id}); + int64_t val_int = 0; + auto tbl_version_key = table_version_key({instance_id, 1, table_id}); std::string val; // ------------Test prepare index------------ @@ -4255,8 +4257,10 @@ TEST(MetaServiceTest, IndexRequest) { IndexResponse res; meta_service->prepare_index(&ctrl, &req, &res, nullptr); ASSERT_EQ(res.status().code(), MetaServiceCode::INVALID_ARGUMENT); + req.set_db_id(1); req.set_table_id(table_id); req.add_index_ids(index_id); + req.set_is_new_table(true); // Last state UNKNOWN res.Clear(); meta_service->prepare_index(&ctrl, &req, &res, nullptr); @@ -4312,19 +4316,26 @@ TEST(MetaServiceTest, IndexRequest) { ASSERT_EQ(res.status().code(), MetaServiceCode::ALREADY_EXISTED); ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), 
TxnErrorCode::TXN_OK); ASSERT_EQ(txn->get(index_key, &val), TxnErrorCode::TXN_KEY_NOT_FOUND); + // Prepare index should not init table version + ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK); + ASSERT_EQ(txn->get(tbl_version_key, &val), TxnErrorCode::TXN_KEY_NOT_FOUND); // ------------Test commit index------------ reset_meta_service(); req.Clear(); meta_service->commit_index(&ctrl, &req, &res, nullptr); ASSERT_EQ(res.status().code(), MetaServiceCode::INVALID_ARGUMENT); + req.set_db_id(1); req.set_table_id(table_id); req.add_index_ids(index_id); + req.set_is_new_table(true); // Last state UNKNOWN res.Clear(); meta_service->commit_index(&ctrl, &req, &res, nullptr); ASSERT_EQ(res.status().code(), MetaServiceCode::INVALID_ARGUMENT); ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK); ASSERT_EQ(txn->get(index_key, &val), TxnErrorCode::TXN_KEY_NOT_FOUND); + ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK); + ASSERT_EQ(txn->get(tbl_version_key, &val), TxnErrorCode::TXN_KEY_NOT_FOUND); // Last state PREPARED reset_meta_service(); index_pb.set_state(RecycleIndexPB::PREPARED); @@ -4337,6 +4348,11 @@ TEST(MetaServiceTest, IndexRequest) { ASSERT_EQ(res.status().code(), MetaServiceCode::OK); ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK); ASSERT_EQ(txn->get(index_key, &val), TxnErrorCode::TXN_KEY_NOT_FOUND); + // First commit index should init table version + ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK); + ASSERT_EQ(txn->get(tbl_version_key, &val), TxnErrorCode::TXN_OK); + val_int = *reinterpret_cast(val.data()); + ASSERT_EQ(val_int, 1); // Last state DROPPED reset_meta_service(); index_pb.set_state(RecycleIndexPB::DROPPED); @@ -4351,6 +4367,8 @@ TEST(MetaServiceTest, IndexRequest) { ASSERT_EQ(txn->get(index_key, &val), TxnErrorCode::TXN_OK); ASSERT_TRUE(index_pb.ParseFromString(val)); ASSERT_EQ(index_pb.state(), RecycleIndexPB::DROPPED); + 
ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK); + ASSERT_EQ(txn->get(tbl_version_key, &val), TxnErrorCode::TXN_KEY_NOT_FOUND); // Last state RECYCLING reset_meta_service(); index_pb.set_state(RecycleIndexPB::RECYCLING); @@ -4365,6 +4383,8 @@ TEST(MetaServiceTest, IndexRequest) { ASSERT_EQ(txn->get(index_key, &val), TxnErrorCode::TXN_OK); ASSERT_TRUE(index_pb.ParseFromString(val)); ASSERT_EQ(index_pb.state(), RecycleIndexPB::RECYCLING); + ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK); + ASSERT_EQ(txn->get(tbl_version_key, &val), TxnErrorCode::TXN_KEY_NOT_FOUND); // Last state UNKNOWN but tablet meta existed reset_meta_service(); ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK); @@ -4375,13 +4395,17 @@ TEST(MetaServiceTest, IndexRequest) { ASSERT_EQ(res.status().code(), MetaServiceCode::OK); ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK); ASSERT_EQ(txn->get(index_key, &val), TxnErrorCode::TXN_KEY_NOT_FOUND); + ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK); + ASSERT_EQ(txn->get(tbl_version_key, &val), TxnErrorCode::TXN_KEY_NOT_FOUND); // ------------Test drop index------------ reset_meta_service(); req.Clear(); meta_service->drop_index(&ctrl, &req, &res, nullptr); ASSERT_EQ(res.status().code(), MetaServiceCode::INVALID_ARGUMENT); + req.set_db_id(1); req.set_table_id(table_id); req.add_index_ids(index_id); + req.set_is_new_table(true); // Last state UNKNOWN res.Clear(); meta_service->drop_index(&ctrl, &req, &res, nullptr); @@ -4432,6 +4456,9 @@ TEST(MetaServiceTest, IndexRequest) { ASSERT_EQ(txn->get(index_key, &val), TxnErrorCode::TXN_OK); ASSERT_TRUE(index_pb.ParseFromString(val)); ASSERT_EQ(index_pb.state(), RecycleIndexPB::RECYCLING); + // Drop index should not init table version + ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK); + ASSERT_EQ(txn->get(tbl_version_key, &val), TxnErrorCode::TXN_KEY_NOT_FOUND); 
} TEST(MetaServiceTest, PartitionRequest) { @@ -4460,6 +4487,8 @@ TEST(MetaServiceTest, PartitionRequest) { auto tablet_val = tablet_pb.SerializeAsString(); RecyclePartitionPB partition_pb; auto partition_key = recycle_partition_key({instance_id, partition_id}); + int64_t val_int = 0; + auto tbl_version_key = table_version_key({instance_id, 1, table_id}); std::string val; // ------------Test prepare partition------------ brpc::Controller ctrl; @@ -4467,17 +4496,26 @@ TEST(MetaServiceTest, PartitionRequest) { PartitionResponse res; meta_service->prepare_partition(&ctrl, &req, &res, nullptr); ASSERT_EQ(res.status().code(), MetaServiceCode::INVALID_ARGUMENT); + req.set_db_id(1); req.set_table_id(table_id); req.add_index_ids(index_id); req.add_partition_ids(partition_id); // Last state UNKNOWN res.Clear(); + ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK); + txn->atomic_add(tbl_version_key, 1); + ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); meta_service->prepare_partition(&ctrl, &req, &res, nullptr); ASSERT_EQ(res.status().code(), MetaServiceCode::OK); ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK); ASSERT_EQ(txn->get(partition_key, &val), TxnErrorCode::TXN_OK); ASSERT_TRUE(partition_pb.ParseFromString(val)); ASSERT_EQ(partition_pb.state(), RecyclePartitionPB::PREPARED); + // Prepare partition should not update table version + ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK); + ASSERT_EQ(txn->get(tbl_version_key, &val), TxnErrorCode::TXN_OK); + val_int = *reinterpret_cast(val.data()); + ASSERT_EQ(val_int, 1); // Last state PREPARED res.Clear(); meta_service->prepare_partition(&ctrl, &req, &res, nullptr); @@ -4525,11 +4563,14 @@ TEST(MetaServiceTest, PartitionRequest) { ASSERT_EQ(res.status().code(), MetaServiceCode::ALREADY_EXISTED); ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK); ASSERT_EQ(txn->get(partition_key, &val), TxnErrorCode::TXN_KEY_NOT_FOUND); + 
ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK); + ASSERT_EQ(txn->get(tbl_version_key, &val), TxnErrorCode::TXN_KEY_NOT_FOUND); // ------------Test commit partition------------ reset_meta_service(); req.Clear(); meta_service->commit_partition(&ctrl, &req, &res, nullptr); ASSERT_EQ(res.status().code(), MetaServiceCode::INVALID_ARGUMENT); + req.set_db_id(1); req.set_table_id(table_id); req.add_index_ids(index_id); req.add_partition_ids(partition_id); @@ -4541,6 +4582,9 @@ TEST(MetaServiceTest, PartitionRequest) { ASSERT_EQ(txn->get(partition_key, &val), TxnErrorCode::TXN_KEY_NOT_FOUND); // Last state PREPARED reset_meta_service(); + ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK); + txn->atomic_add(tbl_version_key, 1); + ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); partition_pb.set_state(RecyclePartitionPB::PREPARED); val = partition_pb.SerializeAsString(); ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK); @@ -4551,8 +4595,16 @@ TEST(MetaServiceTest, PartitionRequest) { ASSERT_EQ(res.status().code(), MetaServiceCode::OK); ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK); ASSERT_EQ(txn->get(partition_key, &val), TxnErrorCode::TXN_KEY_NOT_FOUND); + // Commit partition should update table version + ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK); + ASSERT_EQ(txn->get(tbl_version_key, &val), TxnErrorCode::TXN_OK); + val_int = *reinterpret_cast(val.data()); + ASSERT_EQ(val_int, 2); // Last state DROPPED reset_meta_service(); + ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK); + txn->atomic_add(tbl_version_key, 1); + ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); partition_pb.set_state(RecyclePartitionPB::DROPPED); val = partition_pb.SerializeAsString(); ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK); @@ -4565,8 +4617,15 @@ TEST(MetaServiceTest, PartitionRequest) { 
ASSERT_EQ(txn->get(partition_key, &val), TxnErrorCode::TXN_OK); ASSERT_TRUE(partition_pb.ParseFromString(val)); ASSERT_EQ(partition_pb.state(), RecyclePartitionPB::DROPPED); + ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK); + ASSERT_EQ(txn->get(tbl_version_key, &val), TxnErrorCode::TXN_OK); + val_int = *reinterpret_cast(val.data()); + ASSERT_EQ(val_int, 1); // Last state RECYCLING reset_meta_service(); + ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK); + txn->atomic_add(tbl_version_key, 1); + ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); partition_pb.set_state(RecyclePartitionPB::RECYCLING); val = partition_pb.SerializeAsString(); ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK); @@ -4579,9 +4638,16 @@ TEST(MetaServiceTest, PartitionRequest) { ASSERT_EQ(txn->get(partition_key, &val), TxnErrorCode::TXN_OK); ASSERT_TRUE(partition_pb.ParseFromString(val)); ASSERT_EQ(partition_pb.state(), RecyclePartitionPB::RECYCLING); + ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK); + ASSERT_EQ(txn->get(tbl_version_key, &val), TxnErrorCode::TXN_OK); + val_int = *reinterpret_cast(val.data()); + ASSERT_EQ(val_int, 1); // Last state UNKNOWN but tablet meta existed reset_meta_service(); ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK); + txn->atomic_add(tbl_version_key, 1); + ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); + ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK); txn->put(tablet_key, tablet_val); ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); res.Clear(); @@ -4589,6 +4655,10 @@ TEST(MetaServiceTest, PartitionRequest) { ASSERT_EQ(res.status().code(), MetaServiceCode::OK); ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK); ASSERT_EQ(txn->get(partition_key, &val), TxnErrorCode::TXN_KEY_NOT_FOUND); + ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK); + 
ASSERT_EQ(txn->get(tbl_version_key, &val), TxnErrorCode::TXN_OK); + val_int = *reinterpret_cast(val.data()); + ASSERT_EQ(val_int, 1); // Last state UNKNOWN and tablet meta existed, but request has no index ids reset_meta_service(); ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK); @@ -4606,19 +4676,31 @@ TEST(MetaServiceTest, PartitionRequest) { req.Clear(); meta_service->drop_partition(&ctrl, &req, &res, nullptr); ASSERT_EQ(res.status().code(), MetaServiceCode::INVALID_ARGUMENT); + req.set_db_id(1); req.set_table_id(table_id); req.add_index_ids(index_id); req.add_partition_ids(partition_id); // Last state UNKNOWN res.Clear(); + ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK); + txn->atomic_add(tbl_version_key, 1); + ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); meta_service->drop_partition(&ctrl, &req, &res, nullptr); ASSERT_EQ(res.status().code(), MetaServiceCode::OK); ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK); ASSERT_EQ(txn->get(partition_key, &val), TxnErrorCode::TXN_OK); ASSERT_TRUE(partition_pb.ParseFromString(val)); ASSERT_EQ(partition_pb.state(), RecyclePartitionPB::DROPPED); + // Drop partition should update table version + ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK); + ASSERT_EQ(txn->get(tbl_version_key, &val), TxnErrorCode::TXN_OK); + val_int = *reinterpret_cast(val.data()); + ASSERT_EQ(val_int, 2); // Last state PREPARED reset_meta_service(); + ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK); + txn->atomic_add(tbl_version_key, 1); + ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); partition_pb.set_state(RecyclePartitionPB::PREPARED); val = partition_pb.SerializeAsString(); ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK); @@ -4631,8 +4713,15 @@ TEST(MetaServiceTest, PartitionRequest) { ASSERT_EQ(txn->get(partition_key, &val), TxnErrorCode::TXN_OK); 
ASSERT_TRUE(partition_pb.ParseFromString(val)); ASSERT_EQ(partition_pb.state(), RecyclePartitionPB::DROPPED); + ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK); + ASSERT_EQ(txn->get(tbl_version_key, &val), TxnErrorCode::TXN_OK); + val_int = *reinterpret_cast(val.data()); + ASSERT_EQ(val_int, 2); // Last state DROPPED reset_meta_service(); + ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK); + txn->atomic_add(tbl_version_key, 1); + ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); partition_pb.set_state(RecyclePartitionPB::DROPPED); val = partition_pb.SerializeAsString(); ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK); @@ -4645,8 +4734,15 @@ TEST(MetaServiceTest, PartitionRequest) { ASSERT_EQ(txn->get(partition_key, &val), TxnErrorCode::TXN_OK); ASSERT_TRUE(partition_pb.ParseFromString(val)); ASSERT_EQ(partition_pb.state(), RecyclePartitionPB::DROPPED); + ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK); + ASSERT_EQ(txn->get(tbl_version_key, &val), TxnErrorCode::TXN_OK); + val_int = *reinterpret_cast(val.data()); + ASSERT_EQ(val_int, 1); // Last state RECYCLING reset_meta_service(); + ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK); + txn->atomic_add(tbl_version_key, 1); + ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); partition_pb.set_state(RecyclePartitionPB::RECYCLING); val = partition_pb.SerializeAsString(); ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK); @@ -4659,6 +4755,10 @@ TEST(MetaServiceTest, PartitionRequest) { ASSERT_EQ(txn->get(partition_key, &val), TxnErrorCode::TXN_OK); ASSERT_TRUE(partition_pb.ParseFromString(val)); ASSERT_EQ(partition_pb.state(), RecyclePartitionPB::RECYCLING); + ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK); + ASSERT_EQ(txn->get(tbl_version_key, &val), TxnErrorCode::TXN_OK); + val_int = *reinterpret_cast(val.data()); + ASSERT_EQ(val_int, 1); } 
TEST(MetaServiceTxnStoreRetryableTest, MockGetVersion) { diff --git a/cloud/test/recycler_test.cpp b/cloud/test/recycler_test.cpp index 87c70833a30d22..de059d0df09be7 100644 --- a/cloud/test/recycler_test.cpp +++ b/cloud/test/recycler_test.cpp @@ -293,8 +293,8 @@ static int create_recycle_partiton(TxnKv* txn_kv, int64_t table_id, int64_t part return 0; } -static int create_version_kv(TxnKv* txn_kv, int64_t table_id, int64_t partition_id) { - auto key = version_key({instance_id, db_id, table_id, partition_id}); +static int create_partition_version_kv(TxnKv* txn_kv, int64_t table_id, int64_t partition_id) { + auto key = partition_version_key({instance_id, db_id, table_id, partition_id}); VersionPB version; version.set_version(1); auto val = version.SerializeAsString(); @@ -309,6 +309,21 @@ static int create_version_kv(TxnKv* txn_kv, int64_t table_id, int64_t partition_ return 0; } +static int create_table_version_kv(TxnKv* txn_kv, int64_t table_id) { + auto key = table_version_key({instance_id, db_id, table_id}); + std::string val(sizeof(int64_t), 0); + *reinterpret_cast(val.data()) = (int64_t) 1; + std::unique_ptr txn; + if (txn_kv->create_txn(&txn) != TxnErrorCode::TXN_OK) { + return -1; + } + txn->put(key, val); + if (txn->commit() != TxnErrorCode::TXN_OK) { + return -1; + } + return 0; +} + static int create_txn_label_kv(TxnKv* txn_kv, std::string label, int64_t db_id) { std::string txn_label_key_; std::string txn_label_val; @@ -1133,8 +1148,9 @@ TEST(RecyclerTest, recycle_versions) { } } for (auto partition_id : partition_ids) { - create_version_kv(txn_kv.get(), table_id, partition_id); + create_partition_version_kv(txn_kv.get(), table_id, partition_id); } + create_table_version_kv(txn_kv.get(), table_id); // Drop partitions for (int i = 0; i < 5; ++i) { create_recycle_partiton(txn_kv.get(), table_id, partition_ids[i], index_ids); @@ -1147,16 +1163,23 @@ TEST(RecyclerTest, recycle_versions) { // Recycle all partitions in table except 30006 
ASSERT_EQ(recycler.recycle_partitions(), 0); ASSERT_EQ(recycler.recycle_versions(), 0); // `recycle_versions` should do nothing - // All version kvs except version of partition 30006 must have been deleted + // All partition version kvs except version of partition 30006 must have been deleted std::unique_ptr txn; ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); - auto key_begin = version_key({instance_id, db_id, table_id, 0}); - auto key_end = version_key({instance_id, db_id, table_id, INT64_MAX}); + auto partition_key_begin = partition_version_key({instance_id, db_id, table_id, 0}); + auto partition_key_end = partition_version_key({instance_id, db_id, table_id, INT64_MAX}); std::unique_ptr iter; - ASSERT_EQ(txn->get(key_begin, key_end, &iter), TxnErrorCode::TXN_OK); + ASSERT_EQ(txn->get(partition_key_begin, partition_key_end, &iter), TxnErrorCode::TXN_OK); + ASSERT_EQ(iter->size(), 1); + auto [pk, pv] = iter->next(); + EXPECT_EQ(pk, partition_version_key({instance_id, db_id, table_id, 30006})); + // Table 10000's table version must not be deleted + auto table_key_begin = table_version_key({instance_id, db_id, 0}); + auto table_key_end = table_version_key({instance_id, db_id, INT64_MAX}); + ASSERT_EQ(txn->get(table_key_begin, table_key_end, &iter), TxnErrorCode::TXN_OK); ASSERT_EQ(iter->size(), 1); - auto [k, v] = iter->next(); - EXPECT_EQ(k, version_key({instance_id, db_id, table_id, 30006})); + auto [tk, tv] = iter->next(); + EXPECT_EQ(tk, table_version_key({instance_id, db_id, 10000})); // Drop indexes for (auto index_id : index_ids) { @@ -1167,7 +1190,9 @@ TEST(RecyclerTest, recycle_versions) { // `recycle_versions` should delete all version kvs of the dropped table ASSERT_EQ(recycler.recycle_versions(), 0); ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); - ASSERT_EQ(txn->get(key_begin, key_end, &iter), TxnErrorCode::TXN_OK); + ASSERT_EQ(txn->get(partition_key_begin, partition_key_end, &iter), TxnErrorCode::TXN_OK); + 
ASSERT_EQ(iter->size(), 0); + ASSERT_EQ(txn->get(table_key_begin, table_key_end, &iter), TxnErrorCode::TXN_OK); ASSERT_EQ(iter->size(), 0); } @@ -1790,9 +1815,13 @@ TEST(RecyclerTest, recycle_deleted_instance) { for (size_t i = 0; i < 100; i++) { ASSERT_EQ(0, create_txn_label_kv(txn_kv.get(), fmt::format("fake_label{}", i), i)); } - // create version key + // create partition version key + for (size_t i = 101; i < 200; i += 2) { + ASSERT_EQ(0, create_partition_version_kv(txn_kv.get(), i, i + 1)); + } + // create table version key for (size_t i = 101; i < 200; i += 2) { - ASSERT_EQ(0, create_version_kv(txn_kv.get(), i, i + 1)); + ASSERT_EQ(0, create_table_version_kv(txn_kv.get(), i)); } // create meta key std::vector schemas; @@ -1851,8 +1880,18 @@ TEST(RecyclerTest, recycle_deleted_instance) { ASSERT_EQ(txn->get(start_txn_key, end_txn_key, &it), TxnErrorCode::TXN_OK); ASSERT_EQ(it->size(), 0); - std::string start_version_key = version_key({instance_id, 0, 0, 0}); - std::string end_version_key = version_key({instance_id, INT64_MAX, 0, 0}); + std::string start_partition_version_key = partition_version_key({instance_id, 0, 0, 0}); + std::string end_partition_version_key = partition_version_key({instance_id, INT64_MAX, 0, 0}); + ASSERT_EQ(txn->get(start_partition_version_key, end_partition_version_key, &it), TxnErrorCode::TXN_OK); + ASSERT_EQ(it->size(), 0); + + std::string start_table_version_key = table_version_key({instance_id, 0, 0}); + std::string end_table_version_key = table_version_key({instance_id, INT64_MAX, 0}); + ASSERT_EQ(txn->get(start_table_version_key, end_table_version_key, &it), TxnErrorCode::TXN_OK); + ASSERT_EQ(it->size(), 0); + + std::string start_version_key = version_key_prefix(instance_id); + std::string end_version_key = version_key_prefix(instance_id + '\x00'); ASSERT_EQ(txn->get(start_version_key, end_version_key, &it), TxnErrorCode::TXN_OK); ASSERT_EQ(it->size(), 0); diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/alter/CloudRollupJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/CloudRollupJobV2.java index 60980d4699f6bb..71ba5ef35cf169 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/CloudRollupJobV2.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/CloudRollupJobV2.java @@ -92,7 +92,7 @@ protected void onCreateRollupReplicaDone() throws AlterCancelException { rollupIndexList.add(rollupIndexId); try { ((CloudInternalCatalog) Env.getCurrentInternalCatalog()) - .commitMaterializedIndex(tableId, rollupIndexList); + .commitMaterializedIndex(dbId, tableId, rollupIndexList, false); } catch (Exception e) { LOG.warn("commitMaterializedIndex Exception:{}", e); throw new AlterCancelException(e.getMessage()); @@ -110,7 +110,7 @@ protected void onCancel() { while (true) { try { ((CloudInternalCatalog) Env.getCurrentInternalCatalog()) - .dropMaterializedIndex(tableId, rollupIndexList); + .dropMaterializedIndex(tableId, rollupIndexList, false); break; } catch (Exception e) { LOG.warn("tryTimes:{}, onCancel exception:", tryTimes, e); diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/CloudSchemaChangeJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/CloudSchemaChangeJobV2.java index 0739da03642471..8e3a198c2a4407 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/CloudSchemaChangeJobV2.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/CloudSchemaChangeJobV2.java @@ -83,8 +83,7 @@ protected void commitShadowIndex() throws AlterCancelException { indexIdMap.keySet().stream().collect(Collectors.toList()); try { ((CloudInternalCatalog) Env.getCurrentInternalCatalog()) - .commitMaterializedIndex(tableId, - shadowIdxList); + .commitMaterializedIndex(dbId, tableId, shadowIdxList, false); } catch (Exception e) { LOG.warn("commitMaterializedIndex exception:", e); throw new AlterCancelException(e.getMessage()); @@ -110,7 +109,7 @@ private void dropIndex(List idxList) { while (true) { 
try { ((CloudInternalCatalog) Env.getCurrentInternalCatalog()) - .dropMaterializedIndex(tableId, idxList); + .dropMaterializedIndex(tableId, idxList, false); break; } catch (Exception e) { LOG.warn("tryTimes:{}, dropIndex exception:", tryTimes, e); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java index 1b479f364639b2..0c286cf6dd1ae6 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @@ -5518,9 +5518,14 @@ public void replaceTempPartition(Database db, OlapTable olapTable, ReplacePartit } } olapTable.replaceTempPartitions(partitionNames, tempPartitionNames, isStrictRange, useTempPartitionName); - long version = olapTable.getNextVersion(); + long version; long versionTime = System.currentTimeMillis(); - olapTable.updateVisibleVersionAndTime(version, versionTime); + if (Config.isNotCloudMode()) { + version = olapTable.getNextVersion(); + olapTable.updateVisibleVersionAndTime(version, versionTime); + } else { + version = olapTable.getVisibleVersion(); + } // write log ReplacePartitionOperationLog info = new ReplacePartitionOperationLog(db.getId(), db.getFullName(), olapTable.getId(), olapTable.getName(), diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java index b6239f486cdeec..83c017e1539c28 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java @@ -37,6 +37,8 @@ import org.apache.doris.clone.TabletSchedCtx; import org.apache.doris.clone.TabletScheduler; import org.apache.doris.cloud.catalog.CloudPartition; +import org.apache.doris.cloud.proto.Cloud; +import org.apache.doris.cloud.qe.SnapshotProxy; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Config; import 
org.apache.doris.common.DdlException; @@ -46,6 +48,7 @@ import org.apache.doris.common.UserException; import org.apache.doris.common.io.DeepCopy; import org.apache.doris.common.io.Text; +import org.apache.doris.common.profile.SummaryProfile; import org.apache.doris.common.util.PropertyAnalyzer; import org.apache.doris.common.util.Util; import org.apache.doris.mtmv.MTMVRelatedTableIf; @@ -53,7 +56,9 @@ import org.apache.doris.mtmv.MTMVVersionSnapshot; import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.OriginStatement; +import org.apache.doris.qe.StmtExecutor; import org.apache.doris.resource.Tag; +import org.apache.doris.rpc.RpcException; import org.apache.doris.statistics.AnalysisInfo; import org.apache.doris.statistics.AnalysisInfo.AnalysisType; import org.apache.doris.statistics.BaseAnalysisTask; @@ -2666,11 +2671,48 @@ public void updateVisibleVersionAndTime(long visibleVersion, long visibleVersion // During `getNextVersion` and `updateVisibleVersionAndTime` period, // the write lock on the table should be held continuously public long getNextVersion() { - return tableAttributes.getNextVersion(); + if (!Config.isCloudMode()) { + return tableAttributes.getNextVersion(); + } else { + // cloud mode should not reach here + if (LOG.isDebugEnabled()) { + LOG.debug("getNextVersion in Cloud mode in OlapTable {} ", getName()); + } + return getVisibleVersion() + 1; + } } public long getVisibleVersion() { - return tableAttributes.getVisibleVersion(); + if (Config.isNotCloudMode()) { + return tableAttributes.getVisibleVersion(); + } + // get version rpc + Cloud.GetVersionRequest request = Cloud.GetVersionRequest.newBuilder() + .setDbId(this.getDatabase().getId()) + .setTableId(this.id) + .setBatchMode(false) + .setIsTableVersion(true) + .build(); + + try { + Cloud.GetVersionResponse resp = getVersionFromMeta(request); + long version = -1; + if (resp.getStatus().getCode() == Cloud.MetaServiceCode.OK) { + version = resp.getVersion(); + } else { + assert 
resp.getStatus().getCode() == Cloud.MetaServiceCode.VERSION_NOT_FOUND; + version = 0; + } + if (LOG.isDebugEnabled()) { + LOG.debug("get version from meta service, version: {}, table: {}", version, getId()); + } + if (version == 0) { + version = 1; + } + return version; + } catch (RpcException e) { + throw new RuntimeException("get version from meta service failed"); + } } public long getVisibleVersionTime() { @@ -2719,6 +2761,19 @@ public String getPartitionName(long partitionId) throws AnalysisException { } } + private static Cloud.GetVersionResponse getVersionFromMeta(Cloud.GetVersionRequest req) + throws RpcException { + long startAt = System.nanoTime(); + try { + return SnapshotProxy.getVisibleVersion(req); + } finally { + SummaryProfile profile = getSummaryProfile(); + if (profile != null) { + profile.addGetTableVersionTime(System.nanoTime() - startAt); + } + } + } + @Override public boolean needAutoRefresh() { return true; @@ -2728,4 +2783,15 @@ public boolean needAutoRefresh() { public boolean isPartitionColumnAllowNull() { return false; } + + private static SummaryProfile getSummaryProfile() { + ConnectContext ctx = ConnectContext.get(); + if (ctx != null) { + StmtExecutor executor = ctx.getExecutor(); + if (executor != null) { + return executor.getSummaryProfile(); + } + } + return null; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudPartition.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudPartition.java index 01603823752762..60e567ce9fe001 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudPartition.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudPartition.java @@ -22,8 +22,7 @@ import org.apache.doris.catalog.Partition; import org.apache.doris.cloud.proto.Cloud; import org.apache.doris.cloud.proto.Cloud.MetaServiceCode; -import org.apache.doris.cloud.rpc.MetaServiceProxy; -import org.apache.doris.common.Config; +import 
org.apache.doris.cloud.qe.SnapshotProxy; import org.apache.doris.common.profile.SummaryProfile; import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.StmtExecutor; @@ -38,10 +37,6 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import java.util.concurrent.locks.ReentrantLock; import java.util.stream.Collectors; @@ -328,7 +323,7 @@ private static Cloud.GetVersionResponse getVersionFromMeta(Cloud.GetVersionReque throws RpcException { long startAt = System.nanoTime(); try { - return getVersionFromMetaInner(req); + return SnapshotProxy.getVisibleVersion(req); } finally { SummaryProfile profile = getSummaryProfile(); if (profile != null) { @@ -337,55 +332,6 @@ private static Cloud.GetVersionResponse getVersionFromMeta(Cloud.GetVersionReque } } - private static Cloud.GetVersionResponse getVersionFromMetaInner(Cloud.GetVersionRequest req) - throws RpcException { - for (int retryTime = 0; retryTime < Config.cloud_meta_service_rpc_failed_retry_times; retryTime++) { - try { - long deadline = System.currentTimeMillis() + Config.default_get_version_from_ms_timeout_second * 1000L; - Future future = - MetaServiceProxy.getInstance().getVisibleVersionAsync(req); - - Cloud.GetVersionResponse resp = null; - while (resp == null) { - try { - resp = future.get(Math.max(0, deadline - System.currentTimeMillis()), TimeUnit.MILLISECONDS); - } catch (InterruptedException e) { - LOG.warn("get version from meta service: future get interrupted exception"); - } - } - - if (resp.hasStatus() && (resp.getStatus().getCode() == MetaServiceCode.OK - || resp.getStatus().getCode() == MetaServiceCode.VERSION_NOT_FOUND)) { - if (LOG.isDebugEnabled()) { - LOG.debug("get version from meta service, code: {}", resp.getStatus().getCode()); - } - return resp; - } - - LOG.warn("get version 
from meta service failed, status: {}, retry time: {}", - resp.getStatus(), retryTime); - } catch (RpcException | ExecutionException | TimeoutException | RuntimeException e) { - LOG.warn("get version from meta service failed, retry times: {} exception: ", retryTime, e); - } - - // sleep random millis [20, 200] ms, retry rpc failed - int randomMillis = 20 + (int) (Math.random() * (200 - 20)); - if (retryTime > Config.cloud_meta_service_rpc_failed_retry_times / 2) { - // sleep random millis [500, 1000] ms, retry rpc failed - randomMillis = 500 + (int) (Math.random() * (1000 - 500)); - } - try { - Thread.sleep(randomMillis); - } catch (InterruptedException e) { - LOG.warn("get version from meta service: sleep get interrupted exception"); - } - } - - LOG.warn("get version from meta service failed after retry {} times", - Config.cloud_meta_service_rpc_failed_retry_times); - throw new RpcException("get version from meta service", "failed after retry n times"); - } - private static boolean isEmptyPartitionPruneDisabled() { ConnectContext ctx = ConnectContext.get(); if (ctx != null && ctx.getSessionVariable().getDisableEmptyPartitionPrune()) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java index f48c38de698b29..b473f80e126959 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java @@ -361,12 +361,13 @@ protected void beforeCreatePartitions(long dbId, long tableId, List partit } @Override - protected void afterCreatePartitions(long tableId, List partitionIds, List indexIds) + protected void afterCreatePartitions(long dbId, long tableId, List partitionIds, List indexIds, + boolean isCreateTable) throws DdlException { if (partitionIds == null) { - commitMaterializedIndex(tableId, indexIds); + 
commitMaterializedIndex(dbId, tableId, indexIds, isCreateTable); } else { - commitPartition(tableId, partitionIds, indexIds); + commitPartition(dbId, tableId, partitionIds, indexIds); } } @@ -406,11 +407,13 @@ private void preparePartition(long dbId, long tableId, List partitionIds, } } - private void commitPartition(long tableId, List partitionIds, List indexIds) throws DdlException { + private void commitPartition(long dbId, long tableId, List partitionIds, List indexIds) + throws DdlException { Cloud.PartitionRequest.Builder partitionRequestBuilder = Cloud.PartitionRequest.newBuilder(); partitionRequestBuilder.setCloudUniqueId(Config.cloud_unique_id); partitionRequestBuilder.addAllPartitionIds(partitionIds); partitionRequestBuilder.addAllIndexIds(indexIds); + partitionRequestBuilder.setDbId(dbId); partitionRequestBuilder.setTableId(tableId); final Cloud.PartitionRequest partitionRequest = partitionRequestBuilder.build(); @@ -469,11 +472,14 @@ public void prepareMaterializedIndex(Long tableId, List indexIds, long exp } } - public void commitMaterializedIndex(Long tableId, List indexIds) throws DdlException { + public void commitMaterializedIndex(long dbId, long tableId, List indexIds, boolean isCreateTable) + throws DdlException { Cloud.IndexRequest.Builder indexRequestBuilder = Cloud.IndexRequest.newBuilder(); indexRequestBuilder.setCloudUniqueId(Config.cloud_unique_id); indexRequestBuilder.addAllIndexIds(indexIds); + indexRequestBuilder.setDbId(dbId); indexRequestBuilder.setTableId(tableId); + indexRequestBuilder.setIsNewTable(isCreateTable); final Cloud.IndexRequest indexRequest = indexRequestBuilder.build(); Cloud.IndexResponse response = null; @@ -562,7 +568,7 @@ public void eraseTableDropBackendReplicas(OlapTable olapTable, boolean isReplay) if (indexs.isEmpty()) { break; } - dropMaterializedIndex(olapTable.getId(), indexs); + dropMaterializedIndex(olapTable.getId(), indexs, true); } catch (Exception e) { LOG.warn("failed to drop index {} of table {}, try 
cnt {}, execption {}", indexs, olapTable.getId(), tryCnt, e); @@ -657,7 +663,7 @@ private void dropCloudPartition(long dbId, long tableId, List partitionIds } } - public void dropMaterializedIndex(Long tableId, List indexIds) throws DdlException { + public void dropMaterializedIndex(long tableId, List indexIds, boolean dropTable) throws DdlException { Cloud.IndexRequest.Builder indexRequestBuilder = Cloud.IndexRequest.newBuilder(); indexRequestBuilder.setCloudUniqueId(Config.cloud_unique_id); indexRequestBuilder.addAllIndexIds(indexIds);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/qe/SnapshotProxy.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/qe/SnapshotProxy.java
new file mode 100644
index 00000000000000..1f4b20ab1dd457
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/qe/SnapshotProxy.java
@@ -0,0 +1,113 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cloud.qe;
+
+import org.apache.doris.cloud.proto.Cloud;
+import org.apache.doris.cloud.rpc.MetaServiceProxy;
+import org.apache.doris.common.Config;
+import org.apache.doris.rpc.RpcException;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Static helpers that fetch visible versions from the meta service, retrying failed attempts.
+ */
+public class SnapshotProxy {
+    private static final Logger LOG = LogManager.getLogger(SnapshotProxy.class);
+
+    /**
+     * Asks the meta service for a visible version, making up to
+     * {@code Config.meta_service_rpc_retry_times} attempts.
+     *
+     * @param request the get-version request to send
+     * @return the first response whose status code is OK or VERSION_NOT_FOUND
+     * @throws RpcException if no attempt produced such a response
+     */
+    public static Cloud.GetVersionResponse getVisibleVersion(Cloud.GetVersionRequest request) throws RpcException {
+        int tryTimes = 0;
+        while (tryTimes++ < Config.meta_service_rpc_retry_times) {
+            Cloud.GetVersionResponse resp = getVisibleVersionInternal(request,
+                    Config.default_get_version_from_ms_timeout_second * 1000);
+            // resp is null when the attempt timed out or threw, so guard before dereferencing it
+            if (resp != null && resp.hasStatus() && (resp.getStatus().getCode() == Cloud.MetaServiceCode.OK
+                    || resp.getStatus().getCode() == Cloud.MetaServiceCode.VERSION_NOT_FOUND)) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("get version from meta service, code: {}", resp.getStatus().getCode());
+                }
+                return resp;
+            }
+
+            LOG.warn("get version from meta service failed, status: {}, retry time: {}",
+                    resp == null ? "no response" : resp.getStatus(), tryTimes);
+
+            // back off for a random interval; the later half of the attempts waits longer
+            if (tryTimes > Config.meta_service_rpc_retry_times / 2) {
+                sleepSeveralMs(500, 1000);
+            } else {
+                sleepSeveralMs(20, 200);
+            }
+        }
+
+        // tryTimes was post-incremented once more by the failing loop test, so report the configured limit
+        LOG.warn("get version from meta service failed after retry {} times", Config.meta_service_rpc_retry_times);
+        throw new RpcException("get version from meta service", "failed after retry n times");
+    }
+
+    /**
+     * Performs a single get-version attempt bounded by {@code timeoutMs}.
+     *
+     * @param request the get-version request to send
+     * @param timeoutMs how long to wait for the response, in milliseconds
+     * @return the response, or {@code null} if the attempt timed out or failed with an exception
+     */
+    public static Cloud.GetVersionResponse getVisibleVersionInternal(Cloud.GetVersionRequest request, int timeoutMs) {
+        long deadline = System.currentTimeMillis() + timeoutMs;
+        Cloud.GetVersionResponse resp = null;
+        try {
+            Future<Cloud.GetVersionResponse> future =
+                    MetaServiceProxy.getInstance().getVisibleVersionAsync(request);
+
+            // an interrupt does not abort the wait: keep polling until a response arrives or the deadline passes
+            while (resp == null) {
+                try {
+                    resp = future.get(Math.max(0, deadline - System.currentTimeMillis()), TimeUnit.MILLISECONDS);
+                } catch (InterruptedException e) {
+                    LOG.warn("get version from meta service: future get interrupted exception");
+                }
+            }
+        } catch (RpcException | ExecutionException | TimeoutException | RuntimeException e) {
+            LOG.warn("get version from meta service failed, exception: ", e);
+        }
+        return resp;
+    }
+
+    /** Sleeps a random duration in {@code [lowerMs, upperMs)} ms; an interrupt ends the sleep early. */
+    private static void sleepSeveralMs(int lowerMs, int upperMs) {
+        try {
+            Thread.sleep(lowerMs + (int) (Math.random() * (upperMs - lowerMs)));
+        } catch (InterruptedException e) {
+            LOG.warn("get snapshot from meta service: sleep get interrupted exception");
+        }
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java b/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java index d4615dab820afc..7e9367d0abd07c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java @@ -74,6 +74,8 @@ public class SummaryProfile { public static final String GET_PARTITION_VERSION_TIME = "Get Partition Version Time"; public static final String GET_PARTITION_VERSION_COUNT = "Get Partition Version Count"; public static final String GET_PARTITION_VERSION_BY_HAS_DATA_COUNT = "Get Partition Version Count (hasData)"; + public static final String GET_TABLE_VERSION_TIME = "Get Table Version Time"; + public static final String GET_TABLE_VERSION_COUNT = "Get Table Version Count"; public static final String PARSE_SQL_TIME = "Parse SQL Time"; public static final String NEREIDS_ANALYSIS_TIME = "Nereids Analysis Time"; @@ -93,7 +95,8 @@ public class SummaryProfile { PLAN_TIME, JOIN_REORDER_TIME, CREATE_SINGLE_NODE_TIME, QUERY_DISTRIBUTED_TIME, INIT_SCAN_NODE_TIME, FINALIZE_SCAN_NODE_TIME, GET_SPLITS_TIME, GET_PARTITIONS_TIME, GET_PARTITION_FILES_TIME, 
CREATE_SCAN_RANGE_TIME, GET_PARTITION_VERSION_TIME, - GET_PARTITION_VERSION_BY_HAS_DATA_COUNT, GET_PARTITION_VERSION_COUNT, SCHEDULE_TIME, FETCH_RESULT_TIME, + GET_PARTITION_VERSION_BY_HAS_DATA_COUNT, GET_PARTITION_VERSION_COUNT, GET_TABLE_VERSION_TIME, + GET_TABLE_VERSION_COUNT, SCHEDULE_TIME, FETCH_RESULT_TIME, WRITE_RESULT_TIME, WAIT_FETCH_RESULT_TIME, DORIS_VERSION, IS_NEREIDS, IS_PIPELINE, IS_CACHED, TOTAL_INSTANCES_NUM, INSTANCES_NUM_PER_BE, PARALLEL_FRAGMENT_EXEC_INSTANCE, TRACE_ID); @@ -115,6 +118,8 @@ public class SummaryProfile { builder.put(GET_PARTITION_VERSION_TIME, 1); builder.put(GET_PARTITION_VERSION_COUNT, 1); builder.put(GET_PARTITION_VERSION_BY_HAS_DATA_COUNT, 1); + builder.put(GET_TABLE_VERSION_TIME, 1); + builder.put(GET_TABLE_VERSION_COUNT, 1); EXECUTION_SUMMARY_KEYS_IDENTATION = builder.build(); } @@ -158,6 +163,8 @@ public class SummaryProfile { private long getPartitionVersionTime = 0; private long getPartitionVersionCount = 0; private long getPartitionVersionByHasDataCount = 0; + private long getTableVersionTime = 0; + private long getTableVersionCount = 0; public SummaryProfile(RuntimeProfile rootProfile) { summaryProfile = new RuntimeProfile(SUMMARY_PROFILE_NAME); @@ -235,6 +242,8 @@ private void updateExecutionSummaryProfile() { executionSummaryProfile.addInfoString(GET_PARTITION_VERSION_COUNT, getPrettyGetPartitionVersionCount()); executionSummaryProfile.addInfoString(GET_PARTITION_VERSION_BY_HAS_DATA_COUNT, getPrettyGetPartitionVersionByHasDataCount()); + executionSummaryProfile.addInfoString(GET_TABLE_VERSION_TIME, getPrettyGetTableVersionTime()); + executionSummaryProfile.addInfoString(GET_TABLE_VERSION_COUNT, getPrettyGetTableVersionCount()); } } @@ -347,6 +356,11 @@ public void addGetPartitionVersionTime(long ns) { this.getPartitionVersionCount += 1; } + public void addGetTableVersionTime(long ns) { + this.getTableVersionTime += ns; + this.getTableVersionCount += 1; + } + public void incGetPartitionVersionByHasDataCount() { 
this.getPartitionVersionByHasDataCount += 1; } @@ -593,4 +607,15 @@ private String getPrettyGetPartitionVersionByHasDataCount() { private String getPrettyGetPartitionVersionCount() { return RuntimeProfile.printCounter(getPartitionVersionCount, TUnit.UNIT); } + + private String getPrettyGetTableVersionTime() { + if (getTableVersionTime == 0) { + return "N/A"; + } + return RuntimeProfile.printCounter(getTableVersionTime, TUnit.TIME_NS); + } + + private String getPrettyGetTableVersionCount() { + return RuntimeProfile.printCounter(getTableVersionCount, TUnit.UNIT); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java index 91c919af3f88a3..31c882a05f0e91 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java @@ -1612,7 +1612,7 @@ public void addPartition(Database db, String tableName, AddPartitionClause addPa singlePartitionDesc.getTabletType(), storagePolicy, idGeneratorBuffer, binlogConfig, dataProperty.isStorageMediumSpecified(), null); - afterCreatePartitions(olapTable.getId(), partitionIds, indexIds); + afterCreatePartitions(db.getId(), olapTable.getId(), partitionIds, indexIds, false); // TODO cluster key ids // check again @@ -2033,7 +2033,8 @@ protected void beforeCreatePartitions(long dbId, long tableId, List partit throws DdlException { } - protected void afterCreatePartitions(long tableId, List partitionIds, List indexIds) + protected void afterCreatePartitions(long dbId, long tableId, List partitionIds, List indexIds, + boolean isCreateTable) throws DdlException { } @@ -2669,7 +2670,8 @@ private void createOlapTable(Database db, CreateTableStmt stmt) throws UserExcep binlogConfigForTask, partitionInfo.getDataProperty(partitionId).isStorageMediumSpecified(), keysDesc.getClusterKeysColumnIds()); - 
afterCreatePartitions(olapTable.getId(), null, olapTable.getIndexIdList()); + afterCreatePartitions(db.getId(), olapTable.getId(), null, + olapTable.getIndexIdList(), true); olapTable.addPartition(partition); } else if (partitionInfo.getType() == PartitionType.RANGE || partitionInfo.getType() == PartitionType.LIST) { @@ -2767,7 +2769,8 @@ private void createOlapTable(Database db, CreateTableStmt stmt) throws UserExcep olapTable.getPartitionInfo().getDataProperty(partition.getId()) .setStoragePolicy(partionStoragePolicy); } - afterCreatePartitions(olapTable.getId(), null, olapTable.getIndexIdList()); + afterCreatePartitions(db.getId(), olapTable.getId(), null, + olapTable.getIndexIdList(), true); } else { throw new DdlException("Unsupported partition method: " + partitionInfo.getType().name()); } @@ -3227,7 +3230,7 @@ public void truncateTable(TruncateTableStmt truncateTableStmt) throws DdlExcepti newPartitions.add(newPartition); } - afterCreatePartitions(copiedTbl.getId(), newPartitionIds, indexIds); + afterCreatePartitions(db.getId(), copiedTbl.getId(), newPartitionIds, indexIds, true); } catch (DdlException e) { // create partition failed, remove all newly created tablets diff --git a/gensrc/proto/cloud.proto b/gensrc/proto/cloud.proto index 2db7dd1a5ef557..8e674f64b1f5af 100644 --- a/gensrc/proto/cloud.proto +++ b/gensrc/proto/cloud.proto @@ -718,6 +718,9 @@ message GetVersionRequest { repeated int64 db_ids = 6; repeated int64 table_ids = 7; repeated int64 partition_ids = 8; + + // True if get table version + optional bool is_table_version = 9; }; message GetVersionResponse { @@ -853,6 +856,8 @@ message IndexRequest { repeated int64 index_ids = 2; optional int64 table_id = 3; optional int64 expiration = 4; + optional int64 db_id = 5; + optional bool is_new_table = 6; } message IndexResponse { diff --git a/regression-test/pipeline/cloud_p0/conf/regression-conf-custom.groovy b/regression-test/pipeline/cloud_p0/conf/regression-conf-custom.groovy index 
5c7162e1eef197..cdd49d76183137 100644 --- a/regression-test/pipeline/cloud_p0/conf/regression-conf-custom.groovy +++ b/regression-test/pipeline/cloud_p0/conf/regression-conf-custom.groovy @@ -54,7 +54,6 @@ excludeDirectories = "000_the_start_sentinel_do_not_touch," + // keep this line "unique_with_mow_p0/cluster_key," + "unique_with_mow_p0/ssb_unique_sql_zstd_cluster," + "unique_with_mow_p0/ssb_unique_load_zstd_c," + - "nereids_rules_p0/mv," + "backup_restore," + // not a case for cloud mode, no need to run "cold_heat_separation," + "storage_medium_p0," +