diff --git a/src/common/task_runner.cc b/src/common/task_runner.cc index 6eccfd564ea..123d1aa1228 100644 --- a/src/common/task_runner.cc +++ b/src/common/task_runner.cc @@ -35,18 +35,21 @@ Status TaskRunner::Publish(const Task &task) { } task_queue_.emplace_back(task); - cond_.notify_all(); + cond_.notify_one(); return Status::OK(); } -void TaskRunner::Start() { +Status TaskRunner::Start() { + if (!threads_.empty()) { + return {Status::NotOK, "Task runner is expected to stop before starting"}; + } + stop_ = false; for (int i = 0; i < n_thread_; i++) { - threads_.emplace_back([this] { - Util::ThreadSetName("task-runner"); - this->run(); - }); + threads_.emplace_back(GET_OR_RET(Util::CreateThread("task-runner", [this] { this->run(); }))); } + + return Status::OK(); } void TaskRunner::Stop() { @@ -55,22 +58,22 @@ void TaskRunner::Stop() { cond_.notify_all(); } -void TaskRunner::Join() { +Status TaskRunner::Join() { for (auto &thread : threads_) { if (auto s = Util::ThreadJoin(thread); !s) { - LOG(WARNING) << "Task thread operation failed: " << s.Msg(); + return s.Prefixed("Task thread operation failed"); } } -} -void TaskRunner::Purge() { std::lock_guard guard(mu_); threads_.clear(); - task_queue_.clear(); + + return Status::OK(); } void TaskRunner::run() { std::unique_lock lock(mu_); + while (!stop_) { cond_.wait(lock, [this]() -> bool { return stop_ || !task_queue_.empty(); }); @@ -84,6 +87,5 @@ void TaskRunner::run() { } task_queue_.clear(); - lock.unlock(); // CAUTION: drop the rest of tasks, don't use task runner if the task can't be drop } diff --git a/src/common/task_runner.h b/src/common/task_runner.h index 7737315493e..3aaf31ce9ac 100644 --- a/src/common/task_runner.h +++ b/src/common/task_runner.h @@ -22,8 +22,8 @@ #include #include +#include #include -#include #include #include #include @@ -40,17 +40,16 @@ class TaskRunner { Status Publish(const Task &task); size_t QueueSize() { return task_queue_.size(); } - void Start(); + Status Start(); void Stop(); - void Join(); - void Purge(); + Status Join(); private: void run(); bool stop_ = false; uint32_t max_queue_size_; - std::list task_queue_; + std::deque task_queue_; std::mutex mu_; std::condition_variable cond_; int n_thread_; diff --git a/src/server/server.cc b/src/server/server.cc index 34025ddbfcb..cd7681d73b7 100644 --- a/src/server/server.cc +++ b/src/server/server.cc @@ -165,7 +165,9 @@ Status Server::Start() { worker->Start(); } - task_runner_.Start(); + if (auto s = task_runner_.Start(); !s) { + return s.Prefixed("Failed to start task runner"); + } // setup server cron thread cron_thread_ = GET_OR_RET(Util::CreateThread("server-cron", [this] { this->cron(); })); @@ -230,7 +232,9 @@ void Server::Join() { worker->Join(); } - task_runner_.Join(); + if (auto s = task_runner_.Join(); !s) { + LOG(WARNING) << s.Msg(); + } if (auto s = Util::ThreadJoin(cron_thread_); !s) { LOG(WARNING) << "Cron thread operation failed: " << s.Msg(); } @@ -262,7 +266,9 @@ Status Server::AddMaster(const std::string &host, uint32_t port, bool force_reco auto s = replication_thread_->Start([this]() { PrepareRestoreDB(); }, [this]() { this->is_loading_ = false; - task_runner_.Start(); + if (auto s = task_runner_.Start(); !s) { + LOG(WARNING) << "Failed to start task runner: " << s.Msg(); + } }); if (s.IsOK()) { master_host_ = host; @@ -1189,8 +1195,9 @@ void Server::PrepareRestoreDB() { // Stop task runner LOG(INFO) << "[server] Stopping the task runner and clear task queue..."; task_runner_.Stop(); - task_runner_.Join(); - task_runner_.Purge(); + if (auto s = task_runner_.Join(); !s) { + LOG(WARNING) << "[server] " << s.Msg(); + } // If the DB is restored, the object 'db_' will be destroyed, but // 'db_' will be accessed in data migration task. To avoid wrong diff --git a/tests/cppunit/task_runner_test.cc b/tests/cppunit/task_runner_test.cc index 48eb7923420..ba0156e57dd 100644 --- a/tests/cppunit/task_runner_test.cc +++ b/tests/cppunit/task_runner_test.cc @@ -53,17 +53,15 @@ TEST(TaskRunner, PublishToStopQueue) { TEST(TaskRunner, Run) { std::atomic counter = {0}; TaskRunner tr(3, 1024); - tr.Start(); + auto _ = tr.Start(); - Status s; - Task t; for (int i = 0; i < 100; i++) { - t = [&counter] { counter.fetch_add(1); }; - s = tr.Publish(t); + Task t = [&counter] { counter.fetch_add(1); }; + auto s = tr.Publish(t); ASSERT_TRUE(s.IsOK()); } sleep(1); ASSERT_EQ(100, counter); tr.Stop(); - tr.Join(); + _ = tr.Join(); }