Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions cloud/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,8 @@ CONF_Bool(force_immediate_recycle, "false");
CONF_mBool(enable_mow_job_key_check, "false");
CONF_mBool(enable_restore_job_check, "false");

CONF_mBool(enable_tablet_stats_key_check, "false");

CONF_mBool(enable_checker_for_meta_key_check, "false");
CONF_mInt64(mow_job_key_check_expiration_diff_seconds, "600"); // 10min

Expand Down
283 changes: 283 additions & 0 deletions cloud/src/recycler/checker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,9 @@
#include "common/logging.h"
#include "common/util.h"
#include "cpp/sync_point.h"
#include "meta-service/meta_service.h"
#include "meta-service/meta_service_schema.h"
#include "meta-service/meta_service_tablet_stats.h"
#include "meta-store/blob_message.h"
#include "meta-store/keys.h"
#include "meta-store/txn_kv.h"
Expand Down Expand Up @@ -198,6 +200,12 @@ int Checker::start() {
}
}

if (config::enable_tablet_stats_key_check) {
if (int ret = checker->do_tablet_stats_key_check(); ret != 0) {
success = false;
}
}

if (config::enable_restore_job_check) {
if (int ret = checker->do_restore_job_check(); ret != 0) {
success = false;
Expand Down Expand Up @@ -1764,6 +1772,281 @@ int InstanceChecker::do_mow_job_key_check() {
return 0;
}

// Runs the tablet stats key checks for this instance as three sequential scans:
//   1. inverted check: scan the meta tablet key range and, for each key, call
//      check_stats_tablet_key_exists() (counts "loss");
//   2. leak check: scan the stats tablet key range and, for each key, call
//      check_stats_tablet_key_leaked() (counts "leak");
//   3. abnormal check: scan the stats tablet key range again and, for each key, call
//      check_stats_tablet_key() (counts "abnormal").
// A phase that does not finish cleanly ends the whole check; later phases are not run.
//
// Return 0 if all three phases pass.
// Return 1 if a phase reported at least one inconsistent key.
// Return -1 if a phase failed to complete.
int InstanceChecker::do_tablet_stats_key_check() {
    int ret = 0;

    int64_t nums_leak = 0;
    int64_t nums_loss = 0;
    int64_t nums_scanned = 0;
    int64_t nums_abnormal = 0;

    // Phase 1: every meta tablet key is looked up in the stats key space.
    std::string begin = meta_tablet_key({instance_id_, 0, 0, 0, 0});
    std::string end = meta_tablet_key({instance_id_, INT64_MAX, 0, 0, 0});
    LOG(INFO) << "begin inverted check stats_tablet_key";
    ret = scan_and_handle_kv(begin, end, [&](std::string_view key, std::string_view value) {
        const int check_ret = check_stats_tablet_key_exists(key, value);
        nums_scanned++;
        if (check_ret == 1) {
            nums_loss++;
        }
        return check_ret;
    });
    if (ret == -1) {
        LOG(WARNING) << "failed to inverted check if stats tablet key exists";
        return -1;
    } else if (ret == 1) {
        LOG(WARNING) << "stats_tablet_key loss, nums_scanned=" << nums_scanned
                     << ", nums_loss=" << nums_loss;
        return 1;
    }
    LOG(INFO) << "finish inverted check stats_tablet_key, nums_scanned=" << nums_scanned
              << ", nums_loss=" << nums_loss;

    // Phase 2: every stats tablet key is looked up in the meta key space.
    // `begin` is re-assigned because scan_and_handle_kv() advances it while scanning.
    begin = stats_tablet_key({instance_id_, 0, 0, 0, 0});
    end = stats_tablet_key({instance_id_, INT64_MAX, 0, 0, 0});
    nums_scanned = 0;
    LOG(INFO) << "begin check stats_tablet_key leaked";
    ret = scan_and_handle_kv(begin, end, [&](std::string_view key, std::string_view value) {
        const int check_ret = check_stats_tablet_key_leaked(key, value);
        nums_scanned++;
        if (check_ret == 1) {
            nums_leak++;
        }
        return check_ret;
    });
    if (ret == -1) {
        // Distinct message per phase so that the failing phase can be told apart in the log.
        LOG(WARNING) << "failed to check if stats tablet key leaked";
        return -1;
    } else if (ret == 1) {
        LOG(WARNING) << "stats_tablet_key leaked, nums_scanned=" << nums_scanned
                     << ", nums_leak=" << nums_leak;
        return 1;
    }
    LOG(INFO) << "finish check stats_tablet_key leaked, nums_scanned=" << nums_scanned
              << ", nums_leak=" << nums_leak;

    // Phase 3: the stats range is scanned once more to validate each stats value.
    begin = stats_tablet_key({instance_id_, 0, 0, 0, 0});
    end = stats_tablet_key({instance_id_, INT64_MAX, 0, 0, 0});
    nums_scanned = 0;
    LOG(INFO) << "begin check stats_tablet_key abnormal";
    ret = scan_and_handle_kv(begin, end, [&](std::string_view key, std::string_view value) {
        const int check_ret = check_stats_tablet_key(key, value);
        nums_scanned++;
        if (check_ret == 1) {
            nums_abnormal++;
        }
        return check_ret;
    });
    if (ret == -1) {
        LOG(WARNING) << "failed to check if stats tablet key is abnormal";
        return -1;
    } else if (ret == 1) {
        LOG(WARNING) << "stats_tablet_key abnormal, nums_scanned=" << nums_scanned
                     << ", nums_abnormal=" << nums_abnormal;
        return 1;
    }
    LOG(INFO) << "finish check stats_tablet_key, nums_scanned=" << nums_scanned
              << ", nums_abnormal=" << nums_abnormal;
    return 0;
}

// Given a meta tablet key, checks that the stats tablet key built from the same
// (table_id, index_id, partition_id, tablet_id) is present in the kv store.
//
// @param key   encoded meta tablet key, including the leading key space byte
// @param value value stored under `key`; not used by this check
//
// Return 0 if the stats tablet key exists.
// Return 1 if the stats tablet key is missing (key loss).
// Return -1 if `key` cannot be decoded or the existence lookup failed.
int InstanceChecker::check_stats_tablet_key_exists(std::string_view key, std::string_view value) {
    if (key.empty()) {
        LOG(WARNING) << "failed to decode meta tablet key, key is empty";
        return -1;
    }
    std::string_view k1 = key;
    k1.remove_prefix(1); // drop the key space byte before decoding
    std::vector<std::tuple<std::variant<int64_t, std::string>, int, int>> out;
    // 0x01 "meta" ${instance_id} "tablet" ${table_id} ${index_id} ${partition_id} ${tablet_id}
    // Fields 3..6 are read below, so at least 7 decoded fields are required.
    if (decode_key(&k1, &out) != 0 || out.size() < 7) {
        LOG(WARNING) << "failed to decode meta tablet key, key=" << hex(key);
        return -1;
    }
    auto table_id = std::get<int64_t>(std::get<0>(out[3]));
    auto index_id = std::get<int64_t>(std::get<0>(out[4]));
    auto partition_id = std::get<int64_t>(std::get<0>(out[5]));
    auto tablet_id = std::get<int64_t>(std::get<0>(out[6]));
    std::string tablet_stats_key =
            stats_tablet_key({instance_id_, table_id, index_id, partition_id, tablet_id});
    int ret = key_exist(txn_kv_.get(), tablet_stats_key);
    if (ret == 1) {
        // clang-format off
        LOG(WARNING) << "stats tablet key's tablet key loss,"
                     << " stats tablet key=" << hex(tablet_stats_key)
                     << " meta tablet key=" << hex(key);
        // clang-format on
        return 1;
    } else if (ret == -1) {
        LOG(WARNING) << "failed to check key exists, key=" << hex(tablet_stats_key);
        return -1;
    }
    LOG(INFO) << "check stats_tablet_key_exists ok, key=" << hex(key);
    return 0;
}

// Given a stats tablet key, checks that the meta tablet key built from the same
// (table_id, index_id, partition_id, tablet_id) is present in the kv store.
//
// @param key   encoded stats tablet key, including the leading key space byte
// @param value value stored under `key`; not used by this check
//
// Return 0 if the meta tablet key exists.
// Return 1 if the meta tablet key is missing (the stats key is leaked).
// Return -1 if `key` cannot be decoded or the existence lookup failed.
int InstanceChecker::check_stats_tablet_key_leaked(std::string_view key, std::string_view value) {
    if (key.empty()) {
        LOG(WARNING) << "failed to decode stats tablet key, key is empty";
        return -1;
    }
    std::string_view k1 = key;
    k1.remove_prefix(1); // drop the key space byte before decoding
    std::vector<std::tuple<std::variant<int64_t, std::string>, int, int>> out;
    // 0x01 "stats" ${instance_id} "tablet" ${table_id} ${index_id} ${partition_id} ${tablet_id}
    // Fields 3..6 are read below, so at least 7 decoded fields are required.
    if (decode_key(&k1, &out) != 0 || out.size() < 7) {
        LOG(WARNING) << "failed to decode stats tablet key, key=" << hex(key);
        return -1;
    }
    auto table_id = std::get<int64_t>(std::get<0>(out[3]));
    auto index_id = std::get<int64_t>(std::get<0>(out[4]));
    auto partition_id = std::get<int64_t>(std::get<0>(out[5]));
    auto tablet_id = std::get<int64_t>(std::get<0>(out[6]));
    std::string tablet_key =
            meta_tablet_key({instance_id_, table_id, index_id, partition_id, tablet_id});
    int ret = key_exist(txn_kv_.get(), tablet_key);
    if (ret == 1) {
        // clang-format off
        LOG(WARNING) << "stats tablet key's tablet key leak,"
                     << " stats tablet key=" << hex(key)
                     << " meta tablet key=" << hex(tablet_key);
        // clang-format on
        return 1;
    } else if (ret == -1) {
        LOG(WARNING) << "failed to check key exists, key=" << hex(tablet_key);
        return -1;
    }
    LOG(INFO) << "check stats_tablet_key_leaked ok, key=" << hex(key);
    return 0;
}

int InstanceChecker::check_stats_tablet_key(std::string_view key, std::string_view value) {
TabletStatsPB tablet_stats_pb;
std::string_view k1 = key;
k1.remove_prefix(1);
std::vector<std::tuple<std::variant<int64_t, std::string>, int, int>> out;
decode_key(&k1, &out);
// 0x01 "stats" ${instance_id} "tablet" ${table_id} ${index_id} ${partition_id} ${tablet_id}
auto tablet_id = std::get<int64_t>(std::get<0>(out[6]));
std::unique_ptr<Transaction> txn;
TxnErrorCode err = txn_kv_->create_txn(&txn);
if (err != TxnErrorCode::TXN_OK) {
LOG_WARNING("failed to recycle tablet ")
.tag("tablet id", tablet_id)
.tag("instance_id", instance_id_)
.tag("reason", "failed to create txn");
return -1;
}
std::string tablet_idx_key = meta_tablet_idx_key({instance_id_, tablet_id});
std::string tablet_idx_val;
TabletIndexPB tablet_idx;
err = txn->get(tablet_idx_key, &tablet_idx_val);
if (err != TxnErrorCode::TXN_OK) {
// clang-format off
LOG(WARNING) << "failed to get tablet index key,"
<< " key=" << hex(tablet_idx_key)
<< " code=" << err;
// clang-format on
return -1;
}
tablet_idx.ParseFromString(tablet_idx_val);
MetaServiceCode code = MetaServiceCode::OK;
std::string msg;
internal_get_tablet_stats(code, msg, txn.get(), instance_id_, tablet_idx, tablet_stats_pb);
if (code != MetaServiceCode::OK) {
// clang-format off
LOG(WARNING) << "failed to get tablet stats,"
<< " code=" << code
<< " msg=" << msg;
// clang-format on
return -1;
}

GetRowsetResponse resp;
// get rowsets in tablet
internal_get_rowset(txn.get(), 0, std::numeric_limits<int64_t>::max() - 1, instance_id_,
tablet_id, code, msg, &resp);
if (code != MetaServiceCode::OK) {
LOG_WARNING("failed to get rowsets of tablet when check stats tablet key")
.tag("tablet id", tablet_id)
.tag("msg", msg)
.tag("code", code)
.tag("instance id", instance_id_);
return -1;
}
int64_t num_rows = 0;
int64_t num_rowsets = 0;
int64_t num_segments = 0;
int64_t total_data_size = 0;
for (const auto& rs_meta : resp.rowset_meta()) {
num_rows += rs_meta.num_rows();
num_rowsets++;
num_segments += rs_meta.num_segments();
total_data_size += rs_meta.total_disk_size();
}
int ret = 0;
if (tablet_stats_pb.data_size() != total_data_size) {
ret = 1;
// clang-format off
LOG(WARNING) << " tablet_stats_pb's data size is not same with all rowset total data size,"
<< " tablet_stats_pb's data size=" << tablet_stats_pb.data_size()
<< " all rowset total data size=" << total_data_size
<< " stats tablet meta=" << tablet_stats_pb.ShortDebugString();
// clang-format on
} else if (tablet_stats_pb.num_rows() != num_rows) {
ret = 1;
// clang-format off
LOG(WARNING) << " tablet_stats_pb's num_rows is not same with all rowset total num_rows,"
<< " tablet_stats_pb's num_rows=" << tablet_stats_pb.num_rows()
<< " all rowset total num_rows=" << num_rows
<< " stats tablet meta=" << tablet_stats_pb.ShortDebugString();
// clang-format on
} else if (tablet_stats_pb.num_rowsets() != num_rowsets) {
ret = 1;
// clang-format off
LOG(WARNING) << " tablet_stats_pb's num_rowsets is not same with all rowset nums,"
<< " tablet_stats_pb's num_rowsets=" << tablet_stats_pb.num_rowsets()
<< " all rowset nums=" << num_rowsets
<< " stats tablet meta=" << tablet_stats_pb.ShortDebugString();
// clang-format on
} else if (tablet_stats_pb.num_segments() != num_segments) {
ret = 1;
// clang-format off
LOG(WARNING) << " tablet_stats_pb's num_segments is not same with all rowset total num_segments,"
<< " tablet_stats_pb's num_segments=" << tablet_stats_pb.num_segments()
<< " all rowset total num_segments=" << num_segments
<< " stats tablet meta=" << tablet_stats_pb.ShortDebugString();
// clang-format on
}

return ret;
}

// Scans the kv range [start_key, end_key) batch by batch and invokes `handle_kv` on each
// key/value pair.
//
// Each batch is read through its own transaction instead of one transaction for the whole
// scan: `handle_kv` may do further kv lookups per key, so a single transaction kept open
// across a large range can outlive the kv store's transaction time limit.
//
// @param start_key Range beginning. It is modified: after each batch it is advanced to the
//                  iterator's next_begin_key().
// @param end_key   Range ending
// @param handle_kv Callback run on each pair; expected to return 0 (ok), a positive value
//                  (abnormal) or a negative value (failure).
// @return 0 if every handled pair returned 0, the largest positive value returned by
//         `handle_kv` otherwise, or -1 if a transaction could not be created, a range read
//         failed, or `handle_kv` returned a negative value. If stopped() becomes true the
//         scan ends early and the result covers only the pairs handled so far.
int InstanceChecker::scan_and_handle_kv(
        std::string& start_key, const std::string& end_key,
        std::function<int(std::string_view, std::string_view)> handle_kv) {
    int ret = 0;
    std::unique_ptr<RangeGetIterator> it;
    do {
        std::unique_ptr<Transaction> txn;
        TxnErrorCode err = txn_kv_->create_txn(&txn);
        if (err != TxnErrorCode::TXN_OK) {
            LOG(WARNING) << "failed to init txn";
            return -1;
        }
        err = txn->get(start_key, end_key, &it);
        if (err != TxnErrorCode::TXN_OK) {
            LOG(WARNING) << "failed to get kv range, ret=" << err;
            return -1;
        }

        while (it->has_next() && !stopped()) {
            auto [k, v] = it->next();

            const int handle_ret = handle_kv(k, v);
            if (handle_ret < 0) {
                // Any negative value is a handler failure and aborts the scan.
                return -1;
            }
            ret = std::max(ret, handle_ret);
        }
        // Resume the next batch from where this one ended.
        start_key = it->next_begin_key();
    } while (it->more() && !stopped());
    return ret;
}

int InstanceChecker::do_restore_job_check() {
int64_t num_prepared = 0;
int64_t num_committed = 0;
Expand Down
29 changes: 29 additions & 0 deletions cloud/src/recycler/checker.h
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ class InstanceChecker {

int do_mow_job_key_check();

int do_tablet_stats_key_check();

int do_restore_job_check();

// If there are multiple buckets, return the minimum lifecycle; if there are no buckets (i.e.
Expand Down Expand Up @@ -155,6 +157,33 @@ class InstanceChecker {
const std::string& rowset_info,
RowsetIndexesFormatV2& rowset_index_cache_v2);

// Return 0 if success.
// Return 1 if the tablet stats are abnormal (inconsistent with the tablet's rowsets).
// Return negative if a temporary error occurred during the check process.
int check_stats_tablet_key(std::string_view key, std::string_view value);

// Return 0 if success.
// Return 1 if key loss is identified.
// Return negative if a temporary error occurred during the check process.
int check_stats_tablet_key_exists(std::string_view key, std::string_view value);

// Return 0 if success.
// Return 1 if key leak is identified.
// Return negative if a temporary error occurred during the check process.
int check_stats_tablet_key_leaked(std::string_view key, std::string_view value);

/**
* Scans the keys in the range from start_key to end_key
* and performs the handle operation on each kv pair
*
* @param start_key Range beginning. Note that this function will modify the `start_key`
* @param end_key Range ending
* @param handle_kv Operation to perform on each kv pair
* @return 0 if the scan and handling succeeded, 1 if the scan succeeded but the handler reported an abnormal kv, -1 if the scan or the handler failed
*/
int scan_and_handle_kv(std::string& start_key, const std::string& end_key,
std::function<int(std::string_view, std::string_view)> handle_kv);

std::atomic_bool stopped_ {false};
std::shared_ptr<TxnKv> txn_kv_;
std::string instance_id_;
Expand Down
Loading