Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 71 additions & 27 deletions cloud/src/meta-service/meta_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1685,6 +1685,7 @@ static bool check_delete_bitmap_lock(MetaServiceCode& code, std::string& msg, st
std::string lock_val;
DeleteBitmapUpdateLockPB lock_info;
auto err = txn->get(lock_key, &lock_val);
TEST_SYNC_POINT_CALLBACK("check_delete_bitmap_lock.inject_get_lock_key_err", &err);
if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
msg = "lock id key not found";
code = MetaServiceCode::LOCK_EXPIRED;
Expand All @@ -1701,6 +1702,7 @@ static bool check_delete_bitmap_lock(MetaServiceCode& code, std::string& msg, st
msg = "failed to parse DeleteBitmapUpdateLockPB";
return false;
}
TEST_SYNC_POINT_CALLBACK("check_delete_bitmap_lock.set_lock_info", &lock_info);
if (lock_info.lock_id() != lock_id) {
ss << "lock id not match, locked by lock_id=" << lock_info.lock_id();
msg = ss.str();
Expand Down Expand Up @@ -1877,7 +1879,7 @@ void MetaServiceImpl::update_delete_bitmap(google::protobuf::RpcController* cont
request->lock_id(), request->initiator())) {
LOG(WARNING) << "failed to check delete bitmap lock, table_id=" << table_id
<< " request lock_id=" << request->lock_id()
<< " request initiator=" << request->initiator() << " msg" << msg;
<< " request initiator=" << request->initiator() << " msg " << msg;
return;
}
}
Expand Down Expand Up @@ -2150,32 +2152,6 @@ void MetaServiceImpl::get_delete_bitmap_update_lock(google::protobuf::RpcControl
}
}

bool require_tablet_stats =
request->has_require_compaction_stats() ? request->require_compaction_stats() : false;
if (require_tablet_stats) {
// this request is from fe when it commits txn for MOW table, we send the compaction stats
// along with the GetDeleteBitmapUpdateLockResponse which will be sent to BE later to let
// BE eliminate unnecessary sync_rowsets() calls if possible
for (const auto& tablet_index : request->tablet_indexes()) {
TabletIndexPB idx(tablet_index);
TabletStatsPB tablet_stat;
internal_get_tablet_stats(code, msg, txn.get(), instance_id, idx, tablet_stat, false);
if (code != MetaServiceCode::OK) {
response->clear_base_compaction_cnts();
response->clear_cumulative_compaction_cnts();
response->clear_cumulative_points();
LOG_WARNING(
"failed to get tablet stats when get_delete_bitmap_update_lock, "
"lock_id={}, initiator={}, tablet_id={}",
request->lock_id(), request->initiator(), tablet_index.tablet_id());
return;
}
response->add_base_compaction_cnts(tablet_stat.base_compaction_cnt());
response->add_cumulative_compaction_cnts(tablet_stat.cumulative_compaction_cnt());
response->add_cumulative_points(tablet_stat.cumulative_point());
}
}

lock_info.set_lock_id(request->lock_id());
lock_info.set_expiration(now + request->expiration());
bool found = false;
Expand Down Expand Up @@ -2205,6 +2181,74 @@ void MetaServiceImpl::get_delete_bitmap_update_lock(google::protobuf::RpcControl
msg = ss.str();
return;
}

bool require_tablet_stats =
request->has_require_compaction_stats() ? request->require_compaction_stats() : false;
if (!require_tablet_stats) return;
// this request is from fe when it commits txn for MOW table, we send the compaction stats
// along with the GetDeleteBitmapUpdateLockResponse which will be sent to BE later to let
// BE eliminate unnecessary sync_rowsets() calls if possible

// 1. hold the delete bitmap update lock in MS(update lock_info.lock_id to current load's txn id)
// 2. read tablets' stats
// 3. check whether we still hold the delete bitmap update lock
// these steps can be done in different fdb txns

StopWatch read_stats_sw;
err = txn_kv_->create_txn(&txn);
if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::CREATE>(err);
msg = "failed to init txn";
return;
}

for (const auto& tablet_idx : request->tablet_indexes()) {
TabletStatsPB tablet_stat;
std::string stats_key =
stats_tablet_key({instance_id, tablet_idx.table_id(), tablet_idx.index_id(),
tablet_idx.partition_id(), tablet_idx.tablet_id()});
std::string stats_val;
TxnErrorCode err = txn->get(stats_key, &stats_val);
TEST_SYNC_POINT_CALLBACK("get_delete_bitmap_update_lock.get_compaction_cnts_inject_error",
&err);
if (err == TxnErrorCode::TXN_TOO_OLD) {
code = MetaServiceCode::OK;
err = txn_kv_->create_txn(&txn);
if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::CREATE>(err);
ss << "failed to init txn when get tablet stats";
msg = ss.str();
return;
}
err = txn->get(stats_key, &stats_val);
}
if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::READ>(err);
msg = fmt::format("failed to get tablet stats, err={} tablet_id={}", err,
tablet_idx.tablet_id());
return;
}
if (!tablet_stat.ParseFromArray(stats_val.data(), stats_val.size())) {
code = MetaServiceCode::PROTOBUF_PARSE_ERR;
msg = fmt::format("marformed tablet stats value, key={}", hex(stats_key));
return;
}
response->add_base_compaction_cnts(tablet_stat.base_compaction_cnt());
response->add_cumulative_compaction_cnts(tablet_stat.cumulative_compaction_cnt());
response->add_cumulative_points(tablet_stat.cumulative_point());
}

read_stats_sw.pause();
LOG(INFO) << fmt::format("tablet_idxes.size()={}, read tablet compaction cnts cost={} ms",
request->tablet_indexes().size(), read_stats_sw.elapsed_us() / 1000);

if (!check_delete_bitmap_lock(code, msg, ss, txn, instance_id, table_id, request->lock_id(),
request->initiator())) {
LOG(WARNING) << "failed to check delete bitmap lock after get tablet stats, table_id="
<< table_id << " request lock_id=" << request->lock_id()
<< " request initiator=" << request->initiator() << " code=" << code
<< " msg=" << msg;
}
}

void MetaServiceImpl::remove_delete_bitmap_update_lock(
Expand Down
Loading
Loading