Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions cloud/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,9 @@ CONF_Bool(enable_loopback_address_for_ms, "false");
// Comma seprated list: recycler_storage_vault_white_list="aaa,bbb,ccc"
CONF_Strings(recycler_storage_vault_white_list, "");

// for get_delete_bitmap_update_lock
CONF_mBool(enable_batch_get_mow_tablet_stats_and_meta, "true");

// aws sdk log level
// Off = 0,
// Fatal = 1,
Expand Down
203 changes: 152 additions & 51 deletions cloud/src/meta-service/meta_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2574,9 +2574,19 @@ void MetaServiceImpl::get_delete_bitmap_update_lock(google::protobuf::RpcControl
return;
}

if (!get_mow_tablet_stats_and_meta(code, msg, request, response, instance_id, lock_key)) {
return;
};
}

bool MetaServiceImpl::get_mow_tablet_stats_and_meta(MetaServiceCode& code, std::string& msg,
const GetDeleteBitmapUpdateLockRequest* request,
GetDeleteBitmapUpdateLockResponse* response,
std::string& instance_id,
std::string& lock_key) {
bool require_tablet_stats =
request->has_require_compaction_stats() ? request->require_compaction_stats() : false;
if (!require_tablet_stats) return;
if (!require_tablet_stats) return true;
// this request is from fe when it commits txn for MOW table, we send the compaction stats
// along with the GetDeleteBitmapUpdateLockResponse which will be sent to BE later to let
// BE eliminate unnecessary sync_rowsets() calls if possible
Expand All @@ -2587,79 +2597,168 @@ void MetaServiceImpl::get_delete_bitmap_update_lock(google::protobuf::RpcControl
// these steps can be done in different fdb txns

StopWatch read_stats_sw;
err = txn_kv_->create_txn(&txn);
std::unique_ptr<Transaction> txn;
TxnErrorCode err = txn_kv_->create_txn(&txn);
if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::CREATE>(err);
msg = "failed to init txn";
return;
return false;
}

for (const auto& tablet_idx : request->tablet_indexes()) {
auto table_id = request->table_id();
std::stringstream ss;
if (!config::enable_batch_get_mow_tablet_stats_and_meta) {
for (const auto& tablet_idx : request->tablet_indexes()) {
// 1. get compaction cnts
TabletStatsPB tablet_stat;
std::string stats_key =
stats_tablet_key({instance_id, tablet_idx.table_id(), tablet_idx.index_id(),
tablet_idx.partition_id(), tablet_idx.tablet_id()});
std::string stats_val;
TxnErrorCode err = txn->get(stats_key, &stats_val);
TEST_SYNC_POINT_CALLBACK(
"get_delete_bitmap_update_lock.get_compaction_cnts_inject_error", &err);
if (err == TxnErrorCode::TXN_TOO_OLD) {
code = MetaServiceCode::OK;
err = txn_kv_->create_txn(&txn);
if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::CREATE>(err);
ss << "failed to init txn when get tablet stats";
msg = ss.str();
return false;
}
err = txn->get(stats_key, &stats_val);
}
if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::READ>(err);
msg = fmt::format("failed to get tablet stats, err={} tablet_id={}", err,
tablet_idx.tablet_id());
return false;
}
if (!tablet_stat.ParseFromArray(stats_val.data(), stats_val.size())) {
code = MetaServiceCode::PROTOBUF_PARSE_ERR;
msg = fmt::format("marformed tablet stats value, key={}", hex(stats_key));
return false;
}
response->add_base_compaction_cnts(tablet_stat.base_compaction_cnt());
response->add_cumulative_compaction_cnts(tablet_stat.cumulative_compaction_cnt());
response->add_cumulative_points(tablet_stat.cumulative_point());

// 2. get tablet states
std::string tablet_meta_key =
meta_tablet_key({instance_id, tablet_idx.table_id(), tablet_idx.index_id(),
tablet_idx.partition_id(), tablet_idx.tablet_id()});
std::string tablet_meta_val;
err = txn->get(tablet_meta_key, &tablet_meta_val);
if (err != TxnErrorCode::TXN_OK) {
ss << "failed to get tablet meta"
<< (err == TxnErrorCode::TXN_KEY_NOT_FOUND ? " (not found)" : "")
<< " instance_id=" << instance_id << " tablet_id=" << tablet_idx.tablet_id()
<< " key=" << hex(tablet_meta_key) << " err=" << err;
msg = ss.str();
code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ? MetaServiceCode::TABLET_NOT_FOUND
: cast_as<ErrCategory::READ>(err);
return false;
}
doris::TabletMetaCloudPB tablet_meta;
if (!tablet_meta.ParseFromString(tablet_meta_val)) {
code = MetaServiceCode::PROTOBUF_PARSE_ERR;
msg = "malformed tablet meta";
return false;
}
response->add_tablet_states(
static_cast<std::underlying_type_t<TabletStatePB>>(tablet_meta.tablet_state()));
}
} else {
// 1. get compaction cnts
TabletStatsPB tablet_stat;
std::string stats_key =
stats_tablet_key({instance_id, tablet_idx.table_id(), tablet_idx.index_id(),
tablet_idx.partition_id(), tablet_idx.tablet_id()});
std::string stats_val;
TxnErrorCode err = txn->get(stats_key, &stats_val);
std::vector<std::string> stats_tablet_keys;
for (const auto& tablet_idx : request->tablet_indexes()) {
stats_tablet_keys.push_back(
stats_tablet_key({instance_id, tablet_idx.table_id(), tablet_idx.index_id(),
tablet_idx.partition_id(), tablet_idx.tablet_id()}));
}
std::vector<std::optional<std::string>> stats_tablet_values;
err = txn->batch_get(&stats_tablet_values, stats_tablet_keys);
TEST_SYNC_POINT_CALLBACK("get_delete_bitmap_update_lock.get_compaction_cnts_inject_error",
&err);
if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::READ>(err);
msg = fmt::format("failed to get tablet stats, err={} table_id={} lock_id={}", err,
table_id, request->lock_id());
return false;
}
for (size_t i = 0; i < stats_tablet_keys.size(); i++) {
if (!stats_tablet_values[i].has_value()) {
code = cast_as<ErrCategory::READ>(err);
msg = fmt::format("failed to get tablet stats, err={} tablet_id={}", err,
request->tablet_indexes(i).tablet_id());
return false;
}
TabletStatsPB tablet_stat;
if (!tablet_stat.ParseFromString(stats_tablet_values[i].value())) {
code = MetaServiceCode::PROTOBUF_PARSE_ERR;
msg = fmt::format("marformed tablet stats value");
return false;
}
response->add_base_compaction_cnts(tablet_stat.base_compaction_cnt());
response->add_cumulative_compaction_cnts(tablet_stat.cumulative_compaction_cnt());
response->add_cumulative_points(tablet_stat.cumulative_point());
}
stats_tablet_keys.clear();
stats_tablet_values.clear();
DCHECK(request->tablet_indexes_size() == response->base_compaction_cnts_size());
DCHECK(request->tablet_indexes_size() == response->cumulative_compaction_cnts_size());
DCHECK(request->tablet_indexes_size() == response->cumulative_points_size());

// 2. get tablet states
std::vector<std::string> tablet_meta_keys;
for (const auto& tablet_idx : request->tablet_indexes()) {
tablet_meta_keys.push_back(
meta_tablet_key({instance_id, tablet_idx.table_id(), tablet_idx.index_id(),
tablet_idx.partition_id(), tablet_idx.tablet_id()}));
}
std::vector<std::optional<std::string>> tablet_meta_values;
err = txn->batch_get(&tablet_meta_values, tablet_meta_keys);
if (err == TxnErrorCode::TXN_TOO_OLD) {
code = MetaServiceCode::OK;
err = txn_kv_->create_txn(&txn);
if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::CREATE>(err);
ss << "failed to init txn when get tablet stats";
ss << "failed to init txn when get tablet meta";
msg = ss.str();
return;
return false;
}
err = txn->get(stats_key, &stats_val);
err = txn->batch_get(&tablet_meta_values, tablet_meta_keys);
}
if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::READ>(err);
msg = fmt::format("failed to get tablet stats, err={} tablet_id={}", err,
tablet_idx.tablet_id());
return;
}
if (!tablet_stat.ParseFromArray(stats_val.data(), stats_val.size())) {
code = MetaServiceCode::PROTOBUF_PARSE_ERR;
msg = fmt::format("marformed tablet stats value, key={}", hex(stats_key));
return;
}
response->add_base_compaction_cnts(tablet_stat.base_compaction_cnt());
response->add_cumulative_compaction_cnts(tablet_stat.cumulative_compaction_cnt());
response->add_cumulative_points(tablet_stat.cumulative_point());

// 2. get tablet states
std::string tablet_meta_key =
meta_tablet_key({instance_id, tablet_idx.table_id(), tablet_idx.index_id(),
tablet_idx.partition_id(), tablet_idx.tablet_id()});
std::string tablet_meta_val;
err = txn->get(tablet_meta_key, &tablet_meta_val);
if (err != TxnErrorCode::TXN_OK) {
ss << "failed to get tablet meta"
<< (err == TxnErrorCode::TXN_KEY_NOT_FOUND ? " (not found)" : "")
<< " instance_id=" << instance_id << " tablet_id=" << tablet_idx.tablet_id()
<< " key=" << hex(tablet_meta_key) << " err=" << err;
msg = ss.str();
code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ? MetaServiceCode::TABLET_NOT_FOUND
: cast_as<ErrCategory::READ>(err);
return;
msg = fmt::format("failed to get tablet meta, err={} table_id={} lock_id={}", err,
table_id, request->lock_id());
return false;
}
doris::TabletMetaCloudPB tablet_meta;
if (!tablet_meta.ParseFromString(tablet_meta_val)) {
code = MetaServiceCode::PROTOBUF_PARSE_ERR;
msg = "malformed tablet meta";
return;
for (size_t i = 0; i < tablet_meta_keys.size(); i++) {
if (!tablet_meta_values[i].has_value()) {
code = cast_as<ErrCategory::READ>(err);
msg = fmt::format("failed to get tablet meta, err={} tablet_id={}", err,
request->tablet_indexes(i).tablet_id());
return false;
}
doris::TabletMetaCloudPB tablet_meta;
if (!tablet_meta.ParseFromString(tablet_meta_values[i].value())) {
code = MetaServiceCode::PROTOBUF_PARSE_ERR;
msg = fmt::format("marformed tablet meta value");
return false;
}
response->add_tablet_states(
static_cast<std::underlying_type_t<TabletStatePB>>(tablet_meta.tablet_state()));
}
response->add_tablet_states(
static_cast<std::underlying_type_t<TabletStatePB>>(tablet_meta.tablet_state()));
DCHECK(request->tablet_indexes_size() == response->tablet_states_size());
}

read_stats_sw.pause();
LOG(INFO) << fmt::format(
"tablet_idxes.size()={}, read tablet compaction cnts and tablet states cost={} ms",
request->tablet_indexes().size(), read_stats_sw.elapsed_us() / 1000);
"table_id={}, tablet_idxes.size()={}, read tablet compaction cnts and tablet states "
"cost={} ms",
table_id, request->tablet_indexes().size(), read_stats_sw.elapsed_us() / 1000);

DeleteBitmapUpdateLockPB lock_info_tmp;
if (!check_delete_bitmap_lock(code, msg, ss, txn, table_id, request->lock_id(),
Expand All @@ -2669,7 +2768,9 @@ void MetaServiceImpl::get_delete_bitmap_update_lock(google::protobuf::RpcControl
<< table_id << " request lock_id=" << request->lock_id()
<< " request initiator=" << request->initiator() << " code=" << code
<< " msg=" << msg;
return false;
}
return true;
}

void MetaServiceImpl::remove_delete_bitmap_update_lock(
Expand Down
5 changes: 5 additions & 0 deletions cloud/src/meta-service/meta_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,11 @@ class MetaServiceImpl : public cloud::MetaService {
const AlterInstanceRequest* request,
std::function<std::pair<MetaServiceCode, std::string>(InstanceInfoPB*)> action);

bool get_mow_tablet_stats_and_meta(MetaServiceCode& code, std::string& msg,
const GetDeleteBitmapUpdateLockRequest* request,
GetDeleteBitmapUpdateLockResponse* response,
std::string& instance_id, std::string& lock_key);

std::shared_ptr<TxnKv> txn_kv_;
std::shared_ptr<ResourceManager> resource_mgr_;
std::shared_ptr<RateLimiter> rate_limiter_;
Expand Down
Loading
Loading