Skip to content

Commit

Permalink
[fix](merge-on-write) fix dead lock when publish
Browse files Browse the repository at this point in the history
  • Loading branch information
liaoxin01 committed Jun 29, 2023
1 parent f5668ac commit a0f64eb
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 37 deletions.
37 changes: 7 additions & 30 deletions be/src/olap/task/engine_publish_version_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,7 @@ EnginePublishVersionTask::EnginePublishVersionTask(
const TPublishVersionRequest& publish_version_req, std::vector<TTabletId>* error_tablet_ids,
std::vector<TTabletId>* succ_tablet_ids,
std::vector<std::tuple<int64_t, int64_t, int64_t>>* discontinuous_version_tablets)
: _total_task_num(0),
_publish_version_req(publish_version_req),
: _publish_version_req(publish_version_req),
_error_tablet_ids(error_tablet_ids),
_succ_tablet_ids(succ_tablet_ids),
_discontinuous_version_tablets(discontinuous_version_tablets) {}
Expand All @@ -88,25 +87,14 @@ void EnginePublishVersionTask::add_succ_tablet_id(int64_t tablet_id) {
_succ_tablet_ids->push_back(tablet_id);
}

// Blocks the calling thread on _tablet_finish_cond until it is woken up.
// The wait is issued without a predicate, so it returns on any wakeup
// (including a spurious one) and cannot observe a notify() that was issued
// before this thread started waiting.
// NOTE(review): finish() tests _total_task_num before calling wait(); if the
// last task decrements the counter and calls notify() between that test and
// this wait, the wakeup looks like it would be missed — confirm.
void EnginePublishVersionTask::wait() {
std::unique_lock<std::mutex> lock(_tablet_finish_mutex);
_tablet_finish_cond.wait(lock);
}

// Wakes one thread blocked in wait(): acquires _tablet_finish_mutex and calls
// notify_one() on _tablet_finish_cond while the mutex is still held.
void EnginePublishVersionTask::notify() {
std::unique_lock<std::mutex> lock(_tablet_finish_mutex);
_tablet_finish_cond.notify_one();
}

// Atomically decrements _total_task_num by one and returns the value the
// counter held *before* the decrement; a return value of 1 therefore means
// this call brought the counter down to zero.
int64_t EnginePublishVersionTask::finish_task() {
return _total_task_num.fetch_sub(1);
}

Status EnginePublishVersionTask::finish() {
Status res = Status::OK();
int64_t transaction_id = _publish_version_req.transaction_id;
OlapStopWatch watch;
VLOG_NOTICE << "begin to process publish version. transaction_id=" << transaction_id;
std::unique_ptr<ThreadPoolToken> token =
StorageEngine::instance()->tablet_publish_txn_thread_pool()->new_token(
ThreadPool::ExecutionMode::CONCURRENT);

// each partition
for (auto& par_ver_info : _publish_version_req.partition_version_infos) {
Expand Down Expand Up @@ -187,19 +175,13 @@ Status EnginePublishVersionTask::finish() {
continue;
}
}
_total_task_num.fetch_add(1);
auto tablet_publish_txn_ptr = std::make_shared<TabletPublishTxnTask>(
this, tablet, rowset, partition_id, transaction_id, version, tablet_info);
auto submit_st =
StorageEngine::instance()->tablet_publish_txn_thread_pool()->submit_func(
[=]() { tablet_publish_txn_ptr->handle(); });
auto submit_st = token->submit_func([=]() { tablet_publish_txn_ptr->handle(); });
CHECK(submit_st.ok()) << submit_st;
}
}
// wait for all publish txn finished
while (_total_task_num.load() != 0) {
wait();
}
token->wait();

// check if the related tablet remained all have the version
for (auto& par_ver_info : _publish_version_req.partition_version_infos) {
Expand Down Expand Up @@ -260,12 +242,7 @@ void TabletPublishTxnTask::handle() {
_engine_publish_version_task->add_error_tablet_id(_tablet_info.tablet_id);
return;
}
Defer defer {[&] {
_rowset->finish_publish();
if (_engine_publish_version_task->finish_task() == 1) {
_engine_publish_version_task->notify();
}
}};
Defer defer {[&] { _rowset->finish_publish(); }};
auto publish_status = StorageEngine::instance()->txn_manager()->publish_txn(
_partition_id, _tablet, _transaction_id, _version, &_stats);
if (publish_status != Status::OK()) {
Expand Down
7 changes: 0 additions & 7 deletions be/src/olap/task/engine_publish_version_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,21 +93,14 @@ class EnginePublishVersionTask : public EngineTask {
void add_error_tablet_id(int64_t tablet_id);
void add_succ_tablet_id(int64_t tablet_id);

void notify();
void wait();

int64_t finish_task();

private:
std::atomic<int64_t> _total_task_num;
const TPublishVersionRequest& _publish_version_req;
std::mutex _tablet_ids_mutex;
vector<TTabletId>* _error_tablet_ids;
vector<TTabletId>* _succ_tablet_ids;
std::vector<std::tuple<int64_t, int64_t, int64_t>>* _discontinuous_version_tablets;

std::mutex _tablet_finish_mutex;
std::condition_variable _tablet_finish_cond;
};

class AsyncTabletPublishTask {
Expand Down

0 comments on commit a0f64eb

Please sign in to comment.