Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
134 changes: 73 additions & 61 deletions cloud/src/meta-service/meta_service_job.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1408,77 +1408,89 @@ void MetaServiceImpl::finish_tablet_job(::google::protobuf::RpcController* contr
return;
}

bool need_commit = false;
TxnErrorCode err = txn_kv_->create_txn(&txn);
if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::CREATE>(err);
msg = "failed to create txn";
return;
}
for (int retry = 0; retry <= 1; retry++) {
bool need_commit = false;
TxnErrorCode err = txn_kv_->create_txn(&txn);
if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::CREATE>(err);
msg = "failed to create txn";
return;
}

int64_t tablet_id = request->job().idx().tablet_id();
if (tablet_id <= 0) {
code = MetaServiceCode::INVALID_ARGUMENT;
msg = "no valid tablet_id given";
return;
}
auto& tablet_idx = const_cast<TabletIndexPB&>(request->job().idx());
if (!tablet_idx.has_table_id() || !tablet_idx.has_index_id() ||
!tablet_idx.has_partition_id()) {
get_tablet_idx(code, msg, txn.get(), instance_id, tablet_id, tablet_idx);
if (code != MetaServiceCode::OK) return;
}
// Check if tablet has been dropped
if (is_dropped_tablet(txn.get(), instance_id, tablet_idx.index_id(),
tablet_idx.partition_id())) {
code = MetaServiceCode::TABLET_NOT_FOUND;
msg = fmt::format("tablet {} has been dropped", tablet_id);
return;
}
int64_t tablet_id = request->job().idx().tablet_id();
if (tablet_id <= 0) {
code = MetaServiceCode::INVALID_ARGUMENT;
msg = "no valid tablet_id given";
return;
}
auto& tablet_idx = const_cast<TabletIndexPB&>(request->job().idx());
if (!tablet_idx.has_table_id() || !tablet_idx.has_index_id() ||
!tablet_idx.has_partition_id()) {
get_tablet_idx(code, msg, txn.get(), instance_id, tablet_id, tablet_idx);
if (code != MetaServiceCode::OK) return;
}
// Check if tablet has been dropped
if (is_dropped_tablet(txn.get(), instance_id, tablet_idx.index_id(),
tablet_idx.partition_id())) {
code = MetaServiceCode::TABLET_NOT_FOUND;
msg = fmt::format("tablet {} has been dropped", tablet_id);
return;
}

// TODO(gavin): remove duplicated code with start_tablet_job()
// Begin to process finish tablet job
std::string job_key = job_tablet_key({instance_id, tablet_idx.table_id(), tablet_idx.index_id(),
tablet_idx.partition_id(), tablet_id});
std::string job_val;
err = txn->get(job_key, &job_val);
if (err != TxnErrorCode::TXN_OK) {
SS << (err == TxnErrorCode::TXN_KEY_NOT_FOUND ? "job not found," : "internal error,")
<< " instance_id=" << instance_id << " tablet_id=" << tablet_id
<< " job=" << proto_to_json(request->job()) << " err=" << err;
msg = ss.str();
code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ? MetaServiceCode::INVALID_ARGUMENT
: cast_as<ErrCategory::READ>(err);
return;
}
TabletJobInfoPB recorded_job;
recorded_job.ParseFromString(job_val);
VLOG_DEBUG << "get tablet job, tablet_id=" << tablet_id
<< " job=" << proto_to_json(recorded_job);
// TODO(gavin): remove duplicated code with start_tablet_job()
// Begin to process finish tablet job
std::string job_key =
job_tablet_key({instance_id, tablet_idx.table_id(), tablet_idx.index_id(),
tablet_idx.partition_id(), tablet_id});
std::string job_val;
err = txn->get(job_key, &job_val);
if (err != TxnErrorCode::TXN_OK) {
SS << (err == TxnErrorCode::TXN_KEY_NOT_FOUND ? "job not found," : "internal error,")
<< " instance_id=" << instance_id << " tablet_id=" << tablet_id
<< " job=" << proto_to_json(request->job()) << " err=" << err;
msg = ss.str();
code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ? MetaServiceCode::INVALID_ARGUMENT
: cast_as<ErrCategory::READ>(err);
return;
}
TabletJobInfoPB recorded_job;
recorded_job.ParseFromString(job_val);
VLOG_DEBUG << "get tablet job, tablet_id=" << tablet_id
<< " job=" << proto_to_json(recorded_job);

if (!request->job().compaction().empty()) {
// Process compaction commit
process_compaction_job(code, msg, ss, txn, request, response, recorded_job, instance_id,
job_key, need_commit);
} else if (request->job().has_schema_change()) {
// Process schema change commit
process_schema_change_job(code, msg, ss, txn, request, response, recorded_job,
instance_id, job_key, need_commit);
}

DORIS_CLOUD_DEFER {
if (!need_commit) return;
TxnErrorCode err = txn->commit();
err = txn->commit();
if (err != TxnErrorCode::TXN_OK) {
if (err == TxnErrorCode::TXN_CONFLICT) {
if (retry == 0 && !request->job().compaction().empty() &&
request->job().compaction(0).has_delete_bitmap_lock_initiator()) {
// Do a fast retry for MoW when committing a compaction job.
// The only fdb txn conflict here is that, during the compaction job commit,
// a compaction lease RPC arrives and finishes before the commit,
// so we retry committing the compaction job once.
response->Clear();
code = MetaServiceCode::OK;
msg.clear();
continue;
}
}

code = cast_as<ErrCategory::COMMIT>(err);
ss << "failed to commit job kv, err=" << err;
msg = ss.str();
return;
}
};

// Process compaction commit
if (!request->job().compaction().empty()) {
process_compaction_job(code, msg, ss, txn, request, response, recorded_job, instance_id,
job_key, need_commit);
return;
}

// Process schema change commit
if (request->job().has_schema_change()) {
process_schema_change_job(code, msg, ss, txn, request, response, recorded_job, instance_id,
job_key, need_commit);
return;
break;
}
}

Expand Down
Loading