Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions cloud/src/meta-service/meta_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -731,6 +731,32 @@ void internal_create_tablet(const CreateTabletsRequest* request, MetaServiceCode
std::string key;
std::string val;
meta_tablet_key(key_info, &key);

err = txn->get(key, &val);
TEST_SYNC_POINT_CALLBACK("meta_service_test:get_meta_tablet_key_error", &err);
if (err == TxnErrorCode::TXN_OK) {
doris::TabletMetaCloudPB exists_tablet_meta;
if (!exists_tablet_meta.ParseFromString(val)) {
code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
msg = fmt::format("malformed tablet meta, unable to initialize, key={}", hex(key));
return;
}
if (exists_tablet_meta.tablet_id() == tablet_meta.tablet_id() &&
exists_tablet_meta.schema_version() == tablet_meta.schema_version()) {
// idempotent
code = MetaServiceCode::OK;
msg = fmt::format("tablet already exists, tablet_id={} schema_version={} key={}",
tablet_id, tablet_meta.schema_version(), hex(key));
LOG(WARNING) << msg;
return;
}
}
if (err != TxnErrorCode::TXN_KEY_NOT_FOUND) {
code = cast_as<ErrCategory::READ>(err);
msg = "failed to get tablet key, key=" + hex(key);
LOG(WARNING) << msg;
return;
}
if (!tablet_meta.SerializeToString(&val)) {
code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
msg = "failed to serialize tablet meta";
Expand Down
11 changes: 10 additions & 1 deletion cloud/src/meta-service/meta_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -977,14 +977,23 @@ class MetaServiceProxy final : public MetaService {
auto dist = std::uniform_int_distribution(-config::idempotent_request_replay_delay_range_ms,
config::idempotent_request_replay_delay_range_ms);
int64_t sleep_ms = config::idempotent_request_replay_delay_base_ms + dist(rng);
std::string debug_string = req.ShortDebugString();
if constexpr (std::is_same_v<Request, GetTabletStatsRequest>) {
auto short_req = req;
if (short_req.tablet_idx_size() > 10) {
short_req.mutable_tablet_idx()->DeleteSubrange(10, req.tablet_idx_size() - 10);
}
debug_string = short_req.ShortDebugString();
TEST_SYNC_POINT_CALLBACK("idempotent_injection_short_debug_string_for_get_stats", &short_req);
}
LOG(INFO) << " request_name=" << req.GetDescriptor()->name()
<< " response_name=" << res.GetDescriptor()->name()
<< " queue_ts=" << duration_cast<milliseconds>(s.time_since_epoch()).count()
<< " now_ts=" << duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count()
<< " idempotent_request_replay_delay_base_ms=" << config::idempotent_request_replay_delay_base_ms
<< " idempotent_request_replay_delay_range_ms=" << config::idempotent_request_replay_delay_range_ms
<< " idempotent_request_replay_delay_ms=" << sleep_ms
<< " request=" << req.ShortDebugString();
<< " request=" << debug_string;
if (sleep_ms < 0 || exclusion.count(req.GetDescriptor()->name())) return;
brpc::Controller ctrl;
bthread_usleep(sleep_ms * 1000);
Expand Down
33 changes: 33 additions & 0 deletions cloud/test/meta_service_http_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3084,4 +3084,37 @@ TEST(MetaServiceHttpTest, VirtualClusterTest) {

} // namespace doris::cloud
}

TEST(MetaServiceHttpTest, ShortGetTabletStatsDebugStringTest) {
config::enable_idempotent_request_injection = true;
auto sp = SyncPoint::get_instance();
sp->enable_processing();
DORIS_CLOUD_DEFER {
sp->disable_processing();
};

HttpContext ctx(true);
auto& meta_service = ctx.meta_service_;
constexpr auto table_id = 10001, index_id = 11001, partition_id = 12001;
int64_t tablet_id = 10001;
GetTabletStatsRequest req;
GetTabletStatsResponse res;

brpc::Controller cntl;
for (size_t i = 0; i < 50; i++) {
auto* idx = req.add_tablet_idx();
idx->set_table_id(table_id);
idx->set_index_id(index_id);
idx->set_partition_id(partition_id);
idx->set_tablet_id(tablet_id + i);
}

meta_service->get_tablet_stats(&cntl, &req, &res, nullptr);

sp->set_call_back("idempotent_injection_short_debug_string_for_get_stats", [](auto&& args) {
GetTabletStatsRequest debug_req = *try_any_cast<GetTabletStatsRequest*>(args.back());
ASSERT_EQ(10, debug_req.tablet_idx_size());
});
}

} // namespace doris::cloud
244 changes: 243 additions & 1 deletion cloud/test/meta_service_job_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1793,6 +1793,16 @@ TEST(MetaServiceJobTest, DeleteBitmapUpdateLockCompatibilityTest) {
ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
};

auto clear_tablets = [&](int64_t table_id, int64_t index_id, int64_t partition_id,
int64_t tablet_id) {
std::unique_ptr<Transaction> txn;
ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK);
std::string key =
meta_tablet_key({instance_id, table_id, index_id, partition_id, tablet_id});
txn->remove(key);
ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
};

config::use_delete_bitmap_lock_random_version = false;
int64_t table_id = 111;
remove_delete_bitmap_lock(meta_service.get(), table_id);
Expand Down Expand Up @@ -1859,8 +1869,10 @@ TEST(MetaServiceJobTest, DeleteBitmapUpdateLockCompatibilityTest) {
config::delete_bitmap_lock_v2_white_list = version4 == 1 ? "" : "*";
test_commit_compaction_job(table_id, 2, 3, tablet_id, TabletCompactionJobPB::BASE);
ASSERT_EQ(res.status().code(), MetaServiceCode::LOCK_EXPIRED);
clear_rowsets(table_id);
clear_rowsets(tablet_id);
clear_rowsets(new_tablet_id);
clear_tablets(table_id, 2, 3, tablet_id);
clear_tablets(table_id, 2, 3, new_tablet_id);
remove_delete_bitmap_lock(meta_service.get(), table_id);
}

Expand Down Expand Up @@ -4734,4 +4746,234 @@ TEST(MetaServiceJobTest, CancelSC) {
}
}

TEST(MetaServiceJobTest, IdempotentCompactionJob) {
auto meta_service = get_meta_service();
// meta_service->resource_mgr().reset(); // Do not use resource manager

auto sp = SyncPoint::get_instance();
DORIS_CLOUD_DEFER {
SyncPoint::get_instance()->clear_all_call_backs();
};
sp->set_call_back("get_instance_id", [&](auto&& args) {
auto* ret = try_any_cast_ret<std::string>(args);
ret->first = instance_id;
ret->second = true;
});
sp->enable_processing();

brpc::Controller cntl;

auto test_commit_compaction_job = [&](int64_t table_id, int64_t index_id, int64_t partition_id,
int64_t tablet_id, std::string job_id,
TabletCompactionJobPB::CompactionType type) {
FinishTabletJobRequest req;
FinishTabletJobResponse res;

auto compaction = req.mutable_job()->add_compaction();
compaction->set_id(job_id);
compaction->set_initiator("ip:port");
req.mutable_job()->mutable_idx()->set_table_id(table_id);
req.mutable_job()->mutable_idx()->set_index_id(index_id);
req.mutable_job()->mutable_idx()->set_partition_id(partition_id);
req.mutable_job()->mutable_idx()->set_tablet_id(tablet_id);
compaction->set_base_compaction_cnt(10);
compaction->set_cumulative_compaction_cnt(20);
req.set_action(FinishTabletJobRequest::COMMIT);

auto tablet_meta_key =
meta_tablet_key({instance_id, table_id, index_id, partition_id, tablet_id});
std::string tablet_meta_val;
std::unique_ptr<Transaction> txn;
ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK);
doris::TabletMetaCloudPB tablet_meta_pb;
ASSERT_EQ(txn->get(tablet_meta_key, &tablet_meta_val), TxnErrorCode::TXN_OK);
ASSERT_TRUE(tablet_meta_pb.ParseFromString(tablet_meta_val));
tablet_meta_pb.set_cumulative_layer_point(50);
txn->put(tablet_meta_key, tablet_meta_pb.SerializeAsString());
ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);

// Create create tablet stats, compation job will will update stats
ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK);
auto tablet_stats_key =
stats_tablet_key({instance_id, table_id, index_id, partition_id, tablet_id});
std::string tablet_stats_val;
TabletStatsPB tablet_stats_pb;
ASSERT_EQ(txn->get(tablet_stats_key, &tablet_stats_val), TxnErrorCode::TXN_OK);
ASSERT_TRUE(tablet_stats_pb.ParseFromString(tablet_stats_val));

std::mt19937 rng(std::chrono::system_clock::now().time_since_epoch().count());
std::uniform_int_distribution<int> dist(1, 10000); // Positive numbers

compaction->set_output_cumulative_point(tablet_stats_pb.cumulative_point() + dist(rng));
compaction->set_num_output_rows(dist(rng));
compaction->set_num_output_rowsets(dist(rng));
compaction->set_num_output_segments(dist(rng));
compaction->set_num_input_rows(dist(rng));
compaction->set_num_input_rowsets(dist(rng));
compaction->set_num_input_segments(dist(rng));
compaction->set_size_input_rowsets(dist(rng));
compaction->set_size_output_rowsets(dist(rng));
compaction->set_index_size_input_rowsets(dist(rng));
compaction->set_segment_size_output_rowsets(dist(rng));
compaction->set_index_size_input_rowsets(dist(rng));
compaction->set_segment_size_output_rowsets(dist(rng));
compaction->set_type(type);

tablet_stats_pb.set_cumulative_compaction_cnt(dist(rng));
tablet_stats_pb.set_base_compaction_cnt(dist(rng));
tablet_stats_pb.set_cumulative_point(tablet_meta_pb.cumulative_layer_point());
// MUST let data stats be larger than input data size
tablet_stats_pb.set_num_rows(dist(rng) + compaction->num_input_rows());
tablet_stats_pb.set_data_size(dist(rng) + compaction->size_input_rowsets());
tablet_stats_pb.set_num_rowsets(dist(rng) + compaction->num_input_rowsets());
tablet_stats_pb.set_num_segments(dist(rng) + compaction->num_input_segments());
tablet_stats_pb.set_index_size(dist(rng) + compaction->index_size_input_rowsets());
tablet_stats_pb.set_segment_size(dist(rng) + compaction->segment_size_input_rowsets());

txn->put(tablet_stats_key, tablet_stats_pb.SerializeAsString());
ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);

// Provide input and output rowset info
int64_t input_version_start = dist(rng);
int64_t input_version_end = input_version_start + 100;
compaction->add_input_versions(input_version_start);
compaction->add_input_versions(input_version_end);
compaction->add_output_versions(input_version_end);
compaction->add_output_rowset_ids("output rowset id");

// Input rowsets must exist, and more than 0
// Check number input rowsets
sp->set_call_back("process_compaction_job::loop_input_done", [](auto&& args) {
auto* num_input_rowsets = try_any_cast<int*>(args[0]);
ASSERT_EQ(*num_input_rowsets, 0); // zero existed rowsets
});

// Provide input rowset KVs, boundary test, 5 input rowsets
ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK);
// clang-format off
std::vector<std::string> input_rowset_keys = {
meta_rowset_key({instance_id, tablet_id, input_version_start - 1}),
meta_rowset_key({instance_id, tablet_id, input_version_start}),
meta_rowset_key({instance_id, tablet_id, input_version_start + 1}),
meta_rowset_key({instance_id, tablet_id, (input_version_start + input_version_end) / 2}),
meta_rowset_key({instance_id, tablet_id, input_version_end - 1}),
meta_rowset_key({instance_id, tablet_id, input_version_end}),
meta_rowset_key({instance_id, tablet_id, input_version_end + 1}),
};
// clang-format on
std::vector<std::unique_ptr<std::string>> input_rowset_vals;
for (auto& i : input_rowset_keys) {
doris::RowsetMetaCloudPB rs_pb;
rs_pb.set_rowset_id(0);
rs_pb.set_rowset_id_v2(hex(i));
input_rowset_vals.emplace_back(new std::string(rs_pb.SerializeAsString()));
txn->put(i, *input_rowset_vals.back());
}
ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);

// Check number input rowsets
sp->set_call_back("process_compaction_job::loop_input_done", [](auto&& args) {
auto* num_input_rowsets = try_any_cast<int*>(args[0]);
ASSERT_EQ(*num_input_rowsets, 5);
});

int64_t txn_id = dist(rng);
compaction->add_txn_id(txn_id);

// Provide invalid output rowset meta
auto tmp_rowset_key = meta_rowset_tmp_key({instance_id, txn_id, tablet_id});
doris::RowsetMetaCloudPB tmp_rs_pb;
tmp_rs_pb.set_rowset_id(0);
auto tmp_rowset_val = tmp_rs_pb.SerializeAsString();
ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK);
txn->put(tmp_rowset_key, tmp_rowset_val);
ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);

// Provide txn_id in output rowset meta
tmp_rs_pb.set_txn_id(10086);
tmp_rowset_val = tmp_rs_pb.SerializeAsString();
ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK);
txn->put(tmp_rowset_key, tmp_rowset_val);
ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);

meta_service->finish_tablet_job(reinterpret_cast<::google::protobuf::RpcController*>(&cntl),
&req, &res, nullptr);
ASSERT_EQ(res.status().code(), MetaServiceCode::OK);

ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK);
tablet_stats_val.clear();
ASSERT_EQ(txn->get(tablet_stats_key, &tablet_stats_val), TxnErrorCode::TXN_OK);
TabletStatsPB stats;
ASSERT_TRUE(stats.ParseFromString(tablet_stats_val));

// clang-format off
EXPECT_EQ(stats.base_compaction_cnt() , tablet_stats_pb.base_compaction_cnt() + (req.job().compaction(0).type() == TabletCompactionJobPB::BASE));
EXPECT_EQ(stats.cumulative_compaction_cnt(), tablet_stats_pb.cumulative_compaction_cnt() + (req.job().compaction(0).type() == TabletCompactionJobPB::CUMULATIVE));
EXPECT_EQ(stats.cumulative_point() , type == TabletCompactionJobPB::BASE ? 50 : req.job().compaction(0).output_cumulative_point());
EXPECT_EQ(stats.num_rows() , tablet_stats_pb.num_rows() + (req.job().compaction(0).num_output_rows() - req.job().compaction(0).num_input_rows()));
EXPECT_EQ(stats.data_size() , tablet_stats_pb.data_size() + (req.job().compaction(0).size_output_rowsets() - req.job().compaction(0).size_input_rowsets()));
EXPECT_EQ(stats.num_rowsets() , tablet_stats_pb.num_rowsets() + (req.job().compaction(0).num_output_rowsets() - req.job().compaction(0).num_input_rowsets()));
EXPECT_EQ(stats.num_segments() , tablet_stats_pb.num_segments() + (req.job().compaction(0).num_output_segments() - req.job().compaction(0).num_input_segments()));
EXPECT_EQ(stats.index_size() , tablet_stats_pb.index_size() + (req.job().compaction(0).index_size_output_rowsets() - req.job().compaction(0).index_size_input_rowsets()));
EXPECT_EQ(stats.segment_size() , tablet_stats_pb.segment_size() + (req.job().compaction(0).segment_size_output_rowsets() - req.job().compaction(0).segment_size_input_rowsets()));
// clang-format on

// Check job removed, tablet meta updated
auto job_key = job_tablet_key({instance_id, table_id, index_id, partition_id, tablet_id});
std::string job_val;
ASSERT_EQ(txn->get(job_key, &job_val), TxnErrorCode::TXN_OK);
TabletJobInfoPB job_pb;
ASSERT_TRUE(job_pb.ParseFromString(job_val));
ASSERT_TRUE(job_pb.compaction().empty());
tablet_meta_val.clear();

// Check tmp rowset removed
ASSERT_EQ(txn->get(tmp_rowset_key, &tmp_rowset_val), TxnErrorCode::TXN_KEY_NOT_FOUND);
// Check input rowsets removed, the largest version remains
for (int i = 1; i < input_rowset_keys.size() - 2; ++i) {
std::string val;
EXPECT_EQ(txn->get(input_rowset_keys[i], &val), TxnErrorCode::TXN_KEY_NOT_FOUND)
<< hex(input_rowset_keys[i]);
}
// Check recycle rowsets added
for (int i = 1; i < input_rowset_vals.size() - 1; ++i) {
doris::RowsetMetaCloudPB rs;
ASSERT_TRUE(rs.ParseFromString(*input_rowset_vals[i]));
auto key = recycle_rowset_key({instance_id, tablet_id, rs.rowset_id_v2()});
std::string val;
EXPECT_EQ(txn->get(key, &val), TxnErrorCode::TXN_OK) << hex(key);
}
// Check output rowset added
auto rowset_key = meta_rowset_key({instance_id, tablet_id, input_version_end});
std::string rowset_val;
EXPECT_EQ(txn->get(rowset_key, &rowset_val), TxnErrorCode::TXN_OK) << hex(rowset_key);
};
constexpr int64_t table_id = 1;
constexpr int64_t index_id = 2;
constexpr int64_t partition_id = 3;
constexpr int64_t tablet_id = 4;
auto type = TabletCompactionJobPB::BASE;
std::string job_id = "job_id1234";

create_tablet(meta_service.get(), table_id, index_id, partition_id, tablet_id, false);

{
StartTabletJobResponse res;
start_compaction_job(meta_service.get(), tablet_id, job_id, "ip:port", 9, 19,
TabletCompactionJobPB::BASE, res);
ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
}

ASSERT_NO_FATAL_FAILURE(
test_commit_compaction_job(table_id, index_id, partition_id, tablet_id, job_id, type));

create_tablet(meta_service.get(), table_id, index_id, partition_id, tablet_id, false);
{
StartTabletJobResponse res;
start_compaction_job(meta_service.get(), tablet_id, job_id, "ip:port", 9, 19,
TabletCompactionJobPB::BASE, res);
ASSERT_EQ(res.status().code(), MetaServiceCode::STALE_TABLET_CACHE);
}
}

} // namespace doris::cloud
Loading
Loading