diff --git a/cloud/src/common/config.h b/cloud/src/common/config.h index b1ed35bc0ba002..9c3e88271f7eb2 100644 --- a/cloud/src/common/config.h +++ b/cloud/src/common/config.h @@ -120,6 +120,8 @@ CONF_Bool(force_immediate_recycle, "false"); CONF_mBool(enable_mow_job_key_check, "false"); CONF_mBool(enable_restore_job_check, "false"); +CONF_mBool(enable_tablet_stats_key_check, "false"); + CONF_mBool(enable_checker_for_meta_key_check, "false"); CONF_mInt64(mow_job_key_check_expiration_diff_seconds, "600"); // 10min diff --git a/cloud/src/recycler/checker.cpp b/cloud/src/recycler/checker.cpp index 544d6dfb52f8d7..b3a88d2005a7bc 100644 --- a/cloud/src/recycler/checker.cpp +++ b/cloud/src/recycler/checker.cpp @@ -44,7 +44,9 @@ #include "common/logging.h" #include "common/util.h" #include "cpp/sync_point.h" +#include "meta-service/meta_service.h" #include "meta-service/meta_service_schema.h" +#include "meta-service/meta_service_tablet_stats.h" #include "meta-store/blob_message.h" #include "meta-store/keys.h" #include "meta-store/txn_kv.h" @@ -198,6 +200,12 @@ int Checker::start() { } } + if (config::enable_tablet_stats_key_check) { + if (int ret = checker->do_tablet_stats_key_check(); ret != 0) { + success = false; + } + } + if (config::enable_restore_job_check) { if (int ret = checker->do_restore_job_check(); ret != 0) { success = false; @@ -1764,6 +1772,281 @@ int InstanceChecker::do_mow_job_key_check() { return 0; } +int InstanceChecker::do_tablet_stats_key_check() { + int ret = 0; + + int64_t nums_leak = 0; + int64_t nums_loss = 0; + int64_t nums_scanned = 0; + int64_t nums_abnormal = 0; + + std::string begin = meta_tablet_key({instance_id_, 0, 0, 0, 0}); + std::string end = meta_tablet_key({instance_id_, INT64_MAX, 0, 0, 0}); + // inverted check tablet exists + LOG(INFO) << "begin inverted check stats_tablet_key"; + ret = scan_and_handle_kv(begin, end, [&](std::string_view key, std::string_view value) { + int ret = check_stats_tablet_key_exists(key, value); + 
nums_scanned++; + if (ret == 1) { + nums_loss++; + } + return ret; + }); + if (ret == -1) { + LOG(WARNING) << "failed to inverted check if stats tablet key exists"; + return -1; + } else if (ret == 1) { + LOG(WARNING) << "stats_tablet_key loss, nums_scanned=" << nums_scanned + << ", nums_loss=" << nums_loss; + return 1; + } + LOG(INFO) << "finish inverted check stats_tablet_key, nums_scanned=" << nums_scanned + << ", nums_loss=" << nums_loss; + + begin = stats_tablet_key({instance_id_, 0, 0, 0, 0}); + end = stats_tablet_key({instance_id_, INT64_MAX, 0, 0, 0}); + nums_scanned = 0; + // check tablet exists + LOG(INFO) << "begin check stats_tablet_key leaked"; + ret = scan_and_handle_kv(begin, end, [&](std::string_view key, std::string_view value) { + int ret = check_stats_tablet_key_leaked(key, value); + nums_scanned++; + if (ret == 1) { + nums_leak++; + } + return ret; + }); + if (ret == -1) { + LOG(WARNING) << "failed to check if stats tablet key exists"; + return -1; + } else if (ret == 1) { + LOG(WARNING) << "stats_tablet_key leaked, nums_scanned=" << nums_scanned + << ", nums_leak=" << nums_leak; + return 1; + } + LOG(INFO) << "finish check stats_tablet_key leaked, nums_scanned=" << nums_scanned + << ", nums_leak=" << nums_leak; + + begin = stats_tablet_key({instance_id_, 0, 0, 0, 0}); + end = stats_tablet_key({instance_id_, INT64_MAX, 0, 0, 0}); + nums_scanned = 0; + // check if key is normal + LOG(INFO) << "begin check stats_tablet_key abnormal"; + ret = scan_and_handle_kv(begin, end, [&](std::string_view key, std::string_view value) { + int ret = check_stats_tablet_key(key, value); + nums_scanned++; + if (ret == 1) { + nums_abnormal++; + } + return ret; + }); + if (ret == -1) { + LOG(WARNING) << "failed to check if stats tablet key exists"; + return -1; + } else if (ret == 1) { + LOG(WARNING) << "stats_tablet_key abnormal, nums_scanned=" << nums_scanned + << ", nums_abnormal=" << nums_abnormal; + return 1; + } + LOG(INFO) << "finish check stats_tablet_key, 
nums_scanned=" << nums_scanned + << ", nums_abnormal=" << nums_abnormal; + return 0; +} + +int InstanceChecker::check_stats_tablet_key_exists(std::string_view key, std::string_view value) { + std::string_view k1 = key; + k1.remove_prefix(1); + std::vector, int, int>> out; + decode_key(&k1, &out); + // 0x01 "meta" ${instance_id} "tablet" ${table_id} ${index_id} ${partition_id} ${tablet_id} + auto table_id = std::get(std::get<0>(out[3])); + auto index_id = std::get(std::get<0>(out[4])); + auto partition_id = std::get(std::get<0>(out[5])); + auto tablet_id = std::get(std::get<0>(out[6])); + std::string tablet_stats_key = + stats_tablet_key({instance_id_, table_id, index_id, partition_id, tablet_id}); + int ret = key_exist(txn_kv_.get(), tablet_stats_key); + if (ret == 1) { + // clang-format off + LOG(WARNING) << "stats tablet key's tablet key loss," + << " stats tablet key=" << hex(tablet_stats_key) + << " meta tablet key=" << hex(key); + // clang-format on + return 1; + } else if (ret == -1) { + LOG(WARNING) << "failed to check key exists, key=" << hex(tablet_stats_key); + return -1; + } + LOG(INFO) << "check stats_tablet_key_exists ok, key=" << hex(key); + return 0; +} + +int InstanceChecker::check_stats_tablet_key_leaked(std::string_view key, std::string_view value) { + std::string_view k1 = key; + k1.remove_prefix(1); + std::vector, int, int>> out; + decode_key(&k1, &out); + // 0x01 "stats" ${instance_id} "tablet" ${table_id} ${index_id} ${partition_id} ${tablet_id} + auto table_id = std::get(std::get<0>(out[3])); + auto index_id = std::get(std::get<0>(out[4])); + auto partition_id = std::get(std::get<0>(out[5])); + auto tablet_id = std::get(std::get<0>(out[6])); + std::string tablet_key = + meta_tablet_key({instance_id_, table_id, index_id, partition_id, tablet_id}); + int ret = key_exist(txn_kv_.get(), tablet_key); + if (ret == 1) { + // clang-format off + LOG(WARNING) << "stats tablet key's tablet key leak," + << " stats tablet key=" << hex(key) + << " meta 
tablet key=" << hex(tablet_key); + // clang-format on + return 1; + } else if (ret == -1) { + LOG(WARNING) << "failed to check key exists, key=" << hex(tablet_key); + return -1; + } + LOG(INFO) << "check stats_tablet_key_leaked ok, key=" << hex(key); + return 0; +} + +int InstanceChecker::check_stats_tablet_key(std::string_view key, std::string_view value) { + TabletStatsPB tablet_stats_pb; + std::string_view k1 = key; + k1.remove_prefix(1); + std::vector, int, int>> out; + decode_key(&k1, &out); + // 0x01 "stats" ${instance_id} "tablet" ${table_id} ${index_id} ${partition_id} ${tablet_id} + auto tablet_id = std::get(std::get<0>(out[6])); + std::unique_ptr txn; + TxnErrorCode err = txn_kv_->create_txn(&txn); + if (err != TxnErrorCode::TXN_OK) { + LOG_WARNING("failed to recycle tablet ") + .tag("tablet id", tablet_id) + .tag("instance_id", instance_id_) + .tag("reason", "failed to create txn"); + return -1; + } + std::string tablet_idx_key = meta_tablet_idx_key({instance_id_, tablet_id}); + std::string tablet_idx_val; + TabletIndexPB tablet_idx; + err = txn->get(tablet_idx_key, &tablet_idx_val); + if (err != TxnErrorCode::TXN_OK) { + // clang-format off + LOG(WARNING) << "failed to get tablet index key," + << " key=" << hex(tablet_idx_key) + << " code=" << err; + // clang-format on + return -1; + } + tablet_idx.ParseFromString(tablet_idx_val); + MetaServiceCode code = MetaServiceCode::OK; + std::string msg; + internal_get_tablet_stats(code, msg, txn.get(), instance_id_, tablet_idx, tablet_stats_pb); + if (code != MetaServiceCode::OK) { + // clang-format off + LOG(WARNING) << "failed to get tablet stats," + << " code=" << code + << " msg=" << msg; + // clang-format on + return -1; + } + + GetRowsetResponse resp; + // get rowsets in tablet + internal_get_rowset(txn.get(), 0, std::numeric_limits::max() - 1, instance_id_, + tablet_id, code, msg, &resp); + if (code != MetaServiceCode::OK) { + LOG_WARNING("failed to get rowsets of tablet when check stats tablet key") + 
.tag("tablet id", tablet_id) + .tag("msg", msg) + .tag("code", code) + .tag("instance id", instance_id_); + return -1; + } + int64_t num_rows = 0; + int64_t num_rowsets = 0; + int64_t num_segments = 0; + int64_t total_data_size = 0; + for (const auto& rs_meta : resp.rowset_meta()) { + num_rows += rs_meta.num_rows(); + num_rowsets++; + num_segments += rs_meta.num_segments(); + total_data_size += rs_meta.total_disk_size(); + } + int ret = 0; + if (tablet_stats_pb.data_size() != total_data_size) { + ret = 1; + // clang-format off + LOG(WARNING) << " tablet_stats_pb's data size is not same with all rowset total data size," + << " tablet_stats_pb's data size=" << tablet_stats_pb.data_size() + << " all rowset total data size=" << total_data_size + << " stats tablet meta=" << tablet_stats_pb.ShortDebugString(); + // clang-format on + } else if (tablet_stats_pb.num_rows() != num_rows) { + ret = 1; + // clang-format off + LOG(WARNING) << " tablet_stats_pb's num_rows is not same with all rowset total num_rows," + << " tablet_stats_pb's num_rows=" << tablet_stats_pb.num_rows() + << " all rowset total num_rows=" << num_rows + << " stats tablet meta=" << tablet_stats_pb.ShortDebugString(); + // clang-format on + } else if (tablet_stats_pb.num_rowsets() != num_rowsets) { + ret = 1; + // clang-format off + LOG(WARNING) << " tablet_stats_pb's num_rowsets is not same with all rowset nums," + << " tablet_stats_pb's num_rowsets=" << tablet_stats_pb.num_rowsets() + << " all rowset nums=" << num_rowsets + << " stats tablet meta=" << tablet_stats_pb.ShortDebugString(); + // clang-format on + } else if (tablet_stats_pb.num_segments() != num_segments) { + ret = 1; + // clang-format off + LOG(WARNING) << " tablet_stats_pb's num_segments is not same with all rowset total num_segments," + << " tablet_stats_pb's num_segments=" << tablet_stats_pb.num_segments() + << " all rowset total num_segments=" << num_segments + << " stats tablet meta=" << tablet_stats_pb.ShortDebugString(); + // 
clang-format on + } + + return ret; +} + +int InstanceChecker::scan_and_handle_kv( + std::string& start_key, const std::string& end_key, + std::function handle_kv) { + std::unique_ptr txn; + int ret = 0; + TxnErrorCode err = txn_kv_->create_txn(&txn); + if (err != TxnErrorCode::TXN_OK) { + LOG(WARNING) << "failed to init txn"; + return -1; + } + std::unique_ptr it; + do { + err = txn->get(start_key, end_key, &it); + if (err != TxnErrorCode::TXN_OK) { + LOG(WARNING) << "failed to get tablet idx, ret=" << err; + return -1; + } + + while (it->has_next() && !stopped()) { + auto [k, v] = it->next(); + + int handle_ret = handle_kv(k, v); + if (handle_ret == -1) { + return -1; + } else { + ret = std::max(ret, handle_ret); + } + if (!it->has_next()) { + start_key = k; + } + } + start_key = it->next_begin_key(); + } while (it->more() && !stopped()); + return ret; +} + int InstanceChecker::do_restore_job_check() { int64_t num_prepared = 0; int64_t num_committed = 0; diff --git a/cloud/src/recycler/checker.h b/cloud/src/recycler/checker.h index 436e1302b07cc0..39c990d88310ef 100644 --- a/cloud/src/recycler/checker.h +++ b/cloud/src/recycler/checker.h @@ -107,6 +107,8 @@ class InstanceChecker { int do_mow_job_key_check(); + int do_tablet_stats_key_check(); + int do_restore_job_check(); // If there are multiple buckets, return the minimum lifecycle; if there are no buckets (i.e. @@ -155,6 +157,33 @@ class InstanceChecker { const std::string& rowset_info, RowsetIndexesFormatV2& rowset_index_cache_v2); + // Return 0 if success. + // Return 1 if the stats tablet key is abnormal (its stats do not match the tablet's rowsets). + // Return negative if a temporary error occurred during the check process. + int check_stats_tablet_key(std::string_view key, std::string_view value); + + // Return 0 if success. + // Return 1 if key loss is identified. + // Return negative if a temporary error occurred during the check process. + int check_stats_tablet_key_exists(std::string_view key, std::string_view value); + + // Return 0 if success. 
+ // Return 1 if key leak is identified. + // Return negative if a temporary error occurred during the check process. + int check_stats_tablet_key_leaked(std::string_view key, std::string_view value); + + /** + * It is used to scan the keys in the range from start_key to end_key + * and then perform the handle operation on each kv pair + * + * @param start_key Range beginning. Note that this function will modify the `start_key` + * @param end_key Range ending + * @param handle_kv Operations on kv + * @return code int 0 for success to scan and handle, 1 for success to scan but handle abnormally, -1 for failed to scan or handle + */ + int scan_and_handle_kv(std::string& start_key, const std::string& end_key, + std::function handle_kv); + std::atomic_bool stopped_ {false}; std::shared_ptr txn_kv_; std::string instance_id_; diff --git a/cloud/test/recycler_test.cpp b/cloud/test/recycler_test.cpp index 99cea3d72fd48e..987a6a15572ec1 100644 --- a/cloud/test/recycler_test.cpp +++ b/cloud/test/recycler_test.cpp @@ -4373,6 +4373,191 @@ TEST(CheckerTest, delete_bitmap_storage_optimize_v2_check_abnormal) { ASSERT_EQ(expected_abnormal_rowsets, real_abnormal_rowsets); } +TEST(CheckerTest, tablet_stats_key_check_leak) { + auto txn_kv = std::make_shared(); + ASSERT_EQ(txn_kv->init(), 0); + + InstanceInfoPB instance; + instance.set_instance_id(instance_id); + auto obj_info = instance.add_obj_info(); + obj_info->set_id("1"); + + InstanceChecker checker(txn_kv, instance_id); + ASSERT_EQ(checker.init(instance), 0); + + int64_t table_id = 1001; + int64_t index_id = 2001; + int64_t partition_id = 3001; + int64_t tablet_id = 4001; + + std::unique_ptr txn; + ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); + + // normal tablet stats + create_tablet(txn_kv.get(), table_id, index_id, partition_id, tablet_id); + + table_id = 1002; + index_id = 2002; + partition_id = 3002; + tablet_id = 4002; + + TabletStatsPB stats; + stats.mutable_idx()->set_tablet_id(tablet_id); + 
stats.mutable_idx()->set_table_id(table_id); + stats.mutable_idx()->set_index_id(index_id); + stats.mutable_idx()->set_partition_id(partition_id); + stats.set_data_size(1000); + stats.set_num_rows(100); + + std::string key = stats_tablet_key({instance_id, table_id, index_id, partition_id, tablet_id}); + std::string val = stats.SerializeAsString(); + txn->put(key, val); + ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); + + ASSERT_EQ(checker.do_tablet_stats_key_check(), 1); +} + +TEST(CheckerTest, tablet_stats_key_check_loss) { + auto txn_kv = std::make_shared(); + ASSERT_EQ(txn_kv->init(), 0); + + InstanceInfoPB instance; + instance.set_instance_id(instance_id); + auto obj_info = instance.add_obj_info(); + obj_info->set_id("1"); + + InstanceChecker checker(txn_kv, instance_id); + ASSERT_EQ(checker.init(instance), 0); + + const int64_t table_id = 1001; + const int64_t index_id = 2001; + const int64_t partition_id = 3001; + const int64_t tablet_id = 4001; + + std::unique_ptr txn; + ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); + + doris::TabletMetaCloudPB tablet_meta; + tablet_meta.set_tablet_id(tablet_id); + auto val = tablet_meta.SerializeAsString(); + auto key = meta_tablet_key({instance_id, table_id, index_id, partition_id, tablet_id}); + txn->put(key, val); + + ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); + + ASSERT_EQ(checker.do_tablet_stats_key_check(), 1); +} + +TEST(CheckerTest, tablet_stats_key_check_inconsistent_data) { + auto txn_kv = std::make_shared(); + ASSERT_EQ(txn_kv->init(), 0); + + InstanceInfoPB instance; + instance.set_instance_id(instance_id); + auto obj_info = instance.add_obj_info(); + obj_info->set_id("1"); + + InstanceChecker checker(txn_kv, instance_id); + ASSERT_EQ(checker.init(instance), 0); + + const int64_t table_id = 1001; + const int64_t index_id = 2001; + const int64_t partition_id = 3001; + const int64_t tablet_id = 4001; + + create_tablet(txn_kv.get(), table_id, index_id, partition_id, tablet_id); + + auto accessor 
= checker.accessor_map_.begin()->second; + doris::TabletSchemaCloudPB schema; + schema.set_schema_version(1); + create_committed_rowset(txn_kv.get(), accessor.get(), "resource_id", tablet_id, 1, index_id, 2); + + std::unique_ptr txn; + ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); + + TabletStatsPB stats; + stats.mutable_idx()->set_tablet_id(tablet_id); + stats.mutable_idx()->set_table_id(table_id); + stats.mutable_idx()->set_index_id(index_id); + stats.mutable_idx()->set_partition_id(partition_id); + stats.set_data_size(1000); + stats.set_num_rows(50); + stats.set_num_rowsets(5); + stats.set_num_segments(10); + + std::string key = stats_tablet_key({instance_id, table_id, index_id, partition_id, tablet_id}); + std::string val = stats.SerializeAsString(); + txn->put(key, val); + + std::string tablet_idx_key = meta_tablet_idx_key({instance_id, tablet_id}); + std::string tablet_idx_val; + TabletIndexPB tablet_idx_pb; + tablet_idx_pb.set_tablet_id(tablet_id); + tablet_idx_pb.set_table_id(table_id); + tablet_idx_pb.set_index_id(index_id); + tablet_idx_pb.set_partition_id(partition_id); + tablet_idx_pb.SerializeToString(&tablet_idx_val); + txn->put(tablet_idx_key, tablet_idx_val); + ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); + + ASSERT_EQ(checker.do_tablet_stats_key_check(), 1); +} + +TEST(CheckerTest, tablet_stats_key_check_normal) { + auto txn_kv = std::make_shared(); + ASSERT_EQ(txn_kv->init(), 0); + + InstanceInfoPB instance; + instance.set_instance_id(instance_id); + auto obj_info = instance.add_obj_info(); + obj_info->set_id("1"); + + InstanceChecker checker(txn_kv, instance_id); + ASSERT_EQ(checker.init(instance), 0); + + const int64_t table_id = 1001; + const int64_t index_id = 2001; + const int64_t partition_id = 3001; + const int64_t tablet_id = 4001; + + create_tablet(txn_kv.get(), table_id, index_id, partition_id, tablet_id); + + auto accessor = checker.accessor_map_.begin()->second; + doris::TabletSchemaCloudPB schema; + 
schema.set_schema_version(1); + create_committed_rowset(txn_kv.get(), accessor.get(), "resource_id", tablet_id, 1, index_id, 2); + + std::unique_ptr txn; + ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); + + TabletStatsPB stats; + stats.mutable_idx()->set_tablet_id(tablet_id); + stats.mutable_idx()->set_table_id(table_id); + stats.mutable_idx()->set_index_id(index_id); + stats.mutable_idx()->set_partition_id(partition_id); + stats.set_data_size(0); + stats.set_num_rows(0); + stats.set_num_rowsets(1); + stats.set_num_segments(2); + + std::string key = stats_tablet_key({instance_id, table_id, index_id, partition_id, tablet_id}); + std::string val = stats.SerializeAsString(); + txn->put(key, val); + + std::string tablet_idx_key = meta_tablet_idx_key({instance_id, tablet_id}); + std::string tablet_idx_val; + TabletIndexPB tablet_idx_pb; + tablet_idx_pb.set_tablet_id(tablet_id); + tablet_idx_pb.set_table_id(table_id); + tablet_idx_pb.set_index_id(index_id); + tablet_idx_pb.set_partition_id(partition_id); + tablet_idx_pb.SerializeToString(&tablet_idx_val); + txn->put(tablet_idx_key, tablet_idx_val); + ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); + + ASSERT_EQ(checker.do_tablet_stats_key_check(), 0); +} + TEST(RecyclerTest, delete_rowset_data) { auto txn_kv = std::make_shared(); ASSERT_EQ(txn_kv->init(), 0);