Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor task runner for more robust runtime state #1318

Merged
merged 5 commits into from
Mar 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 14 additions & 12 deletions src/common/task_runner.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,18 +35,21 @@ Status TaskRunner::Publish(const Task &task) {
}

task_queue_.emplace_back(task);
cond_.notify_all();
cond_.notify_one();
return Status::OK();
}

void TaskRunner::Start() {
Status TaskRunner::Start() {
if (!threads_.empty()) {
return {Status::NotOK, "Task runner is expected to stop before starting"};
}

stop_ = false;
for (int i = 0; i < n_thread_; i++) {
threads_.emplace_back([this] {
Util::ThreadSetName("task-runner");
this->run();
});
threads_.emplace_back(GET_OR_RET(Util::CreateThread("task-runner", [this] { this->run(); })));
}

return Status::OK();
}

void TaskRunner::Stop() {
Expand All @@ -55,22 +58,22 @@ void TaskRunner::Stop() {
cond_.notify_all();
}

void TaskRunner::Join() {
Status TaskRunner::Join() {
for (auto &thread : threads_) {
if (auto s = Util::ThreadJoin(thread); !s) {
LOG(WARNING) << "Task thread operation failed: " << s.Msg();
return s.Prefixed("Task thread operation failed");
}
}
}

void TaskRunner::Purge() {
std::lock_guard<std::mutex> guard(mu_);
threads_.clear();
task_queue_.clear();

return Status::OK();
}

void TaskRunner::run() {
std::unique_lock<std::mutex> lock(mu_);

while (!stop_) {
cond_.wait(lock, [this]() -> bool { return stop_ || !task_queue_.empty(); });

Expand All @@ -84,6 +87,5 @@ void TaskRunner::run() {
}

task_queue_.clear();
lock.unlock();
// CAUTION: drop the rest of tasks, don't use task runner if the task can't be drop
}
9 changes: 4 additions & 5 deletions src/common/task_runner.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@

#include <condition_variable>
#include <cstdint>
#include <deque>
#include <functional>
#include <list>
#include <mutex>
#include <thread>
#include <vector>
Expand All @@ -40,17 +40,16 @@ class TaskRunner {

Status Publish(const Task &task);
size_t QueueSize() { return task_queue_.size(); }
void Start();
Status Start();
void Stop();
void Join();
void Purge();
Status Join();

private:
void run();

bool stop_ = false;
uint32_t max_queue_size_;
std::list<Task> task_queue_;
std::deque<Task> task_queue_;
std::mutex mu_;
std::condition_variable cond_;
int n_thread_;
Expand Down
17 changes: 12 additions & 5 deletions src/server/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,9 @@ Status Server::Start() {
worker->Start();
}

task_runner_.Start();
if (auto s = task_runner_.Start(); !s) {
return s.Prefixed("Failed to start task runner");
}
// setup server cron thread
cron_thread_ = GET_OR_RET(Util::CreateThread("server-cron", [this] { this->cron(); }));

Expand Down Expand Up @@ -230,7 +232,9 @@ void Server::Join() {
worker->Join();
}

task_runner_.Join();
if (auto s = task_runner_.Join(); !s) {
LOG(WARNING) << s.Msg();
}
if (auto s = Util::ThreadJoin(cron_thread_); !s) {
LOG(WARNING) << "Cron thread operation failed: " << s.Msg();
}
Expand Down Expand Up @@ -262,7 +266,9 @@ Status Server::AddMaster(const std::string &host, uint32_t port, bool force_reco
auto s = replication_thread_->Start([this]() { PrepareRestoreDB(); },
[this]() {
this->is_loading_ = false;
task_runner_.Start();
if (auto s = task_runner_.Start(); !s) {
LOG(WARNING) << "Failed to start task runner: " << s.Msg();
}
});
if (s.IsOK()) {
master_host_ = host;
Expand Down Expand Up @@ -1189,8 +1195,9 @@ void Server::PrepareRestoreDB() {
// Stop task runner
LOG(INFO) << "[server] Stopping the task runner and clear task queue...";
task_runner_.Stop();
task_runner_.Join();
task_runner_.Purge();
if (auto s = task_runner_.Join(); !s) {
LOG(WARNING) << "[server] " << s.Msg();
}

// If the DB is restored, the object 'db_' will be destroyed, but
// 'db_' will be accessed in data migration task. To avoid wrong
Expand Down
10 changes: 4 additions & 6 deletions tests/cppunit/task_runner_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -53,17 +53,15 @@ TEST(TaskRunner, PublishToStopQueue) {
TEST(TaskRunner, Run) {
std::atomic<int> counter = {0};
TaskRunner tr(3, 1024);
tr.Start();
auto _ = tr.Start();

Status s;
Task t;
for (int i = 0; i < 100; i++) {
t = [&counter] { counter.fetch_add(1); };
s = tr.Publish(t);
Task t = [&counter] { counter.fetch_add(1); };
auto s = tr.Publish(t);
ASSERT_TRUE(s.IsOK());
}
sleep(1);
ASSERT_EQ(100, counter);
tr.Stop();
tr.Join();
_ = tr.Join();
}