Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add CreateThread to avoid manual try-catch #1302

Merged
merged 4 commits into from
Mar 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 32 additions & 31 deletions src/cluster/replication.cc
Original file line number Diff line number Diff line change
Expand Up @@ -46,28 +46,29 @@
#include "time_util.h"

Status FeedSlaveThread::Start() {
try {
t_ = std::thread([this]() {
Util::ThreadSetName("feed-replica");
sigset_t mask, omask;
sigemptyset(&mask);
sigemptyset(&omask);
sigaddset(&mask, SIGCHLD);
sigaddset(&mask, SIGHUP);
sigaddset(&mask, SIGPIPE);
pthread_sigmask(SIG_BLOCK, &mask, &omask);
auto s = Util::SockSend(conn_->GetFD(), "+OK\r\n");
if (!s.IsOK()) {
LOG(ERROR) << "failed to send OK response to the replica: " << s.Msg();
return;
}
this->loop();
});
} catch (const std::system_error &e) {
auto s = Util::CreateThread("feed-replica", [this] {
sigset_t mask, omask;
sigemptyset(&mask);
sigemptyset(&omask);
sigaddset(&mask, SIGCHLD);
sigaddset(&mask, SIGHUP);
sigaddset(&mask, SIGPIPE);
pthread_sigmask(SIG_BLOCK, &mask, &omask);
auto s = Util::SockSend(conn_->GetFD(), "+OK\r\n");
if (!s.IsOK()) {
LOG(ERROR) << "failed to send OK response to the replica: " << s.Msg();
return;
}
this->loop();
});

if (s) {
t_ = std::move(*s);
} else {
conn_ = nullptr; // prevent connection was freed when failed to start the thread
return {Status::NotOK, e.what()};
}
return Status::OK();

return s;
}

void FeedSlaveThread::Stop() {
Expand All @@ -76,7 +77,9 @@ void FeedSlaveThread::Stop() {
}

void FeedSlaveThread::Join() {
if (t_.joinable()) t_.join();
if (auto s = Util::ThreadJoin(t_); !s) {
LOG(WARNING) << "Slave thread operation failed: " << s.Msg();
}
}

void FeedSlaveThread::checkLivenessIfNeed() {
Expand Down Expand Up @@ -323,15 +326,11 @@ Status ReplicationThread::Start(std::function<void()> &&pre_fullsync_cb, std::fu
// cleanup the old backups, so we can start replication in a clean state
storage_->PurgeOldBackups(0, 0);

try {
t_ = std::thread([this]() {
Util::ThreadSetName("master-repl");
this->run();
assert(stop_flag_);
});
} catch (const std::system_error &e) {
return Status(Status::NotOK, e.what());
}
t_ = GET_OR_RET(Util::CreateThread("master-repl", [this] {
this->run();
assert(stop_flag_);
}));

return Status::OK();
}

Expand All @@ -340,7 +339,9 @@ void ReplicationThread::Stop() {

stop_flag_ = true; // Stopping procedure is asynchronous,
// handled by timer
if (t_.joinable()) t_.join();
if (auto s = Util::ThreadJoin(t_); !s) {
LOG(WARNING) << "Replication thread operation failed: " << s.Msg();
}
LOG(INFO) << "[replication] Stopped";
}

Expand Down
17 changes: 7 additions & 10 deletions src/cluster/slot_migrate.cc
Original file line number Diff line number Diff line change
Expand Up @@ -118,20 +118,17 @@ SlotMigrate::~SlotMigrate() {
stop_migrate_ = true;
thread_state_ = ThreadState::Terminated;
job_cv_.notify_all();
if (t_.joinable()) t_.join();
if (auto s = Util::ThreadJoin(t_); !s) {
LOG(WARNING) << "Slot migrating thread operation failed: " << s.Msg();
}
}
}

Status SlotMigrate::CreateMigrateHandleThread() {
try {
t_ = std::thread([this]() {
Util::ThreadSetName("slot-migrate");
thread_state_ = ThreadState::Running;
this->Loop();
});
} catch (const std::exception &e) {
return {Status::NotOK, std::string(e.what())};
}
t_ = GET_OR_RET(Util::CreateThread("slot-migrate", [this] {
thread_state_ = ThreadState::Running;
this->Loop();
}));

return Status::OK();
}
Expand Down
20 changes: 12 additions & 8 deletions src/commands/cmd_replication.cc
Original file line number Diff line number Diff line change
Expand Up @@ -217,8 +217,7 @@ class CommandFetchMeta : public Commander {
svr->stats_.IncrFullSyncCounter();

// Feed-replica-meta thread
std::thread t = std::thread([svr, repl_fd, ip, bev = conn->GetBufferEvent()]() {
Util::ThreadSetName("feed-repl-info");
auto t = GET_OR_RET(Util::CreateThread("feed-repl-info", [svr, repl_fd, ip, bev = conn->GetBufferEvent()] {
svr->IncrFetchFileThread();
auto exit = MakeScopeExit([svr, bev] {
bufferevent_free(bev);
Expand All @@ -243,8 +242,11 @@ class CommandFetchMeta : public Commander {
}
auto now = static_cast<time_t>(Util::GetTimeStamp());
svr->storage_->SetCheckpointAccessTime(now);
});
t.detach();
}));

if (auto s = Util::ThreadDetach(t); !s) {
return s;
}

return Status::OK();
}
Expand All @@ -271,8 +273,7 @@ class CommandFetchFile : public Commander {
conn->NeedNotFreeBufferEvent(); // Feed-replica-file thread will close the replica bufferevent
conn->EnableFlag(Redis::Connection::kCloseAsync);

std::thread t = std::thread([svr, repl_fd, ip, files, bev = conn->GetBufferEvent()]() {
Util::ThreadSetName("feed-repl-file");
auto t = GET_OR_RET(Util::CreateThread("feed-repl-file", [svr, repl_fd, ip, files, bev = conn->GetBufferEvent()]() {
auto exit = MakeScopeExit([bev] { bufferevent_free(bev); });
svr->IncrFetchFileThread();

Expand Down Expand Up @@ -311,8 +312,11 @@ class CommandFetchFile : public Commander {
auto now = static_cast<time_t>(Util::GetTimeStamp());
svr->storage_->SetCheckpointAccessTime(now);
svr->DecrFetchFileThread();
});
t.detach();
}));

if (auto s = Util::ThreadDetach(t); !s) {
return s;
}

return Status::OK();
}
Expand Down
6 changes: 4 additions & 2 deletions src/common/task_runner.cc
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ Status TaskRunner::Publish(const Task &task) {
void TaskRunner::Start() {
stop_ = false;
for (int i = 0; i < n_thread_; i++) {
threads_.emplace_back([this]() {
threads_.emplace_back([this] {
Util::ThreadSetName("task-runner");
this->run();
});
Expand All @@ -57,7 +57,9 @@ void TaskRunner::Stop() {

void TaskRunner::Join() {
for (auto &thread : threads_) {
if (thread.joinable()) thread.join();
if (auto s = Util::ThreadJoin(thread); !s) {
LOG(WARNING) << "Task thread operation failed: " << s.Msg();
}
}
}

Expand Down
16 changes: 16 additions & 0 deletions src/common/thread_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

#include "thread_util.h"

#include <fmt/std.h>
#include <pthread.h>

namespace Util {
Expand All @@ -32,4 +33,19 @@ void ThreadSetName(const char *name) {
#endif
}

template <void (std::thread::*F)(), typename... Args>
Status ThreadOperationImpl(std::thread &t, const char *op, Args &&...args) {
try {
(t.*F)(std::forward<Args>(args)...);
} catch (const std::system_error &e) {
return {Status::NotOK, fmt::format("thread #{} cannot be `{}`ed: {}", t.get_id(), op, e.what())};
}

return Status::OK();
}

Status ThreadJoin(std::thread &t) { return ThreadOperationImpl<&std::thread::join>(t, "join"); }

Status ThreadDetach(std::thread &t) { return ThreadOperationImpl<&std::thread::detach>(t, "detach"); }

} // namespace Util
21 changes: 21 additions & 0 deletions src/common/thread_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,29 @@

#pragma once

#include <system_error>
#include <thread>

#include "fmt/core.h"
#include "status.h"

namespace Util {

void ThreadSetName(const char *name);

template <typename F>
StatusOr<std::thread> CreateThread(const char *name, F f) {
try {
return std::thread([name, f = std::move(f)] {
ThreadSetName(name);
f();
});
} catch (const std::system_error &e) {
return {Status::NotOK, fmt::format("thread '{}' cannot be started: {}", name, e.what())};
}
}

Status ThreadJoin(std::thread &t);
Status ThreadDetach(std::thread &t);

} // namespace Util
18 changes: 9 additions & 9 deletions src/server/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -165,15 +165,11 @@ Status Server::Start() {

task_runner_.Start();
// setup server cron thread
cron_thread_ = std::thread([this]() {
Util::ThreadSetName("server-cron");
this->cron();
});
cron_thread_ = GET_OR_RET(Util::CreateThread("server-cron", [this] { this->cron(); }));

compaction_checker_thread_ = std::thread([this]() {
compaction_checker_thread_ = GET_OR_RET(Util::CreateThread("compact-check", [this] {
uint64_t counter = 0;
time_t last_compact_date = 0;
Util::ThreadSetName("compact-check");
CompactionChecker compaction_checker(this->storage_);

while (!stop_) {
Expand Down Expand Up @@ -204,7 +200,7 @@ Status Server::Start() {
}
}
}
});
}));

memory_startup_use_.store(Stats::GetMemoryRSS(), std::memory_order_relaxed);
LOG(INFO) << "[server] Ready to accept connections";
Expand Down Expand Up @@ -233,8 +229,12 @@ void Server::Join() {
}

task_runner_.Join();
if (cron_thread_.joinable()) cron_thread_.join();
if (compaction_checker_thread_.joinable()) compaction_checker_thread_.join();
if (auto s = Util::ThreadJoin(cron_thread_); !s) {
LOG(WARNING) << "Cron thread operation failed: " << s.Msg();
}
if (auto s = Util::ThreadJoin(compaction_checker_thread_); !s) {
LOG(WARNING) << "Compaction checker thread operation failed: " << s.Msg();
}
}

Status Server::AddMaster(const std::string &host, uint32_t port, bool force_reconnect) {
Expand Down
18 changes: 10 additions & 8 deletions src/server/worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -518,20 +518,22 @@ void Worker::KickoutIdleClients(int timeout) {
}

void WorkerThread::Start() {
try {
t_ = std::thread([this]() {
Util::ThreadSetName("worker");
this->worker_->Run(std::this_thread::get_id());
});
} catch (const std::system_error &e) {
LOG(ERROR) << "[worker] Failed to start worker thread, err: " << e.what();
auto s = Util::CreateThread("worker", [this] { this->worker_->Run(std::this_thread::get_id()); });

if (s) {
t_ = std::move(*s);
} else {
LOG(ERROR) << "[worker] Failed to start worker thread, err: " << s.Msg();
return;
}

LOG(INFO) << "[worker] Thread #" << t_.get_id() << " started";
}

void WorkerThread::Stop() { worker_->Stop(); }

void WorkerThread::Join() {
if (t_.joinable()) t_.join();
if (auto s = Util::ThreadJoin(t_); !s) {
LOG(WARNING) << "[worker] " << s.Msg();
}
}