Skip to content

Commit

Permalink
Gracefully shut down the workers when reducing worker threads
Browse files Browse the repository at this point in the history
  • Loading branch information
git-hulk committed Nov 2, 2023
1 parent 1142a9d commit 9b7e839
Show file tree
Hide file tree
Showing 11 changed files with 50 additions and 32 deletions.
1 change: 0 additions & 1 deletion src/commands/blocking_commander.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ class BlockingCommander : public Commander,
// in other words, returning true indicates ending the blocking
virtual bool OnBlockingWrite() = 0;

bool IsBlocking() const override { return true; }
// to start the blocking process
// usually put to the end of the Execute method
Status StartBlocking(int64_t timeout, std::string *output) {
Expand Down
2 changes: 0 additions & 2 deletions src/commands/cmd_pubsub.cc
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,6 @@ void SubscribeCommandReply(std::string *output, const std::string &name, const s

class CommandSubscribe : public Commander {
public:
bool IsBlocking() const override { return true; }
Status Execute(Server *srv, Connection *conn, std::string *output) override {
for (unsigned i = 1; i < args_.size(); i++) {
conn->SubscribeChannel(args_[i]);
Expand Down Expand Up @@ -112,7 +111,6 @@ class CommandUnSubscribe : public Commander {

class CommandPSubscribe : public Commander {
public:
bool IsBlocking() const override { return true; }
Status Execute(Server *srv, Connection *conn, std::string *output) override {
for (size_t i = 1; i < args_.size(); i++) {
conn->PSubscribeChannel(args_[i]);
Expand Down
1 change: 0 additions & 1 deletion src/commands/cmd_stream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -664,7 +664,6 @@ class CommandXRead : public Commander,
private EvbufCallbackBase<CommandXRead, false>,
private EventCallbackBase<CommandXRead> {
public:
bool IsBlocking() const override { return true; }
Status Parse(const std::vector<std::string> &args) override {
size_t streams_word_idx = 0;

Expand Down
1 change: 0 additions & 1 deletion src/commands/commander.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ class Commander {
void SetAttributes(const CommandAttributes *attributes) { attributes_ = attributes; }
const CommandAttributes *GetAttributes() const { return attributes_; }
void SetArgs(const std::vector<std::string> &args) { args_ = args; }
virtual bool IsBlocking() const { return false; }
virtual Status Parse() { return Parse(args_); }
virtual Status Parse(const std::vector<std::string> &args) { return Status::OK(); }
virtual Status Execute(Server *srv, Connection *conn, std::string *output) {
Expand Down
1 change: 1 addition & 0 deletions src/server/redis_connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,7 @@ void Connection::ExecuteCommands(std::deque<CommandTokens> *to_process_cmds) {
if (!reply.empty()) Reply(reply);
reply.clear();
}
if (current_cmd != nullptr) current_cmd.reset();
}

void Connection::ResetMultiExec() {
Expand Down
3 changes: 2 additions & 1 deletion src/server/redis_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ class Connection : public EvbufCallbackBase<Connection> {
Worker *Owner() { return owner_; }
void SetOwner(Worker *new_owner) { owner_ = new_owner; };
int GetFD() { return bufferevent_getfd(bev_); }
bool HasRunningCommand() const { return current_cmd != nullptr; }
evbuffer *Input() { return bufferevent_get_input(bev_); }
evbuffer *Output() { return bufferevent_get_output(bev_); }
bufferevent *GetBufferEvent() { return bev_; }
Expand All @@ -127,8 +128,8 @@ class Connection : public EvbufCallbackBase<Connection> {
void ResetMultiExec();
std::deque<redis::CommandTokens> *GetMultiExecCommands() { return &multi_cmds_; }

std::unique_ptr<Commander> current_cmd;
std::function<void(int)> close_cb = nullptr;
std::unique_ptr<Commander> current_cmd;

std::set<std::string> watched_keys;
std::atomic<bool> watched_keys_modified = false;
Expand Down
32 changes: 21 additions & 11 deletions src/server/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ Server::~Server() {
for (auto &worker_thread : worker_threads_) {
worker_thread.reset();
}
cleanupExitedWorkerThreads();
cleanupExitedWorkerThreads(true /* force */);
CleanupExitedSlaves();

lua::DestroyState(lua_);
Expand Down Expand Up @@ -226,7 +226,7 @@ void Server::Stop() {
slaveof_mu_.unlock();

for (const auto &worker : worker_threads_) {
worker->Stop();
worker->Stop(0 /* immediately terminate */);
}

rocksdb::CancelAllBackgroundWork(storage->GetDB(), true);
Expand Down Expand Up @@ -739,8 +739,9 @@ void Server::cron() {
storage->SetDBInRetryableIOError(false);
}

if (counter != 0 && counter % 10 == 0) {
cleanupExitedWorkerThreads();
// check if we need to clean up exited worker threads every 5s
if (counter != 0 && counter % 50 == 0) {
cleanupExitedWorkerThreads(false);
}

CleanupExitedSlaves();
Expand Down Expand Up @@ -1685,12 +1686,12 @@ void Server::AdjustWorkerThreads() {
if (new_worker_threads > worker_threads_.size()) {
delta = new_worker_threads - worker_threads_.size();
increaseWorkerThreads(delta);
LOG(INFO) << "[server] Increase worker threads to " << new_worker_threads;
LOG(INFO) << "[server] Increase worker threads from " << worker_threads_.size() << " to " << new_worker_threads;
return;
}

delta = worker_threads_.size() - new_worker_threads;
LOG(INFO) << "[server] Decrease worker threads to " << new_worker_threads;
LOG(INFO) << "[server] Decrease worker threads from " << worker_threads_.size() << " to " << new_worker_threads;
decreaseWorkerThreads(delta);
}

Expand Down Expand Up @@ -1721,17 +1722,26 @@ void Server::decreaseWorkerThreads(size_t delta) {
auto target_worker = worker_threads_[iter.first % remain_worker_threads]->GetWorker();
worker_thread->GetWorker()->MigrateConnection(target_worker, iter.second);
}
worker_thread->Stop();
worker_thread->Stop(10 /* graceful timeout */);
// Don't join the worker thread here, because it may join itself.
recycle_worker_threads_.push(std::move(worker_thread));
}
}

void Server::cleanupExitedWorkerThreads() {
void Server::cleanupExitedWorkerThreads(bool force) {
std::unique_ptr<WorkerThread> worker_thread = nullptr;
while (recycle_worker_threads_.try_pop(worker_thread)) {
worker_thread->Join();
worker_thread.reset();
auto total = recycle_worker_threads_.unsafe_size();
for (size_t i = 0; i < total; i++) {
if (!recycle_worker_threads_.try_pop(worker_thread)) {
break;
}
if (worker_thread->IsTerminated() || force) {
worker_thread->Join();
worker_thread.reset();
} else {
// Push the worker thread back to the queue if it's still running.
recycle_worker_threads_.push(std::move(worker_thread));
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/server/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ class Server {
void updateAllWatchedKeys();
void increaseWorkerThreads(size_t delta);
void decreaseWorkerThreads(size_t delta);
void cleanupExitedWorkerThreads();
void cleanupExitedWorkerThreads(bool force);

std::atomic<bool> stop_ = false;
std::atomic<bool> is_loading_ = false;
Expand Down
27 changes: 19 additions & 8 deletions src/server/worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,8 @@ Status Worker::listenTCP(const std::string &host, uint32_t port, int backlog) {
}

evutil_make_socket_nonblocking(fd);
auto lev = NewEvconnlistener<&Worker::newTCPConnection>(base_, LEV_OPT_CLOSE_ON_FREE, backlog, fd);
auto lev =
NewEvconnlistener<&Worker::newTCPConnection>(base_, LEV_OPT_THREADSAFE | LEV_OPT_CLOSE_ON_FREE, backlog, fd);
listen_events_.emplace_back(lev);
}

Expand Down Expand Up @@ -292,14 +293,22 @@ void Worker::Run(std::thread::id tid) {
if (event_base_dispatch(base_) != 0) {
LOG(ERROR) << "[worker] Failed to run server, err: " << strerror(errno);
}
is_terminated_ = true;
}

void Worker::Stop() {
event_base_loopbreak(base_);
void Worker::Stop(uint32_t wait_seconds) {
for (const auto &lev : listen_events_) {
// It's unnecessary to close the listener fd since we have set the LEV_OPT_CLOSE_ON_FREE flag
evconnlistener_free(lev);
}
// wait_seconds == 0 means stop immediately; otherwise the event loop keeps running
// for wait_seconds seconds so the worker can process the remaining requests before stopping.
if (wait_seconds > 0) {
timeval tv = {wait_seconds, 0};
event_base_loopexit(base_, &tv);
} else {
event_base_loopbreak(base_);
}
}

Status Worker::AddConnection(redis::Connection *c) {
Expand Down Expand Up @@ -351,17 +360,19 @@ redis::Connection *Worker::removeConnection(int fd) {
// blocked on a key or stream.
void Worker::MigrateConnection(Worker *target, redis::Connection *conn) {
if (!target || !conn) return;
if (conn->current_cmd != nullptr && conn->current_cmd->IsBlocking()) {
// We cannot migrate the connection while it has a running command:
// the old worker may still be processing that command, which would cause a data race.
if (conn->HasRunningCommand()) {
// no need to close the connection here, since destroying the worker thread will close it
return;
}

// remove the connection from current worker
DetachConnection(conn);
if (!target->AddConnection(conn).IsOK()) {
// destroy worker thread will close the connection
conn->Close();
return;
}
// remove the connection from current worker
DetachConnection(conn);
auto bev = conn->GetBufferEvent();
bufferevent_base_set(target->base_, bev);
conn->SetCB(bev);
Expand Down Expand Up @@ -540,7 +551,7 @@ void WorkerThread::Start() {
LOG(INFO) << "[worker] Thread #" << t_.get_id() << " started";
}

void WorkerThread::Stop() { worker_->Stop(); }
void WorkerThread::Stop(uint32_t wait_seconds) { worker_->Stop(wait_seconds); }

void WorkerThread::Join() {
if (auto s = util::ThreadJoin(t_); !s) {
Expand Down
7 changes: 5 additions & 2 deletions src/server/worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,9 @@ class Worker : EventCallbackBase<Worker>, EvconnlistenerBase<Worker> {
Worker(Worker &&) = delete;
Worker &operator=(const Worker &) = delete;

void Stop();
void Stop(uint32_t wait_seconds);
void Run(std::thread::id tid);
bool IsTerminated() const { return is_terminated_; }

void MigrateConnection(Worker *target, redis::Connection *conn);
void DetachConnection(redis::Connection *conn);
Expand Down Expand Up @@ -94,6 +95,7 @@ class Worker : EventCallbackBase<Worker>, EvconnlistenerBase<Worker> {
struct bufferevent_rate_limit_group *rate_limit_group_ = nullptr;
struct ev_token_bucket_cfg *rate_limit_group_cfg_ = nullptr;
lua_State *lua_;
std::atomic<bool> is_terminated_ = false;
};

class WorkerThread {
Expand All @@ -106,8 +108,9 @@ class WorkerThread {

Worker *GetWorker() { return worker_.get(); }
void Start();
void Stop();
void Stop(uint32_t wait_seconds);
void Join();
bool IsTerminated() const { return worker_->IsTerminated(); }

private:
std::thread t_;
Expand Down
5 changes: 1 addition & 4 deletions tests/gocase/unit/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,10 +151,7 @@ func TestDynamicChangeWorkerThread(t *testing.T) {
defer srv.Close()

ctx := context.Background()
rdb := srv.NewClientWithOption(&redis.Options{
MaxIdleConns: 20,
MaxRetries: -1, // Disable retry to check connections are alive after config change
})
rdb := srv.NewClient()
defer func() { require.NoError(t, rdb.Close()) }()

t.Run("Test dynamic change worker thread", func(t *testing.T) {
Expand Down

0 comments on commit 9b7e839

Please sign in to comment.