diff --git a/be/src/cloud/cloud_delete_task.cpp b/be/src/cloud/cloud_delete_task.cpp index 35c48841d38c53..8ace61b6139f69 100644 --- a/be/src/cloud/cloud_delete_task.cpp +++ b/be/src/cloud/cloud_delete_task.cpp @@ -91,7 +91,7 @@ Status CloudDeleteTask::execute(CloudStorageEngine& engine, const TPushReq& requ RETURN_IF_ERROR(rowset_writer->build(rowset)); rowset->rowset_meta()->set_delete_predicate(std::move(del_pred)); - auto st = engine.meta_mgr().commit_rowset(*rowset->rowset_meta()); + auto st = engine.meta_mgr().commit_rowset(*rowset->rowset_meta(), ""); // Update tablet stats tablet->fetch_add_approximate_num_rowsets(1); diff --git a/be/src/cloud/cloud_delta_writer.cpp b/be/src/cloud/cloud_delta_writer.cpp index 7beaeb3e086703..2533028314f4d9 100644 --- a/be/src/cloud/cloud_delta_writer.cpp +++ b/be/src/cloud/cloud_delta_writer.cpp @@ -113,7 +113,7 @@ Status CloudDeltaWriter::commit_rowset() { RETURN_IF_ERROR(_rowset_builder->init()); RETURN_IF_ERROR(_rowset_builder->build_rowset()); } - return _engine.meta_mgr().commit_rowset(*rowset_meta()); + return _engine.meta_mgr().commit_rowset(*rowset_meta(), ""); } Status CloudDeltaWriter::set_txn_related_delete_bitmap() { diff --git a/be/src/cloud/cloud_meta_mgr.cpp b/be/src/cloud/cloud_meta_mgr.cpp index 2e9492f2e1958b..c22a64e587c588 100644 --- a/be/src/cloud/cloud_meta_mgr.cpp +++ b/be/src/cloud/cloud_meta_mgr.cpp @@ -867,7 +867,7 @@ Status CloudMetaMgr::sync_tablet_delete_bitmap(CloudTablet* tablet, int64_t old_ return Status::OK(); } -Status CloudMetaMgr::prepare_rowset(const RowsetMeta& rs_meta, +Status CloudMetaMgr::prepare_rowset(const RowsetMeta& rs_meta, const std::string& job_id, RowsetMetaSharedPtr* existed_rs_meta) { VLOG_DEBUG << "prepare rowset, tablet_id: " << rs_meta.tablet_id() << ", rowset_id: " << rs_meta.rowset_id() << " txn_id: " << rs_meta.txn_id(); @@ -879,6 +879,7 @@ Status CloudMetaMgr::prepare_rowset(const RowsetMeta& rs_meta, CreateRowsetResponse resp; req.set_cloud_unique_id(config::cloud_unique_id); req.set_txn_id(rs_meta.txn_id()); + req.set_tablet_job_id(job_id); RowsetMetaPB doris_rs_meta = rs_meta.get_rowset_pb(/*skip_schema=*/true); doris_rowset_meta_to_cloud(req.mutable_rowset_meta(), std::move(doris_rs_meta)); @@ -896,7 +897,7 @@ Status CloudMetaMgr::prepare_rowset(const RowsetMeta& rs_meta, return st; } -Status CloudMetaMgr::commit_rowset(const RowsetMeta& rs_meta, +Status CloudMetaMgr::commit_rowset(const RowsetMeta& rs_meta, const std::string& job_id, RowsetMetaSharedPtr* existed_rs_meta) { VLOG_DEBUG << "commit rowset, tablet_id: " << rs_meta.tablet_id() << ", rowset_id: " << rs_meta.rowset_id() << " txn_id: " << rs_meta.txn_id(); @@ -909,6 +910,7 @@ Status CloudMetaMgr::commit_rowset(const RowsetMeta& rs_meta, CreateRowsetResponse resp; req.set_cloud_unique_id(config::cloud_unique_id); req.set_txn_id(rs_meta.txn_id()); + req.set_tablet_job_id(job_id); RowsetMetaPB rs_meta_pb = rs_meta.get_rowset_pb(); doris_rowset_meta_to_cloud(req.mutable_rowset_meta(), std::move(rs_meta_pb)); diff --git a/be/src/cloud/cloud_meta_mgr.h b/be/src/cloud/cloud_meta_mgr.h index cf86f09929dfce..0cc58e48166963 100644 --- a/be/src/cloud/cloud_meta_mgr.h +++ b/be/src/cloud/cloud_meta_mgr.h @@ -70,10 +70,10 @@ class CloudMetaMgr { bool warmup_delta_data = false, bool sync_delete_bitmap = true, bool full_sync = false, SyncRowsetStats* sync_stats = nullptr); - Status prepare_rowset(const RowsetMeta& rs_meta, + Status prepare_rowset(const RowsetMeta& rs_meta, const std::string& job_id, std::shared_ptr* existed_rs_meta = nullptr); - Status commit_rowset(const RowsetMeta& rs_meta, + Status commit_rowset(const RowsetMeta& rs_meta, const std::string& job_id, std::shared_ptr* existed_rs_meta = nullptr); Status update_tmp_rowset(const RowsetMeta& rs_meta); diff --git a/be/src/cloud/cloud_rowset_builder.cpp b/be/src/cloud/cloud_rowset_builder.cpp index 9466dd1062803e..ea584fcd896bab 100644 --- a/be/src/cloud/cloud_rowset_builder.cpp +++ b/be/src/cloud/cloud_rowset_builder.cpp @@ -80,7 +80,7 @@ Status CloudRowsetBuilder::init() { _calc_delete_bitmap_token = _engine.calc_delete_bitmap_executor()->create_token(); - RETURN_IF_ERROR(_engine.meta_mgr().prepare_rowset(*_rowset_writer->rowset_meta())); + RETURN_IF_ERROR(_engine.meta_mgr().prepare_rowset(*_rowset_writer->rowset_meta(), "")); _is_init = true; return Status::OK(); diff --git a/be/src/cloud/cloud_schema_change_job.cpp b/be/src/cloud/cloud_schema_change_job.cpp index ce88be5264983e..05d29383af4213 100644 --- a/be/src/cloud/cloud_schema_change_job.cpp +++ b/be/src/cloud/cloud_schema_change_job.cpp @@ -290,7 +290,7 @@ Status CloudSchemaChangeJob::_convert_historical_rowsets(const SchemaChangeParam RowsetMetaSharedPtr existed_rs_meta; auto st = _cloud_storage_engine.meta_mgr().prepare_rowset(*rowset_writer->rowset_meta(), - &existed_rs_meta); + _job_id, &existed_rs_meta); if (!st.ok()) { if (st.is()) { LOG(INFO) << "Rowset " << rs_reader->version() << " has already existed in tablet " @@ -325,7 +325,7 @@ Status CloudSchemaChangeJob::_convert_historical_rowsets(const SchemaChangeParam st.to_string()); } - st = _cloud_storage_engine.meta_mgr().commit_rowset(*rowset_writer->rowset_meta(), + st = _cloud_storage_engine.meta_mgr().commit_rowset(*rowset_writer->rowset_meta(), _job_id, &existed_rs_meta); if (!st.ok()) { if (st.is()) { diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp index 9b173db26fb1e1..1f21795e1c9bd5 100644 --- a/be/src/olap/compaction.cpp +++ b/be/src/olap/compaction.cpp @@ -1352,7 +1352,7 @@ Status CloudCompactionMixin::execute_compact_impl(int64_t permits) { << _output_rowset->rowset_meta()->rowset_id().to_string(); }) - RETURN_IF_ERROR(_engine.meta_mgr().commit_rowset(*_output_rowset->rowset_meta().get())); + RETURN_IF_ERROR(_engine.meta_mgr().commit_rowset(*_output_rowset->rowset_meta().get(), _uuid)); // 4. modify rowsets in memory RETURN_IF_ERROR(modify_rowsets()); @@ -1433,7 +1433,8 @@ Status CloudCompactionMixin::construct_output_rowset_writer(RowsetWriterContext& compaction_type() == ReaderType::READER_BASE_COMPACTION); ctx.file_cache_ttl_sec = _tablet->ttl_seconds(); _output_rs_writer = DORIS_TRY(_tablet->create_rowset_writer(ctx, _is_vertical)); - RETURN_IF_ERROR(_engine.meta_mgr().prepare_rowset(*_output_rs_writer->rowset_meta().get())); + RETURN_IF_ERROR( + _engine.meta_mgr().prepare_rowset(*_output_rs_writer->rowset_meta().get(), _uuid)); return Status::OK(); } diff --git a/cloud/src/common/config.h b/cloud/src/common/config.h index 84d889a059ebc1..a58ce5dbc7a3e8 100644 --- a/cloud/src/common/config.h +++ b/cloud/src/common/config.h @@ -222,6 +222,8 @@ CONF_mBool(enable_distinguish_hdfs_path, "true"); // If enabled, the txn status will be checked when preapre/commit rowset CONF_mBool(enable_load_txn_status_check, "true"); +CONF_mBool(enable_tablet_job_check, "true"); + // Declare a selection strategy for those servers have many ips. // Note that there should at most one ip match this list. // this is a list in semicolon-delimited format, in CIDR notation, diff --git a/cloud/src/meta-service/meta_service.cpp b/cloud/src/meta-service/meta_service.cpp index a280bc5330334e..211435de7975b1 100644 --- a/cloud/src/meta-service/meta_service.cpp +++ b/cloud/src/meta-service/meta_service.cpp @@ -38,6 +38,7 @@ #include #include #include +#include #include #include #include @@ -991,6 +992,60 @@ static void fill_schema_from_dict(MetaServiceCode& code, std::string& msg, existed_rowset_meta->CopyFrom(metas.Get(0)); } +bool check_job_existed(Transaction* txn, MetaServiceCode& code, std::string& msg, + const std::string& instance_id, int64_t tablet_id, + const std::string& rowset_id, const std::string& job_id) { + TabletIndexPB tablet_idx; + get_tablet_idx(code, msg, txn, instance_id, tablet_id, tablet_idx); + if (code != MetaServiceCode::OK) { + return false; + } + + std::string job_key = job_tablet_key({instance_id, tablet_idx.table_id(), tablet_idx.index_id(), + tablet_idx.partition_id(), tablet_id}); + std::string job_val; + auto err = txn->get(job_key, &job_val); + if (err != TxnErrorCode::TXN_OK) { + std::stringstream ss; + ss << (err == TxnErrorCode::TXN_KEY_NOT_FOUND ? "job not found," : "internal error,") + << " instance_id=" << instance_id << " tablet_id=" << tablet_id + << " rowset_id=" << rowset_id << " err=" << err; + msg = ss.str(); + code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ? MetaServiceCode::STALE_PREPARE_ROWSET + : cast_as(err); + return false; + } + + TabletJobInfoPB job_pb; + job_pb.ParseFromString(job_val); + bool match = false; + if (!job_pb.compaction().empty()) { + for (auto c : job_pb.compaction()) { + if (c.id() == job_id) { + match = true; + } + } + } + + if (job_pb.has_schema_change()) { + if (job_pb.schema_change().id() == job_id) { + match = true; + } + } + + if (!match) { + std::stringstream ss; + ss << " stale perpare rowset request," + << " instance_id=" << instance_id << " tablet_id=" << tablet_id << " job id=" << job_id + << " rowset_id=" << rowset_id; + msg = ss.str(); + code = MetaServiceCode::STALE_PREPARE_ROWSET; + return false; + } + + return true; +} + /** * Check if the transaction status is as expected. * If the transaction is not in the expected state, return false and set the error code and message. @@ -1100,6 +1155,15 @@ void MetaServiceImpl::prepare_rowset(::google::protobuf::RpcController* controll return; } + // Check if the compaction/sc tablet job has finished + if (config::enable_tablet_job_check && request->has_tablet_job_id() && + !request->tablet_job_id().empty()) { + if (!check_job_existed(txn.get(), code, msg, instance_id, tablet_id, rowset_id, + request->tablet_job_id())) { + return; + } + } + // Check if the prepare rowset request is invalid. // If the transaction has been finished, it means this prepare rowset is a timeout retry request. // In this case, do not write the recycle key again, otherwise it may cause data loss. @@ -1237,6 +1301,15 @@ void MetaServiceImpl::commit_rowset(::google::protobuf::RpcController* controlle return; } + // Check if the compaction/sc tablet job has finished + if (config::enable_tablet_job_check && request->has_tablet_job_id() && + !request->tablet_job_id().empty()) { + if (!check_job_existed(txn.get(), code, msg, instance_id, tablet_id, rowset_id, + request->tablet_job_id())) { + return; + } + } + // Check if the commit rowset request is invalid. // If the transaction has been finished, it means this commit rowset is a timeout retry request. // In this case, do not write the recycle key again, otherwise it may cause data loss. @@ -3107,4 +3180,4 @@ void MetaServiceImpl::get_schema_dict(::google::protobuf::RpcController* control response->mutable_schema_dict()->Swap(&schema_dict); } -} // namespace doris::cloud \ No newline at end of file +} // namespace doris::cloud diff --git a/cloud/test/meta_service_test.cpp b/cloud/test/meta_service_test.cpp index 3db5b6f125c9df..731bc8d78bf6bc 100644 --- a/cloud/test/meta_service_test.cpp +++ b/cloud/test/meta_service_test.cpp @@ -320,6 +320,32 @@ static void add_tablet_metas(MetaServiceProxy* meta_service, std::string instanc ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); } +static void start_compaction_job(MetaService* meta_service, int64_t tablet_id, + const std::string& job_id, const std::string& initiator, + int base_compaction_cnt, int cumu_compaction_cnt, + TabletCompactionJobPB::CompactionType type, + StartTabletJobResponse& res, + std::pair input_version = {0, 0}) { + brpc::Controller cntl; + StartTabletJobRequest req; + req.mutable_job()->mutable_idx()->set_tablet_id(tablet_id); + auto compaction = req.mutable_job()->add_compaction(); + compaction->set_id(job_id); + compaction->set_initiator(initiator); + compaction->set_base_compaction_cnt(base_compaction_cnt); + compaction->set_cumulative_compaction_cnt(cumu_compaction_cnt); + compaction->set_type(type); + long now = time(nullptr); + compaction->set_expiration(now + 12); + compaction->set_lease(now + 3); + if (input_version.second > 0) { + compaction->add_input_versions(input_version.first); + compaction->add_input_versions(input_version.second); + compaction->set_check_input_versions_range(true); + } + meta_service->start_tablet_job(&cntl, &req, &res, nullptr); +}; + TEST(MetaServiceTest, GetInstanceIdTest) { extern std::string get_instance_id(const std::shared_ptr& rc_mgr, const std::string& cloud_unique_id); @@ -9359,6 +9385,317 @@ TEST(MetaServiceTest, AddObjInfoWithRole) { SyncPoint::get_instance()->clear_all_call_backs(); } +TEST(MetaServiceTest, CheckJobExisted) { + auto meta_service = get_meta_service(); + + std::string instance_id = "check_job_existed_instance_id"; + auto sp = SyncPoint::get_instance(); + std::unique_ptr> defer( + (int*)0x01, [](int*) { SyncPoint::get_instance()->clear_all_call_backs(); }); + sp->set_call_back("get_instance_id", [&](auto&& args) { + auto* ret = try_any_cast_ret(args); + ret->first = instance_id; + ret->second = true; + }); + sp->enable_processing(); + + // OK + { + constexpr auto table_id = 952701, index_id = 952702, partition_id = 952703, + tablet_id = 952704; + int64_t txn_id = 952705; + std::string label = "update_rowset_meta_test_label1"; + CreateRowsetResponse res; + + ASSERT_NO_FATAL_FAILURE( + create_tablet(meta_service.get(), table_id, index_id, partition_id, tablet_id)); + + auto rowset = create_rowset(txn_id, tablet_id, partition_id); + + { + StartTabletJobResponse res; + start_compaction_job(meta_service.get(), tablet_id, "compaction1", "ip:port", 0, 0, + TabletCompactionJobPB::BASE, res); + } + + brpc::Controller cntl; + auto arena = res.GetArena(); + auto req = google::protobuf::Arena::CreateMessage(arena); + req->set_tablet_job_id("compaction1"); + req->mutable_rowset_meta()->CopyFrom(rowset); + meta_service->prepare_rowset(&cntl, req, &res, nullptr); + if (!arena) delete req; + + ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << res.status().msg(); + res.Clear(); + } + + // job does not exist, + { + constexpr auto table_id = 952801, index_id = 952802, partition_id = 952803, + tablet_id = 952804; + int64_t txn_id = 952805; + std::string label = "update_rowset_meta_test_label1"; + CreateRowsetResponse res; + + ASSERT_NO_FATAL_FAILURE( + create_tablet(meta_service.get(), table_id, index_id, partition_id, tablet_id)); + + auto rowset = create_rowset(txn_id, tablet_id, partition_id); + + brpc::Controller cntl; + auto arena = res.GetArena(); + auto req = google::protobuf::Arena::CreateMessage(arena); + req->set_tablet_job_id("compaction1"); + req->mutable_rowset_meta()->CopyFrom(rowset); + meta_service->prepare_rowset(&cntl, req, &res, nullptr); + if (!arena) delete req; + + ASSERT_EQ(res.status().code(), MetaServiceCode::STALE_PREPARE_ROWSET) << res.status().msg(); + res.Clear(); + } + + // compaction job exists, job id not match + { + constexpr auto table_id = 952901, index_id = 952902, partition_id = 952903, + tablet_id = 952904; + int64_t txn_id = 952905; + std::string label = "update_rowset_meta_test_label1"; + CreateRowsetResponse res; + + ASSERT_NO_FATAL_FAILURE( + create_tablet(meta_service.get(), table_id, index_id, partition_id, tablet_id)); + + auto rowset = create_rowset(txn_id, tablet_id, partition_id); + + { + StartTabletJobResponse res; + start_compaction_job(meta_service.get(), tablet_id, "compaction1", "ip:port", 0, 0, + TabletCompactionJobPB::BASE, res); + } + + brpc::Controller cntl; + auto arena = res.GetArena(); + auto req = google::protobuf::Arena::CreateMessage(arena); + req->set_tablet_job_id("compaction2"); + req->mutable_rowset_meta()->CopyFrom(rowset); + meta_service->prepare_rowset(&cntl, req, &res, nullptr); + if (!arena) delete req; + + ASSERT_EQ(res.status().code(), MetaServiceCode::STALE_PREPARE_ROWSET) << res.status().msg(); + res.Clear(); + } + + // do not set job id + { + constexpr auto table_id = 953501, index_id = 953502, partition_id = 953503, + tablet_id = 953504; + int64_t txn_id = 953505; + std::string label = "update_rowset_meta_test_label1"; + CreateRowsetResponse res; + + ASSERT_NO_FATAL_FAILURE( + create_tablet(meta_service.get(), table_id, index_id, partition_id, tablet_id)); + + auto rowset = create_rowset(txn_id, tablet_id, partition_id); + + { + StartTabletJobResponse res; + start_compaction_job(meta_service.get(), tablet_id, "compaction1", "ip:port", 0, 0, + TabletCompactionJobPB::BASE, res); + } + + brpc::Controller cntl; + auto arena = res.GetArena(); + auto req = google::protobuf::Arena::CreateMessage(arena); + req->mutable_rowset_meta()->CopyFrom(rowset); + meta_service->prepare_rowset(&cntl, req, &res, nullptr); + if (!arena) delete req; + + ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << res.status().msg(); + res.Clear(); + } + + // job id is empty string + { + constexpr auto table_id = 953601, index_id = 953602, partition_id = 953603, + tablet_id = 953604; + int64_t txn_id = 953605; + std::string label = "update_rowset_meta_test_label1"; + CreateRowsetResponse res; + + ASSERT_NO_FATAL_FAILURE( + create_tablet(meta_service.get(), table_id, index_id, partition_id, tablet_id)); + + auto rowset = create_rowset(txn_id, tablet_id, partition_id); + + { + StartTabletJobResponse res; + start_compaction_job(meta_service.get(), tablet_id, "compaction1", "ip:port", 0, 0, + TabletCompactionJobPB::BASE, res); + } + + brpc::Controller cntl; + auto arena = res.GetArena(); + auto req = google::protobuf::Arena::CreateMessage(arena); + req->set_tablet_job_id(""); + req->mutable_rowset_meta()->CopyFrom(rowset); + meta_service->prepare_rowset(&cntl, req, &res, nullptr); + if (!arena) delete req; + + ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << res.status().msg(); + res.Clear(); + } + + // commit rowset OK + { + constexpr auto table_id = 953001, index_id = 953002, partition_id = 953003, + tablet_id = 953004; + int64_t txn_id = 953005; + std::string label = "update_rowset_meta_test_label1"; + CreateRowsetResponse res; + + ASSERT_NO_FATAL_FAILURE( + create_tablet(meta_service.get(), table_id, index_id, partition_id, tablet_id)); + + auto rowset = create_rowset(txn_id, tablet_id, partition_id); + + { + StartTabletJobResponse res; + start_compaction_job(meta_service.get(), tablet_id, "compaction1", "ip:port", 0, 0, + TabletCompactionJobPB::BASE, res); + } + + brpc::Controller cntl; + auto arena = res.GetArena(); + auto req = google::protobuf::Arena::CreateMessage(arena); + req->set_tablet_job_id("compaction1"); + req->mutable_rowset_meta()->CopyFrom(rowset); + meta_service->commit_rowset(&cntl, req, &res, nullptr); + if (!arena) delete req; + + ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << res.status().msg(); + res.Clear(); + } + + // commit rowset, job does not exist, + { + constexpr auto table_id = 953101, index_id = 953102, partition_id = 953103, + tablet_id = 953104; + int64_t txn_id = 952805; + std::string label = "update_rowset_meta_test_label1"; + CreateRowsetResponse res; + + ASSERT_NO_FATAL_FAILURE( + create_tablet(meta_service.get(), table_id, index_id, partition_id, tablet_id)); + + auto rowset = create_rowset(txn_id, tablet_id, partition_id); + + brpc::Controller cntl; + auto arena = res.GetArena(); + auto req = google::protobuf::Arena::CreateMessage(arena); + req->set_tablet_job_id("compaction1"); + req->mutable_rowset_meta()->CopyFrom(rowset); + meta_service->commit_rowset(&cntl, req, &res, nullptr); + if (!arena) delete req; + + ASSERT_EQ(res.status().code(), MetaServiceCode::STALE_PREPARE_ROWSET) << res.status().msg(); + res.Clear(); + } + + // commit rowset, compaction job exists, job id not match + { + constexpr auto table_id = 953201, index_id = 953202, partition_id = 953203, + tablet_id = 953204; + int64_t txn_id = 952905; + std::string label = "update_rowset_meta_test_label1"; + CreateRowsetResponse res; + + ASSERT_NO_FATAL_FAILURE( + create_tablet(meta_service.get(), table_id, index_id, partition_id, tablet_id)); + + auto rowset = create_rowset(txn_id, tablet_id, partition_id); + + { + StartTabletJobResponse res; + start_compaction_job(meta_service.get(), tablet_id, "compaction1", "ip:port", 0, 0, + TabletCompactionJobPB::BASE, res); + } + + brpc::Controller cntl; + auto arena = res.GetArena(); + auto req = google::protobuf::Arena::CreateMessage(arena); + req->set_tablet_job_id("compaction2"); + req->mutable_rowset_meta()->CopyFrom(rowset); + meta_service->commit_rowset(&cntl, req, &res, nullptr); + if (!arena) delete req; + + ASSERT_EQ(res.status().code(), MetaServiceCode::STALE_PREPARE_ROWSET) << res.status().msg(); + res.Clear(); + } + + // do not set job id when commit rowset + { + constexpr auto table_id = 953301, index_id = 953302, partition_id = 953303, + tablet_id = 953304; + int64_t txn_id = 953305; + std::string label = "update_rowset_meta_test_label1"; + CreateRowsetResponse res; + + ASSERT_NO_FATAL_FAILURE( + create_tablet(meta_service.get(), table_id, index_id, partition_id, tablet_id)); + + auto rowset = create_rowset(txn_id, tablet_id, partition_id); + + { + StartTabletJobResponse res; + start_compaction_job(meta_service.get(), tablet_id, "compaction1", "ip:port", 0, 0, + TabletCompactionJobPB::BASE, res); + } + + brpc::Controller cntl; + auto arena = res.GetArena(); + auto req = google::protobuf::Arena::CreateMessage(arena); + req->mutable_rowset_meta()->CopyFrom(rowset); + meta_service->commit_rowset(&cntl, req, &res, nullptr); + if (!arena) delete req; + + ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << res.status().msg(); + res.Clear(); + } + + // job id is empty string when commit rowset + { + constexpr auto table_id = 953401, index_id = 953402, partition_id = 953403, + tablet_id = 953404; + int64_t txn_id = 953405; + std::string label = "update_rowset_meta_test_label1"; + CreateRowsetResponse res; + + ASSERT_NO_FATAL_FAILURE( + create_tablet(meta_service.get(), table_id, index_id, partition_id, tablet_id)); + + auto rowset = create_rowset(txn_id, tablet_id, partition_id); + + { + StartTabletJobResponse res; + start_compaction_job(meta_service.get(), tablet_id, "compaction1", "ip:port", 0, 0, + TabletCompactionJobPB::BASE, res); + } + + brpc::Controller cntl; + auto arena = res.GetArena(); + auto req = google::protobuf::Arena::CreateMessage(arena); + req->set_tablet_job_id(""); + req->mutable_rowset_meta()->CopyFrom(rowset); + meta_service->commit_rowset(&cntl, req, &res, nullptr); + if (!arena) delete req; + + ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << res.status().msg(); + res.Clear(); + } +} + TEST(MetaServiceTest, StalePrepareRowset) { auto meta_service = get_meta_service(); diff --git a/gensrc/proto/cloud.proto b/gensrc/proto/cloud.proto index 081a504e928b95..00681b7688c6bf 100644 --- a/gensrc/proto/cloud.proto +++ b/gensrc/proto/cloud.proto @@ -977,6 +977,7 @@ message CreateRowsetRequest { optional doris.RowsetMetaCloudPB rowset_meta = 2; optional bool temporary = 3; optional int64 txn_id = 4; + optional string tablet_job_id = 5; } message CreateRowsetResponse { @@ -1377,6 +1378,7 @@ enum MetaServiceCode { VERSION_NOT_FOUND = 2010; TABLET_NOT_FOUND = 2011; STALE_TABLET_CACHE = 2012; + STALE_PREPARE_ROWSET = 2013; CLUSTER_NOT_FOUND = 3001; ALREADY_EXISTED = 3002;