diff --git a/cloud/src/meta-service/meta_service.cpp b/cloud/src/meta-service/meta_service.cpp index 61a3d19b4feace..6b5d65d1539d12 100644 --- a/cloud/src/meta-service/meta_service.cpp +++ b/cloud/src/meta-service/meta_service.cpp @@ -731,6 +731,32 @@ void internal_create_tablet(const CreateTabletsRequest* request, MetaServiceCode std::string key; std::string val; meta_tablet_key(key_info, &key); + + err = txn->get(key, &val); + TEST_SYNC_POINT_CALLBACK("meta_service_test:get_meta_tablet_key_error", &err); + if (err == TxnErrorCode::TXN_OK) { + doris::TabletMetaCloudPB exists_tablet_meta; + if (!exists_tablet_meta.ParseFromString(val)) { + code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR; + msg = fmt::format("malformed tablet meta, unable to initialize, key={}", hex(key)); + return; + } + if (exists_tablet_meta.tablet_id() == tablet_meta.tablet_id() && + exists_tablet_meta.schema_version() == tablet_meta.schema_version()) { + // idempotent + code = MetaServiceCode::OK; + msg = fmt::format("tablet already exists, tablet_id={} schema_version={} key={}", + tablet_id, tablet_meta.schema_version(), hex(key)); + LOG(WARNING) << msg; + return; + } + } + if (err != TxnErrorCode::TXN_KEY_NOT_FOUND) { + code = cast_as(err); + msg = "failed to get tablet key, key=" + hex(key); + LOG(WARNING) << msg; + return; + } if (!tablet_meta.SerializeToString(&val)) { code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR; msg = "failed to serialize tablet meta"; diff --git a/cloud/src/meta-service/meta_service.h b/cloud/src/meta-service/meta_service.h index 38cc77aebb00af..01f65edb667cfd 100644 --- a/cloud/src/meta-service/meta_service.h +++ b/cloud/src/meta-service/meta_service.h @@ -977,6 +977,15 @@ class MetaServiceProxy final : public MetaService { auto dist = std::uniform_int_distribution(-config::idempotent_request_replay_delay_range_ms, config::idempotent_request_replay_delay_range_ms); int64_t sleep_ms = config::idempotent_request_replay_delay_base_ms + dist(rng); + std::string 
debug_string = req.ShortDebugString(); + if constexpr (std::is_same_v) { + auto short_req = req; + if (short_req.tablet_idx_size() > 10) { + short_req.mutable_tablet_idx()->DeleteSubrange(10, req.tablet_idx_size() - 10); + } + debug_string = short_req.ShortDebugString(); + TEST_SYNC_POINT_CALLBACK("idempotent_injection_short_debug_string_for_get_stats", &short_req); + } LOG(INFO) << " request_name=" << req.GetDescriptor()->name() << " response_name=" << res.GetDescriptor()->name() << " queue_ts=" << duration_cast(s.time_since_epoch()).count() @@ -984,7 +993,7 @@ class MetaServiceProxy final : public MetaService { << " idempotent_request_replay_delay_base_ms=" << config::idempotent_request_replay_delay_base_ms << " idempotent_request_replay_delay_range_ms=" << config::idempotent_request_replay_delay_range_ms << " idempotent_request_replay_delay_ms=" << sleep_ms - << " request=" << req.ShortDebugString(); + << " request=" << debug_string; if (sleep_ms < 0 || exclusion.count(req.GetDescriptor()->name())) return; brpc::Controller ctrl; bthread_usleep(sleep_ms * 1000); diff --git a/cloud/test/meta_service_http_test.cpp b/cloud/test/meta_service_http_test.cpp index 67f10699f385bf..60391f7c9e6f98 100644 --- a/cloud/test/meta_service_http_test.cpp +++ b/cloud/test/meta_service_http_test.cpp @@ -3084,4 +3084,37 @@ TEST(MetaServiceHttpTest, VirtualClusterTest) { } // namespace doris::cloud } + +TEST(MetaServiceHttpTest, ShortGetTabletStatsDebugStringTest) { + config::enable_idempotent_request_injection = true; + auto sp = SyncPoint::get_instance(); + sp->enable_processing(); + DORIS_CLOUD_DEFER { + sp->disable_processing(); + }; + + HttpContext ctx(true); + auto& meta_service = ctx.meta_service_; + constexpr auto table_id = 10001, index_id = 11001, partition_id = 12001; + int64_t tablet_id = 10001; + GetTabletStatsRequest req; + GetTabletStatsResponse res; + + brpc::Controller cntl; + for (size_t i = 0; i < 50; i++) { + auto* idx = req.add_tablet_idx(); + 
idx->set_table_id(table_id); + idx->set_index_id(index_id); + idx->set_partition_id(partition_id); + idx->set_tablet_id(tablet_id + i); + } + + meta_service->get_tablet_stats(&cntl, &req, &res, nullptr); + + sp->set_call_back("idempotent_injection_short_debug_string_for_get_stats", [](auto&& args) { + GetTabletStatsRequest debug_req = *try_any_cast(args.back()); + ASSERT_EQ(10, debug_req.tablet_idx_size()); + }); +} + } // namespace doris::cloud diff --git a/cloud/test/meta_service_job_test.cpp b/cloud/test/meta_service_job_test.cpp index e53ed77fb35fd1..b71fee9aa3f231 100644 --- a/cloud/test/meta_service_job_test.cpp +++ b/cloud/test/meta_service_job_test.cpp @@ -1793,6 +1793,16 @@ TEST(MetaServiceJobTest, DeleteBitmapUpdateLockCompatibilityTest) { ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); }; + auto clear_tablets = [&](int64_t table_id, int64_t index_id, int64_t partition_id, + int64_t tablet_id) { + std::unique_ptr txn; + ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK); + std::string key = + meta_tablet_key({instance_id, table_id, index_id, partition_id, tablet_id}); + txn->remove(key); + ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); + }; + config::use_delete_bitmap_lock_random_version = false; int64_t table_id = 111; remove_delete_bitmap_lock(meta_service.get(), table_id); @@ -1859,8 +1869,10 @@ TEST(MetaServiceJobTest, DeleteBitmapUpdateLockCompatibilityTest) { config::delete_bitmap_lock_v2_white_list = version4 == 1 ? 
"" : "*"; test_commit_compaction_job(table_id, 2, 3, tablet_id, TabletCompactionJobPB::BASE); ASSERT_EQ(res.status().code(), MetaServiceCode::LOCK_EXPIRED); - clear_rowsets(table_id); + clear_rowsets(tablet_id); clear_rowsets(new_tablet_id); + clear_tablets(table_id, 2, 3, tablet_id); + clear_tablets(table_id, 2, 3, new_tablet_id); remove_delete_bitmap_lock(meta_service.get(), table_id); } @@ -4734,4 +4746,234 @@ TEST(MetaServiceJobTest, CancelSC) { } } +TEST(MetaServiceJobTest, IdempotentCompactionJob) { + auto meta_service = get_meta_service(); + // meta_service->resource_mgr().reset(); // Do not use resource manager + + auto sp = SyncPoint::get_instance(); + DORIS_CLOUD_DEFER { + SyncPoint::get_instance()->clear_all_call_backs(); + }; + sp->set_call_back("get_instance_id", [&](auto&& args) { + auto* ret = try_any_cast_ret(args); + ret->first = instance_id; + ret->second = true; + }); + sp->enable_processing(); + + brpc::Controller cntl; + + auto test_commit_compaction_job = [&](int64_t table_id, int64_t index_id, int64_t partition_id, + int64_t tablet_id, std::string job_id, + TabletCompactionJobPB::CompactionType type) { + FinishTabletJobRequest req; + FinishTabletJobResponse res; + + auto compaction = req.mutable_job()->add_compaction(); + compaction->set_id(job_id); + compaction->set_initiator("ip:port"); + req.mutable_job()->mutable_idx()->set_table_id(table_id); + req.mutable_job()->mutable_idx()->set_index_id(index_id); + req.mutable_job()->mutable_idx()->set_partition_id(partition_id); + req.mutable_job()->mutable_idx()->set_tablet_id(tablet_id); + compaction->set_base_compaction_cnt(10); + compaction->set_cumulative_compaction_cnt(20); + req.set_action(FinishTabletJobRequest::COMMIT); + + auto tablet_meta_key = + meta_tablet_key({instance_id, table_id, index_id, partition_id, tablet_id}); + std::string tablet_meta_val; + std::unique_ptr txn; + ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK); + doris::TabletMetaCloudPB 
tablet_meta_pb; + ASSERT_EQ(txn->get(tablet_meta_key, &tablet_meta_val), TxnErrorCode::TXN_OK); + ASSERT_TRUE(tablet_meta_pb.ParseFromString(tablet_meta_val)); + tablet_meta_pb.set_cumulative_layer_point(50); + txn->put(tablet_meta_key, tablet_meta_pb.SerializeAsString()); + ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); + + // Create tablet stats; the compaction job will update stats + ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK); + auto tablet_stats_key = + stats_tablet_key({instance_id, table_id, index_id, partition_id, tablet_id}); + std::string tablet_stats_val; + TabletStatsPB tablet_stats_pb; + ASSERT_EQ(txn->get(tablet_stats_key, &tablet_stats_val), TxnErrorCode::TXN_OK); + ASSERT_TRUE(tablet_stats_pb.ParseFromString(tablet_stats_val)); + + std::mt19937 rng(std::chrono::system_clock::now().time_since_epoch().count()); + std::uniform_int_distribution dist(1, 10000); // Positive numbers + + compaction->set_output_cumulative_point(tablet_stats_pb.cumulative_point() + dist(rng)); + compaction->set_num_output_rows(dist(rng)); + compaction->set_num_output_rowsets(dist(rng)); + compaction->set_num_output_segments(dist(rng)); + compaction->set_num_input_rows(dist(rng)); + compaction->set_num_input_rowsets(dist(rng)); + compaction->set_num_input_segments(dist(rng)); + compaction->set_size_input_rowsets(dist(rng)); + compaction->set_size_output_rowsets(dist(rng)); + compaction->set_index_size_input_rowsets(dist(rng)); + compaction->set_segment_size_output_rowsets(dist(rng)); + compaction->set_index_size_input_rowsets(dist(rng)); + compaction->set_segment_size_output_rowsets(dist(rng)); + compaction->set_type(type); + + tablet_stats_pb.set_cumulative_compaction_cnt(dist(rng)); + tablet_stats_pb.set_base_compaction_cnt(dist(rng)); + tablet_stats_pb.set_cumulative_point(tablet_meta_pb.cumulative_layer_point()); + // MUST let data stats be larger than input data size + tablet_stats_pb.set_num_rows(dist(rng) + 
compaction->num_input_rows()); + tablet_stats_pb.set_data_size(dist(rng) + compaction->size_input_rowsets()); + tablet_stats_pb.set_num_rowsets(dist(rng) + compaction->num_input_rowsets()); + tablet_stats_pb.set_num_segments(dist(rng) + compaction->num_input_segments()); + tablet_stats_pb.set_index_size(dist(rng) + compaction->index_size_input_rowsets()); + tablet_stats_pb.set_segment_size(dist(rng) + compaction->segment_size_input_rowsets()); + + txn->put(tablet_stats_key, tablet_stats_pb.SerializeAsString()); + ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); + + // Provide input and output rowset info + int64_t input_version_start = dist(rng); + int64_t input_version_end = input_version_start + 100; + compaction->add_input_versions(input_version_start); + compaction->add_input_versions(input_version_end); + compaction->add_output_versions(input_version_end); + compaction->add_output_rowset_ids("output rowset id"); + + // Input rowsets must exist, and more than 0 + // Check number input rowsets + sp->set_call_back("process_compaction_job::loop_input_done", [](auto&& args) { + auto* num_input_rowsets = try_any_cast(args[0]); + ASSERT_EQ(*num_input_rowsets, 0); // zero existed rowsets + }); + + // Provide input rowset KVs, boundary test, 5 input rowsets + ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK); + // clang-format off + std::vector input_rowset_keys = { + meta_rowset_key({instance_id, tablet_id, input_version_start - 1}), + meta_rowset_key({instance_id, tablet_id, input_version_start}), + meta_rowset_key({instance_id, tablet_id, input_version_start + 1}), + meta_rowset_key({instance_id, tablet_id, (input_version_start + input_version_end) / 2}), + meta_rowset_key({instance_id, tablet_id, input_version_end - 1}), + meta_rowset_key({instance_id, tablet_id, input_version_end}), + meta_rowset_key({instance_id, tablet_id, input_version_end + 1}), + }; + // clang-format on + std::vector> input_rowset_vals; + for (auto& i : 
input_rowset_keys) { + doris::RowsetMetaCloudPB rs_pb; + rs_pb.set_rowset_id(0); + rs_pb.set_rowset_id_v2(hex(i)); + input_rowset_vals.emplace_back(new std::string(rs_pb.SerializeAsString())); + txn->put(i, *input_rowset_vals.back()); + } + ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); + + // Check number input rowsets + sp->set_call_back("process_compaction_job::loop_input_done", [](auto&& args) { + auto* num_input_rowsets = try_any_cast(args[0]); + ASSERT_EQ(*num_input_rowsets, 5); + }); + + int64_t txn_id = dist(rng); + compaction->add_txn_id(txn_id); + + // Provide invalid output rowset meta + auto tmp_rowset_key = meta_rowset_tmp_key({instance_id, txn_id, tablet_id}); + doris::RowsetMetaCloudPB tmp_rs_pb; + tmp_rs_pb.set_rowset_id(0); + auto tmp_rowset_val = tmp_rs_pb.SerializeAsString(); + ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK); + txn->put(tmp_rowset_key, tmp_rowset_val); + ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); + + // Provide txn_id in output rowset meta + tmp_rs_pb.set_txn_id(10086); + tmp_rowset_val = tmp_rs_pb.SerializeAsString(); + ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK); + txn->put(tmp_rowset_key, tmp_rowset_val); + ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); + + meta_service->finish_tablet_job(reinterpret_cast<::google::protobuf::RpcController*>(&cntl), + &req, &res, nullptr); + ASSERT_EQ(res.status().code(), MetaServiceCode::OK); + + ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK); + tablet_stats_val.clear(); + ASSERT_EQ(txn->get(tablet_stats_key, &tablet_stats_val), TxnErrorCode::TXN_OK); + TabletStatsPB stats; + ASSERT_TRUE(stats.ParseFromString(tablet_stats_val)); + + // clang-format off + EXPECT_EQ(stats.base_compaction_cnt() , tablet_stats_pb.base_compaction_cnt() + (req.job().compaction(0).type() == TabletCompactionJobPB::BASE)); + EXPECT_EQ(stats.cumulative_compaction_cnt(), tablet_stats_pb.cumulative_compaction_cnt() + 
(req.job().compaction(0).type() == TabletCompactionJobPB::CUMULATIVE)); + EXPECT_EQ(stats.cumulative_point() , type == TabletCompactionJobPB::BASE ? 50 : req.job().compaction(0).output_cumulative_point()); + EXPECT_EQ(stats.num_rows() , tablet_stats_pb.num_rows() + (req.job().compaction(0).num_output_rows() - req.job().compaction(0).num_input_rows())); + EXPECT_EQ(stats.data_size() , tablet_stats_pb.data_size() + (req.job().compaction(0).size_output_rowsets() - req.job().compaction(0).size_input_rowsets())); + EXPECT_EQ(stats.num_rowsets() , tablet_stats_pb.num_rowsets() + (req.job().compaction(0).num_output_rowsets() - req.job().compaction(0).num_input_rowsets())); + EXPECT_EQ(stats.num_segments() , tablet_stats_pb.num_segments() + (req.job().compaction(0).num_output_segments() - req.job().compaction(0).num_input_segments())); + EXPECT_EQ(stats.index_size() , tablet_stats_pb.index_size() + (req.job().compaction(0).index_size_output_rowsets() - req.job().compaction(0).index_size_input_rowsets())); + EXPECT_EQ(stats.segment_size() , tablet_stats_pb.segment_size() + (req.job().compaction(0).segment_size_output_rowsets() - req.job().compaction(0).segment_size_input_rowsets())); + // clang-format on + + // Check job removed, tablet meta updated + auto job_key = job_tablet_key({instance_id, table_id, index_id, partition_id, tablet_id}); + std::string job_val; + ASSERT_EQ(txn->get(job_key, &job_val), TxnErrorCode::TXN_OK); + TabletJobInfoPB job_pb; + ASSERT_TRUE(job_pb.ParseFromString(job_val)); + ASSERT_TRUE(job_pb.compaction().empty()); + tablet_meta_val.clear(); + + // Check tmp rowset removed + ASSERT_EQ(txn->get(tmp_rowset_key, &tmp_rowset_val), TxnErrorCode::TXN_KEY_NOT_FOUND); + // Check input rowsets removed, the largest version remains + for (int i = 1; i < input_rowset_keys.size() - 2; ++i) { + std::string val; + EXPECT_EQ(txn->get(input_rowset_keys[i], &val), TxnErrorCode::TXN_KEY_NOT_FOUND) + << hex(input_rowset_keys[i]); + } + // Check recycle rowsets added 
+ for (int i = 1; i < input_rowset_vals.size() - 1; ++i) { + doris::RowsetMetaCloudPB rs; + ASSERT_TRUE(rs.ParseFromString(*input_rowset_vals[i])); + auto key = recycle_rowset_key({instance_id, tablet_id, rs.rowset_id_v2()}); + std::string val; + EXPECT_EQ(txn->get(key, &val), TxnErrorCode::TXN_OK) << hex(key); + } + // Check output rowset added + auto rowset_key = meta_rowset_key({instance_id, tablet_id, input_version_end}); + std::string rowset_val; + EXPECT_EQ(txn->get(rowset_key, &rowset_val), TxnErrorCode::TXN_OK) << hex(rowset_key); + }; + constexpr int64_t table_id = 1; + constexpr int64_t index_id = 2; + constexpr int64_t partition_id = 3; + constexpr int64_t tablet_id = 4; + auto type = TabletCompactionJobPB::BASE; + std::string job_id = "job_id1234"; + + create_tablet(meta_service.get(), table_id, index_id, partition_id, tablet_id, false); + + { + StartTabletJobResponse res; + start_compaction_job(meta_service.get(), tablet_id, job_id, "ip:port", 9, 19, + TabletCompactionJobPB::BASE, res); + ASSERT_EQ(res.status().code(), MetaServiceCode::OK); + } + + ASSERT_NO_FATAL_FAILURE( + test_commit_compaction_job(table_id, index_id, partition_id, tablet_id, job_id, type)); + + create_tablet(meta_service.get(), table_id, index_id, partition_id, tablet_id, false); + { + StartTabletJobResponse res; + start_compaction_job(meta_service.get(), tablet_id, job_id, "ip:port", 9, 19, + TabletCompactionJobPB::BASE, res); + ASSERT_EQ(res.status().code(), MetaServiceCode::STALE_TABLET_CACHE); + } +} + } // namespace doris::cloud diff --git a/cloud/test/meta_service_test.cpp b/cloud/test/meta_service_test.cpp index f4e57c208ee62f..9055c1f2a4da5f 100644 --- a/cloud/test/meta_service_test.cpp +++ b/cloud/test/meta_service_test.cpp @@ -11829,4 +11829,43 @@ TEST(MetaServiceTest, SetSnapshotPropertyTest) { } } +TEST(MetaServiceTest, CreateTabletIdempotentAndHandlingError) { + DORIS_CLOUD_DEFER { + SyncPoint::get_instance()->clear_all_call_backs(); + 
SyncPoint::get_instance()->disable_processing(); + }; + + size_t case_num = 0; + auto* sp = SyncPoint::get_instance(); + sp->set_call_back("meta_service_test:get_meta_tablet_key_error", [&case_num](auto&& args) { + if (++case_num == 3) { + auto* code = try_any_cast(args[0]); + *code = TxnErrorCode::TXN_INVALID_DATA; + } + }); + sp->enable_processing(); + + auto meta_service = get_meta_service(); + brpc::Controller cntl; + CreateTabletsRequest req; + CreateTabletsResponse res; + int64_t table_id = 100; + int64_t index_id = 200; + int64_t partition_id = 300; + int64_t tablet_id = 400; + req.set_db_id(1); // default db_id + add_tablet(req, table_id, index_id, partition_id, tablet_id); + // normal create + meta_service->create_tablets(&cntl, &req, &res, nullptr); + ASSERT_EQ(res.status().code(), MetaServiceCode::OK); + + // idempotent + meta_service->create_tablets(&cntl, &req, &res, nullptr); + ASSERT_EQ(res.status().code(), MetaServiceCode::OK); + + // error handling + meta_service->create_tablets(&cntl, &req, &res, nullptr); + ASSERT_EQ(res.status().code(), MetaServiceCode::KV_TXN_GET_ERR); +} + } // namespace doris::cloud