Skip to content

Commit

Permalink
[fix](merge-on-write) fix dead lock when publish (apache#21339)
Browse files Browse the repository at this point in the history
  • Loading branch information
liaoxin01 committed Sep 12, 2023
1 parent badbec6 commit e2659df
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 36 deletions.
35 changes: 6 additions & 29 deletions be/src/olap/task/engine_publish_version_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,7 @@ using std::map;
EnginePublishVersionTask::EnginePublishVersionTask(TPublishVersionRequest& publish_version_req,
std::vector<TTabletId>* error_tablet_ids,
std::vector<TTabletId>* succ_tablet_ids)
: _total_task_num(0),
_publish_version_req(publish_version_req),
: _publish_version_req(publish_version_req),
_error_tablet_ids(error_tablet_ids),
_succ_tablet_ids(succ_tablet_ids) {}

Expand All @@ -49,25 +48,14 @@ void EnginePublishVersionTask::add_succ_tablet_id(int64_t tablet_id) {
_succ_tablet_ids->push_back(tablet_id);
}

void EnginePublishVersionTask::wait() {
std::unique_lock<std::mutex> lock(_tablet_finish_mutex);
_tablet_finish_cond.wait(lock);
}

void EnginePublishVersionTask::notify() {
std::unique_lock<std::mutex> lock(_tablet_finish_mutex);
_tablet_finish_cond.notify_one();
}

int64_t EnginePublishVersionTask::finish_task() {
return _total_task_num.fetch_sub(1);
}

Status EnginePublishVersionTask::finish() {
Status res = Status::OK();
int64_t transaction_id = _publish_version_req.transaction_id;
OlapStopWatch watch;
VLOG_NOTICE << "begin to process publish version. transaction_id=" << transaction_id;
std::unique_ptr<ThreadPoolToken> token =
StorageEngine::instance()->tablet_publish_txn_thread_pool()->new_token(
ThreadPool::ExecutionMode::CONCURRENT);

// each partition
for (auto& par_ver_info : _publish_version_req.partition_version_infos) {
Expand Down Expand Up @@ -145,19 +133,13 @@ Status EnginePublishVersionTask::finish() {
continue;
}
}
_total_task_num.fetch_add(1);
auto tablet_publish_txn_ptr = std::make_shared<TabletPublishTxnTask>(
this, tablet, rowset, partition_id, transaction_id, version, tablet_info);
auto submit_st =
StorageEngine::instance()->tablet_publish_txn_thread_pool()->submit_func(
[=]() { tablet_publish_txn_ptr->handle(); });
auto submit_st = token->submit_func([=]() { tablet_publish_txn_ptr->handle(); });
CHECK(submit_st.ok()) << submit_st;
}
}
// wait for all publish txn finished
while (_total_task_num.load() != 0) {
wait();
}
token->wait();

// check if the related tablet remained all have the version
for (auto& par_ver_info : _publish_version_req.partition_version_infos) {
Expand Down Expand Up @@ -209,11 +191,6 @@ TabletPublishTxnTask::TabletPublishTxnTask(EnginePublishVersionTask* engine_task
_tablet_info(tablet_info) {}

void TabletPublishTxnTask::handle() {
Defer defer {[&] {
if (_engine_publish_version_task->finish_task() == 1) {
_engine_publish_version_task->notify();
}
}};
auto publish_status = StorageEngine::instance()->txn_manager()->publish_txn(
_partition_id, _tablet, _transaction_id, _version);
if (publish_status != Status::OK()) {
Expand Down
7 changes: 0 additions & 7 deletions be/src/olap/task/engine_publish_version_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,20 +57,13 @@ class EnginePublishVersionTask : public EngineTask {
void add_error_tablet_id(int64_t tablet_id);
void add_succ_tablet_id(int64_t tablet_id);

void notify();
void wait();

int64_t finish_task();

private:
std::atomic<int64_t> _total_task_num;
const TPublishVersionRequest& _publish_version_req;
std::mutex _tablet_ids_mutex;
vector<TTabletId>* _error_tablet_ids;
vector<TTabletId>* _succ_tablet_ids;

std::mutex _tablet_finish_mutex;
std::condition_variable _tablet_finish_cond;
};

} // namespace doris
Expand Down

0 comments on commit e2659df

Please sign in to comment.