Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
134 changes: 69 additions & 65 deletions be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,54 +70,14 @@ Status CloudEngineCalcDeleteBitmapTask::execute() {
for (const auto& partition : _cal_delete_bitmap_req.partitions) {
int64_t version = partition.version;
for (auto tablet_id : partition.tablet_ids) {
int64_t t1 = MonotonicMicros();
auto base_tablet = DORIS_TRY(_engine.get_tablet(tablet_id));
auto get_tablet_time_us = MonotonicMicros() - t1;
std::shared_ptr<CloudTablet> tablet =
std::dynamic_pointer_cast<CloudTablet>(base_tablet);
if (tablet == nullptr) {
LOG(WARNING) << "can't get tablet when calculate delete bitmap. tablet_id="
<< tablet_id;
_error_tablet_ids->push_back(tablet_id);
_res = Status::Error<ErrorCode::PUSH_TABLE_NOT_EXIST>(
"can't get tablet when calculate delete bitmap. tablet_id={}", tablet_id);
break;
}
int64_t t2 = MonotonicMicros();
Status st = tablet->sync_rowsets();
auto sync_rowset_time_us = MonotonicMicros() - t2;
if (!st.ok() && !st.is<ErrorCode::INVALID_TABLET_STATE>()) {
return st;
}
if (st.is<ErrorCode::INVALID_TABLET_STATE>()) [[unlikely]] {
add_succ_tablet_id(tablet->tablet_id());
LOG(INFO)
<< "tablet is under alter process, delete bitmap will be calculated later, "
"tablet_id: "
<< tablet->tablet_id() << " txn_id: " << transaction_id
<< ", request_version=" << version;
continue;
}
int64_t max_version = tablet->max_version_unlocked();
if (version != max_version + 1) {
_error_tablet_ids->push_back(tablet_id);
_res = Status::Error<ErrorCode::DELETE_BITMAP_LOCK_ERROR, false>(
"version not continuous");
LOG(WARNING) << "version not continuous, current max version=" << max_version
<< ", request_version=" << version
<< " tablet_id=" << tablet->tablet_id();
break;
}
LOG(INFO) << "calculate delete bitmap on tablet"
<< ", table_id=" << tablet->table_id()
<< ", transaction_id=" << transaction_id
<< ", tablet_id=" << tablet->tablet_id()
<< ", get_tablet_time_us=" << get_tablet_time_us
<< ", sync_rowset_time_us=" << sync_rowset_time_us;

auto tablet_calc_delete_bitmap_ptr = std::make_shared<CloudTabletCalcDeleteBitmapTask>(
_engine, this, tablet, transaction_id, version);
auto submit_st = token->submit_func([=]() { tablet_calc_delete_bitmap_ptr->handle(); });
_engine, this, tablet_id, transaction_id, version);
auto submit_st = token->submit_func([=]() {
auto st = tablet_calc_delete_bitmap_ptr->handle();
if (!st.ok()) {
LOG(WARNING) << "handle calc delete bitmap fail, st=" << st.to_string();
}
});
if (!submit_st.ok()) {
_res = submit_st;
break;
Expand All @@ -135,33 +95,74 @@ Status CloudEngineCalcDeleteBitmapTask::execute() {
}

CloudTabletCalcDeleteBitmapTask::CloudTabletCalcDeleteBitmapTask(
CloudStorageEngine& engine, CloudEngineCalcDeleteBitmapTask* engine_task,
std::shared_ptr<CloudTablet> tablet, int64_t transaction_id, int64_t version)
CloudStorageEngine& engine, CloudEngineCalcDeleteBitmapTask* engine_task, int64_t tablet_id,
int64_t transaction_id, int64_t version)
: _engine(engine),
_engine_calc_delete_bitmap_task(engine_task),
_tablet(tablet),
_tablet_id(tablet_id),
_transaction_id(transaction_id),
_version(version) {
_mem_tracker = MemTrackerLimiter::create_shared(
MemTrackerLimiter::Type::OTHER,
fmt::format("CloudTabletCalcDeleteBitmapTask#_transaction_id={}", _transaction_id));
}

void CloudTabletCalcDeleteBitmapTask::handle() const {
Status CloudTabletCalcDeleteBitmapTask::handle() const {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

warning: function 'handle' exceeds recommended size/complexity thresholds [readability-function-size]

Status CloudTabletCalcDeleteBitmapTask::handle() const {
                                        ^
Additional context

be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp:109: 84 lines including whitespace and comments (threshold 80)

Status CloudTabletCalcDeleteBitmapTask::handle() const {
                                        ^

SCOPED_ATTACH_TASK(_mem_tracker);
int64_t t1 = MonotonicMicros();
auto base_tablet = DORIS_TRY(_engine.get_tablet(_tablet_id));
auto get_tablet_time_us = MonotonicMicros() - t1;
std::shared_ptr<CloudTablet> tablet = std::dynamic_pointer_cast<CloudTablet>(base_tablet);
if (tablet == nullptr) {
LOG(WARNING) << "can't get tablet when calculate delete bitmap. tablet_id=" << _tablet_id;
auto error_st = Status::Error<ErrorCode::PUSH_TABLE_NOT_EXIST>(
"can't get tablet when calculate delete bitmap. tablet_id={}", _tablet_id);
_engine_calc_delete_bitmap_task->add_error_tablet_id(_tablet_id, error_st);
return error_st;
}
int64_t max_version = tablet->max_version_unlocked();
int64_t t2 = MonotonicMicros();
if (_version != max_version + 1) {
auto sync_st = tablet->sync_rowsets();
if (sync_st.is<ErrorCode::INVALID_TABLET_STATE>()) [[unlikely]] {
_engine_calc_delete_bitmap_task->add_succ_tablet_id(_tablet_id);
LOG(INFO) << "tablet is under alter process, delete bitmap will be calculated later, "
"tablet_id: "
<< _tablet_id << " txn_id: " << _transaction_id
<< ", request_version=" << _version;
return sync_st;
}
if (!sync_st.ok()) {
LOG(WARNING) << "failed to sync rowsets. tablet_id=" << _tablet_id
<< ", txn_id=" << _transaction_id << ", status=" << sync_st;
_engine_calc_delete_bitmap_task->add_error_tablet_id(_tablet_id, sync_st);
return sync_st;
}
}
auto sync_rowset_time_us = MonotonicMicros() - t2;
max_version = tablet->max_version_unlocked();
if (_version != max_version + 1) {
LOG(WARNING) << "version not continuous, current max version=" << max_version
<< ", request_version=" << _version << " tablet_id=" << _tablet_id;
auto error_st =
Status::Error<ErrorCode::DELETE_BITMAP_LOCK_ERROR, false>("version not continuous");
_engine_calc_delete_bitmap_task->add_error_tablet_id(_tablet_id, error_st);
return error_st;
}

RowsetSharedPtr rowset;
DeleteBitmapPtr delete_bitmap;
RowsetIdUnorderedSet rowset_ids;
std::shared_ptr<PartialUpdateInfo> partial_update_info;
int64_t txn_expiration;
Status status = _engine.txn_delete_bitmap_cache().get_tablet_txn_info(
_transaction_id, _tablet->tablet_id(), &rowset, &delete_bitmap, &rowset_ids,
&txn_expiration, &partial_update_info);
_transaction_id, _tablet_id, &rowset, &delete_bitmap, &rowset_ids, &txn_expiration,
&partial_update_info);
if (status != Status::OK()) {
LOG(WARNING) << "failed to get tablet txn info. tablet_id=" << _tablet->tablet_id()
LOG(WARNING) << "failed to get tablet txn info. tablet_id=" << _tablet_id
<< ", txn_id=" << _transaction_id << ", status=" << status;
_engine_calc_delete_bitmap_task->add_error_tablet_id(_tablet->tablet_id(), status);
return;
_engine_calc_delete_bitmap_task->add_error_tablet_id(_tablet_id, status);
return status;
}

int64_t t3 = MonotonicMicros();
Expand All @@ -171,22 +172,25 @@ void CloudTabletCalcDeleteBitmapTask::handle() const {
txn_info.delete_bitmap = delete_bitmap;
txn_info.rowset_ids = rowset_ids;
txn_info.partial_update_info = partial_update_info;
status = CloudTablet::update_delete_bitmap(_tablet, &txn_info, _transaction_id, txn_expiration);
status = CloudTablet::update_delete_bitmap(tablet, &txn_info, _transaction_id, txn_expiration);
auto update_delete_bitmap_time_us = MonotonicMicros() - t3;
if (status != Status::OK()) {
LOG(WARNING) << "failed to calculate delete bitmap. rowset_id=" << rowset->rowset_id()
<< ", tablet_id=" << _tablet->tablet_id() << ", txn_id=" << _transaction_id
<< ", tablet_id=" << _tablet_id << ", txn_id=" << _transaction_id
<< ", status=" << status;
_engine_calc_delete_bitmap_task->add_error_tablet_id(_tablet->tablet_id(), status);
return;
_engine_calc_delete_bitmap_task->add_error_tablet_id(_tablet_id, status);
return status;
}

_engine_calc_delete_bitmap_task->add_succ_tablet_id(_tablet->tablet_id());
_engine_calc_delete_bitmap_task->add_succ_tablet_id(_tablet_id);
LOG(INFO) << "calculate delete bitmap successfully on tablet"
<< ", table_id=" << _tablet->table_id() << ", transaction_id=" << _transaction_id
<< ", tablet_id=" << _tablet->tablet_id() << ", num_rows=" << rowset->num_rows()
<< ", res=" << status
<< ", update_delete_bitmap_time_us=" << update_delete_bitmap_time_us;
<< ", table_id=" << tablet->table_id() << ", transaction_id=" << _transaction_id
<< ", tablet_id=" << tablet->tablet_id() << ", num_rows=" << rowset->num_rows()
<< ", get_tablet_time_us=" << get_tablet_time_us
<< ", sync_rowset_time_us=" << sync_rowset_time_us
<< ", update_delete_bitmap_time_us=" << update_delete_bitmap_time_us
<< ", res=" << status;
return status;
}

} // namespace doris
9 changes: 4 additions & 5 deletions be/src/cloud/cloud_engine_calc_delete_bitmap_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,18 +33,17 @@ class MemTrackerLimiter;
class CloudTabletCalcDeleteBitmapTask {
public:
CloudTabletCalcDeleteBitmapTask(CloudStorageEngine& engine,
CloudEngineCalcDeleteBitmapTask* engine_task,
std::shared_ptr<CloudTablet> tablet, int64_t transaction_id,
int64_t version);
CloudEngineCalcDeleteBitmapTask* engine_task, int64_t tablet_id,
int64_t transaction_id, int64_t version);
~CloudTabletCalcDeleteBitmapTask() = default;

void handle() const;
Status handle() const;

private:
CloudStorageEngine& _engine;
CloudEngineCalcDeleteBitmapTask* _engine_calc_delete_bitmap_task;

std::shared_ptr<CloudTablet> _tablet;
int64_t _tablet_id;
int64_t _transaction_id;
int64_t _version;
std::shared_ptr<MemTrackerLimiter> _mem_tracker;
Expand Down