Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion be/src/cloud/cloud_delete_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ Status CloudDeleteTask::execute(CloudStorageEngine& engine, const TPushReq& requ
RETURN_IF_ERROR(rowset_writer->build(rowset));
rowset->rowset_meta()->set_delete_predicate(std::move(del_pred));

auto st = engine.meta_mgr().commit_rowset(*rowset->rowset_meta());
auto st = engine.meta_mgr().commit_rowset(*rowset->rowset_meta(), "");

// Update tablet stats
tablet->fetch_add_approximate_num_rowsets(1);
Expand Down
2 changes: 1 addition & 1 deletion be/src/cloud/cloud_delta_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ Status CloudDeltaWriter::commit_rowset() {
RETURN_IF_ERROR(_rowset_builder->init());
RETURN_IF_ERROR(_rowset_builder->build_rowset());
}
return _engine.meta_mgr().commit_rowset(*rowset_meta());
return _engine.meta_mgr().commit_rowset(*rowset_meta(), "");
}

Status CloudDeltaWriter::set_txn_related_delete_bitmap() {
Expand Down
6 changes: 4 additions & 2 deletions be/src/cloud/cloud_meta_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -867,7 +867,7 @@ Status CloudMetaMgr::sync_tablet_delete_bitmap(CloudTablet* tablet, int64_t old_
return Status::OK();
}

Status CloudMetaMgr::prepare_rowset(const RowsetMeta& rs_meta,
Status CloudMetaMgr::prepare_rowset(const RowsetMeta& rs_meta, const std::string& job_id,
RowsetMetaSharedPtr* existed_rs_meta) {
VLOG_DEBUG << "prepare rowset, tablet_id: " << rs_meta.tablet_id()
<< ", rowset_id: " << rs_meta.rowset_id() << " txn_id: " << rs_meta.txn_id();
Expand All @@ -879,6 +879,7 @@ Status CloudMetaMgr::prepare_rowset(const RowsetMeta& rs_meta,
CreateRowsetResponse resp;
req.set_cloud_unique_id(config::cloud_unique_id);
req.set_txn_id(rs_meta.txn_id());
req.set_tablet_job_id(job_id);

RowsetMetaPB doris_rs_meta = rs_meta.get_rowset_pb(/*skip_schema=*/true);
doris_rowset_meta_to_cloud(req.mutable_rowset_meta(), std::move(doris_rs_meta));
Expand All @@ -896,7 +897,7 @@ Status CloudMetaMgr::prepare_rowset(const RowsetMeta& rs_meta,
return st;
}

Status CloudMetaMgr::commit_rowset(const RowsetMeta& rs_meta,
Status CloudMetaMgr::commit_rowset(const RowsetMeta& rs_meta, const std::string& job_id,
RowsetMetaSharedPtr* existed_rs_meta) {
VLOG_DEBUG << "commit rowset, tablet_id: " << rs_meta.tablet_id()
<< ", rowset_id: " << rs_meta.rowset_id() << " txn_id: " << rs_meta.txn_id();
Expand All @@ -909,6 +910,7 @@ Status CloudMetaMgr::commit_rowset(const RowsetMeta& rs_meta,
CreateRowsetResponse resp;
req.set_cloud_unique_id(config::cloud_unique_id);
req.set_txn_id(rs_meta.txn_id());
req.set_tablet_job_id(job_id);

RowsetMetaPB rs_meta_pb = rs_meta.get_rowset_pb();
doris_rowset_meta_to_cloud(req.mutable_rowset_meta(), std::move(rs_meta_pb));
Expand Down
4 changes: 2 additions & 2 deletions be/src/cloud/cloud_meta_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,10 @@ class CloudMetaMgr {
bool warmup_delta_data = false, bool sync_delete_bitmap = true, bool full_sync = false,
SyncRowsetStats* sync_stats = nullptr);

Status prepare_rowset(const RowsetMeta& rs_meta,
Status prepare_rowset(const RowsetMeta& rs_meta, const std::string& job_id,
std::shared_ptr<RowsetMeta>* existed_rs_meta = nullptr);

Status commit_rowset(const RowsetMeta& rs_meta,
Status commit_rowset(const RowsetMeta& rs_meta, const std::string& job_id,
std::shared_ptr<RowsetMeta>* existed_rs_meta = nullptr);

Status update_tmp_rowset(const RowsetMeta& rs_meta);
Expand Down
2 changes: 1 addition & 1 deletion be/src/cloud/cloud_rowset_builder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ Status CloudRowsetBuilder::init() {

_calc_delete_bitmap_token = _engine.calc_delete_bitmap_executor()->create_token();

RETURN_IF_ERROR(_engine.meta_mgr().prepare_rowset(*_rowset_writer->rowset_meta()));
RETURN_IF_ERROR(_engine.meta_mgr().prepare_rowset(*_rowset_writer->rowset_meta(), ""));

_is_init = true;
return Status::OK();
Expand Down
4 changes: 2 additions & 2 deletions be/src/cloud/cloud_schema_change_job.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ Status CloudSchemaChangeJob::_convert_historical_rowsets(const SchemaChangeParam

RowsetMetaSharedPtr existed_rs_meta;
auto st = _cloud_storage_engine.meta_mgr().prepare_rowset(*rowset_writer->rowset_meta(),
&existed_rs_meta);
_job_id, &existed_rs_meta);
if (!st.ok()) {
if (st.is<ALREADY_EXIST>()) {
LOG(INFO) << "Rowset " << rs_reader->version() << " has already existed in tablet "
Expand Down Expand Up @@ -325,7 +325,7 @@ Status CloudSchemaChangeJob::_convert_historical_rowsets(const SchemaChangeParam
st.to_string());
}

st = _cloud_storage_engine.meta_mgr().commit_rowset(*rowset_writer->rowset_meta(),
st = _cloud_storage_engine.meta_mgr().commit_rowset(*rowset_writer->rowset_meta(), _job_id,
&existed_rs_meta);
if (!st.ok()) {
if (st.is<ALREADY_EXIST>()) {
Expand Down
5 changes: 3 additions & 2 deletions be/src/olap/compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1352,7 +1352,7 @@ Status CloudCompactionMixin::execute_compact_impl(int64_t permits) {
<< _output_rowset->rowset_meta()->rowset_id().to_string();
})

RETURN_IF_ERROR(_engine.meta_mgr().commit_rowset(*_output_rowset->rowset_meta().get()));
RETURN_IF_ERROR(_engine.meta_mgr().commit_rowset(*_output_rowset->rowset_meta().get(), _uuid));

// 4. modify rowsets in memory
RETURN_IF_ERROR(modify_rowsets());
Expand Down Expand Up @@ -1433,7 +1433,8 @@ Status CloudCompactionMixin::construct_output_rowset_writer(RowsetWriterContext&
compaction_type() == ReaderType::READER_BASE_COMPACTION);
ctx.file_cache_ttl_sec = _tablet->ttl_seconds();
_output_rs_writer = DORIS_TRY(_tablet->create_rowset_writer(ctx, _is_vertical));
RETURN_IF_ERROR(_engine.meta_mgr().prepare_rowset(*_output_rs_writer->rowset_meta().get()));
RETURN_IF_ERROR(
_engine.meta_mgr().prepare_rowset(*_output_rs_writer->rowset_meta().get(), _uuid));
return Status::OK();
}

Expand Down
2 changes: 2 additions & 0 deletions cloud/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,8 @@ CONF_mBool(enable_distinguish_hdfs_path, "true");
// If enabled, the txn status will be checked when preparing/committing a rowset
CONF_mBool(enable_load_txn_status_check, "true");

// If enabled, a prepare/commit rowset request that carries a non-empty tablet job id
// is rejected unless that job id is still recorded for the tablet
CONF_mBool(enable_tablet_job_check, "true");

// Declare a selection strategy for those servers have many ips.
// Note that there should at most one ip match this list.
// this is a list in semicolon-delimited format, in CIDR notation,
Expand Down
75 changes: 74 additions & 1 deletion cloud/src/meta-service/meta_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
#include <ios>
#include <limits>
#include <memory>
#include <ostream>
#include <sstream>
#include <string>
#include <tuple>
Expand Down Expand Up @@ -991,6 +992,60 @@ static void fill_schema_from_dict(MetaServiceCode& code, std::string& msg,
existed_rowset_meta->CopyFrom(metas.Get(0));
}

/**
 * Check whether the tablet job identified by `job_id` is still recorded for the tablet.
 *
 * Looks up the tablet index, reads the tablet job value through `txn`, and searches the
 * recorded compaction jobs and the recorded schema change job for an id equal to `job_id`.
 *
 * @param txn transaction used to read the tablet index and the tablet job value
 * @param code output, left untouched on success; on failure set to the error reported by
 *             get_tablet_idx, to a read error, to PROTOBUF_PARSE_ERR, or to
 *             STALE_PREPARE_ROWSET when the job key or a matching job id is missing
 * @param msg output, human readable description of the failure
 * @param instance_id instance that owns the tablet
 * @param tablet_id tablet the rowset belongs to
 * @param rowset_id rowset id, only used in error messages
 * @param job_id id of the compaction/schema change job carried by the request
 * @return true if a recorded job has the given id, false otherwise (code and msg are set)
 */
bool check_job_existed(Transaction* txn, MetaServiceCode& code, std::string& msg,
                       const std::string& instance_id, int64_t tablet_id,
                       const std::string& rowset_id, const std::string& job_id) {
    TabletIndexPB tablet_idx;
    get_tablet_idx(code, msg, txn, instance_id, tablet_id, tablet_idx);
    if (code != MetaServiceCode::OK) {
        return false;
    }

    std::string job_key = job_tablet_key({instance_id, tablet_idx.table_id(), tablet_idx.index_id(),
                                          tablet_idx.partition_id(), tablet_id});
    std::string job_val;
    auto err = txn->get(job_key, &job_val);
    if (err != TxnErrorCode::TXN_OK) {
        std::stringstream ss;
        ss << (err == TxnErrorCode::TXN_KEY_NOT_FOUND ? "job not found," : "internal error,")
           << " instance_id=" << instance_id << " tablet_id=" << tablet_id
           << " rowset_id=" << rowset_id << " err=" << err;
        msg = ss.str();
        // A missing job key means the job is gone, so the request is treated as stale.
        code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ? MetaServiceCode::STALE_PREPARE_ROWSET
                                                      : cast_as<ErrCategory::READ>(err);
        return false;
    }

    TabletJobInfoPB job_pb;
    // Report a corrupt job value explicitly instead of letting it surface as a stale request.
    if (!job_pb.ParseFromString(job_val)) {
        std::stringstream ss;
        ss << "failed to parse tablet job,"
           << " instance_id=" << instance_id << " tablet_id=" << tablet_id
           << " rowset_id=" << rowset_id;
        msg = ss.str();
        code = MetaServiceCode::PROTOBUF_PARSE_ERR;
        return false;
    }

    bool match = job_pb.has_schema_change() && job_pb.schema_change().id() == job_id;
    if (!match) {
        // Iterate by const reference so each compaction job message is not copied.
        for (const auto& compaction : job_pb.compaction()) {
            if (compaction.id() == job_id) {
                match = true;
                break;
            }
        }
    }

    if (!match) {
        std::stringstream ss;
        ss << " stale perpare rowset request,"
           << " instance_id=" << instance_id << " tablet_id=" << tablet_id << " job id=" << job_id
           << " rowset_id=" << rowset_id;
        msg = ss.str();
        code = MetaServiceCode::STALE_PREPARE_ROWSET;
        return false;
    }

    return true;
}

/**
* Check if the transaction status is as expected.
* If the transaction is not in the expected state, return false and set the error code and message.
Expand Down Expand Up @@ -1100,6 +1155,15 @@ void MetaServiceImpl::prepare_rowset(::google::protobuf::RpcController* controll
return;
}

// Check if the compaction/sc tablet job has finished
if (config::enable_tablet_job_check && request->has_tablet_job_id() &&
!request->tablet_job_id().empty()) {
if (!check_job_existed(txn.get(), code, msg, instance_id, tablet_id, rowset_id,
request->tablet_job_id())) {
return;
}
}

// Check if the prepare rowset request is invalid.
// If the transaction has been finished, it means this prepare rowset is a timeout retry request.
// In this case, do not write the recycle key again, otherwise it may cause data loss.
Expand Down Expand Up @@ -1237,6 +1301,15 @@ void MetaServiceImpl::commit_rowset(::google::protobuf::RpcController* controlle
return;
}

// Check if the compaction/sc tablet job has finished
if (config::enable_tablet_job_check && request->has_tablet_job_id() &&
!request->tablet_job_id().empty()) {
if (!check_job_existed(txn.get(), code, msg, instance_id, tablet_id, rowset_id,
request->tablet_job_id())) {
return;
}
}

// Check if the commit rowset request is invalid.
// If the transaction has been finished, it means this commit rowset is a timeout retry request.
// In this case, do not write the recycle key again, otherwise it may cause data loss.
Expand Down Expand Up @@ -3107,4 +3180,4 @@ void MetaServiceImpl::get_schema_dict(::google::protobuf::RpcController* control
response->mutable_schema_dict()->Swap(&schema_dict);
}

} // namespace doris::cloud
} // namespace doris::cloud
Loading
Loading