diff --git a/src/commands/blocking_commander.h b/src/commands/blocking_commander.h
index 3353f7d225b..537e770fd1b 100644
--- a/src/commands/blocking_commander.h
+++ b/src/commands/blocking_commander.h
@@ -44,7 +44,6 @@ class BlockingCommander : public Commander,
   // in other words, returning true indicates ending the blocking
   virtual bool OnBlockingWrite() = 0;
 
-  bool IsBlocking() const override { return true; }
   // to start the blocking process
   // usually put to the end of the Execute method
   Status StartBlocking(int64_t timeout, std::string *output) {
diff --git a/src/commands/cmd_pubsub.cc b/src/commands/cmd_pubsub.cc
index 9ec38beefdb..45272eef2ca 100644
--- a/src/commands/cmd_pubsub.cc
+++ b/src/commands/cmd_pubsub.cc
@@ -82,7 +82,6 @@ void SubscribeCommandReply(std::string *output, const std::string &name, const std::string &sub_name, int num)
 
 class CommandSubscribe : public Commander {
  public:
-  bool IsBlocking() const override { return true; }
   Status Execute(Server *srv, Connection *conn, std::string *output) override {
     for (unsigned i = 1; i < args_.size(); i++) {
       conn->SubscribeChannel(args_[i]);
@@ -112,7 +111,6 @@ class CommandUnSubscribe : public Commander {
 
 class CommandPSubscribe : public Commander {
  public:
-  bool IsBlocking() const override { return true; }
   Status Execute(Server *srv, Connection *conn, std::string *output) override {
     for (size_t i = 1; i < args_.size(); i++) {
       conn->PSubscribeChannel(args_[i]);
diff --git a/src/commands/cmd_stream.cc b/src/commands/cmd_stream.cc
index 0f8d6b965b5..545d438de3f 100644
--- a/src/commands/cmd_stream.cc
+++ b/src/commands/cmd_stream.cc
@@ -750,7 +750,6 @@ class CommandXRead : public Commander,
                      private EvbufCallbackBase<CommandXRead, false>,
                      private EventCallbackBase<CommandXRead> {
  public:
-  bool IsBlocking() const override { return true; }
   Status Parse(const std::vector<std::string> &args) override {
     size_t streams_word_idx = 0;
 
diff --git a/src/commands/commander.h b/src/commands/commander.h
index c37d1866e9a..1cb4d22108a 100644
--- a/src/commands/commander.h
+++ b/src/commands/commander.h
@@ -70,7 +70,6 @@ class Commander {
   void SetAttributes(const CommandAttributes *attributes) { attributes_ = attributes; }
   const CommandAttributes *GetAttributes() const { return attributes_; }
   void SetArgs(const std::vector<std::string> &args) { args_ = args; }
-  virtual bool IsBlocking() const { return false; }
   virtual Status Parse() { return Parse(args_); }
   virtual Status Parse(const std::vector<std::string> &args) { return Status::OK(); }
   virtual Status Execute(Server *srv, Connection *conn, std::string *output) {
diff --git a/src/server/redis_connection.cc b/src/server/redis_connection.cc
index 899260ff77a..a7dec01def0 100644
--- a/src/server/redis_connection.cc
+++ b/src/server/redis_connection.cc
@@ -34,6 +34,7 @@
 
 #include "commands/blocking_commander.h"
 #include "redis_connection.h"
+#include "scope_exit.h"
 #include "server.h"
 #include "time_util.h"
 #include "tls_util.h"
@@ -75,8 +76,9 @@ void Connection::Close() {
 
 void Connection::Detach() { owner_->DetachConnection(this); }
 
-void Connection::OnRead(bufferevent *bev) {
-  DLOG(INFO) << "[connection] on read: " << bufferevent_getfd(bev);
+void Connection::OnRead(struct bufferevent *bev) {
+  is_running_ = true;
+  auto scope_exit = MakeScopeExit([this] { is_running_ = false; });
 
   SetLastInteraction();
   auto s = req_.Tokenize(Input());
@@ -177,6 +179,13 @@ void Connection::DisableFlag(Flag flag) { flags_ &= (~flag); }
 
 bool Connection::IsFlagEnabled(Flag flag) const { return (flags_ & flag) > 0; }
 
+bool Connection::CanMigrate() const {
+  return !is_running_                                                    // not in the middle of reading or writing
+         && !IsFlagEnabled(redis::Connection::kCloseAfterReply)          // not marked close-after-reply
+         && saved_current_command_ == nullptr                            // not suspended in a blocking command like BLPOP
+         && subscribe_channels_.empty() && subscribe_patterns_.empty();  // not subscribed to any channel
+}
+
 void Connection::SubscribeChannel(const std::string &channel) {
   for (const auto &chan : subscribe_channels_) {
     if (channel == chan) return;
@@ -302,6 +311,7 @@ void Connection::ExecuteCommands(std::deque<CommandTokens> *to_process_cmds) {
     bool is_multi_exec = IsFlagEnabled(Connection::kMultiExec);
     if (IsFlagEnabled(redis::Connection::kCloseAfterReply) && !is_multi_exec) break;
 
+    std::unique_ptr<Commander> current_cmd;
     auto s = srv_->LookupAndCreateCommand(cmd_tokens.front(), &current_cmd);
     if (!s.IsOK()) {
       if (is_multi_exec) multi_error_ = true;
@@ -424,6 +434,11 @@ void Connection::ExecuteCommands(std::deque<CommandTokens> *to_process_cmds) {
     // Break the execution loop when occurring the blocking command like BLPOP or BRPOP,
     // it will suspend the connection and wait for the wakeup signal.
     if (s.Is<Status::BlockingCmd>()) {
+      // A blocking command needs its Commander again when the connection resumes
+      // from the suspended state, so save it for the next execution.
+      // Connection migration also checks saved_current_command_ to determine
+      // whether the connection can be migrated or not.
+      saved_current_command_ = std::move(current_cmd);
       break;
     }
diff --git a/src/server/redis_connection.h b/src/server/redis_connection.h
index 0274bc33db5..e38a70fa52a 100644
--- a/src/server/redis_connection.h
+++ b/src/server/redis_connection.h
@@ -119,6 +119,7 @@ class Connection : public EvbufCallbackBase<Connection> {
   void RecordProfilingSampleIfNeed(const std::string &cmd, uint64_t duration);
   void SetImporting() { importing_ = true; }
   bool IsImporting() const { return importing_; }
+  bool CanMigrate() const;
 
   // Multi exec
   void SetInExec() { in_exec_ = true; }
@@ -127,7 +128,6 @@ class Connection : public EvbufCallbackBase<Connection> {
   void ResetMultiExec();
   std::deque<CommandTokens> *GetMultiExecCommands() { return &multi_cmds_; }
 
-  std::unique_ptr<Commander> current_cmd;
   std::function<void(int)> close_cb = nullptr;
   std::set<std::string> watched_keys;
 
@@ -152,12 +152,15 @@ class Connection : public EvbufCallbackBase<Connection> {
   bufferevent *bev_;
   Request req_;
   Worker *owner_;
+  std::unique_ptr<Commander> saved_current_command_;
+
   std::vector<std::string> subscribe_channels_;
   std::vector<std::string> subscribe_patterns_;
 
   Server *srv_;
   bool in_exec_ = false;
   bool multi_error_ = false;
+  std::atomic<bool> is_running_ = false;
   std::deque<CommandTokens> multi_cmds_;
 
   bool importing_ = false;
diff --git a/src/server/server.cc b/src/server/server.cc
index 03f2eae768d..41da1735a4c 100644
--- a/src/server/server.cc
+++ b/src/server/server.cc
@@ -115,7 +115,7 @@ Server::~Server() {
   for (auto &worker_thread : worker_threads_) {
     worker_thread.reset();
   }
-  cleanupExitedWorkerThreads();
+  cleanupExitedWorkerThreads(true /* force */);
   CleanupExitedSlaves();
 
   lua::DestroyState(lua_);
@@ -226,7 +226,7 @@ void Server::Stop() {
   slaveof_mu_.unlock();
 
   for (const auto &worker : worker_threads_) {
-    worker->Stop();
+    worker->Stop(0 /* immediately terminate */);
   }
 
   rocksdb::CancelAllBackgroundWork(storage->GetDB(), true);
@@ -739,8 +739,9 @@ void Server::cron() {
     storage->SetDBInRetryableIOError(false);
   }
 
-  if (counter != 0 && counter % 10 == 0) {
-    cleanupExitedWorkerThreads();
+  // check if we need to clean up exited worker threads every 5s
+  if (counter != 0 && counter % 50 == 0) {
+    cleanupExitedWorkerThreads(false);
   }
 
   CleanupExitedSlaves();
@@ -1685,12 +1686,12 @@ void Server::AdjustWorkerThreads() {
 
   if (new_worker_threads > worker_threads_.size()) {
     delta = new_worker_threads - worker_threads_.size();
+    LOG(INFO) << "[server] Increase worker threads from " << worker_threads_.size() << " to " << new_worker_threads;
     increaseWorkerThreads(delta);
-    LOG(INFO) << "[server] Increase worker threads to " << new_worker_threads;
     return;
   }
 
   delta = worker_threads_.size() - new_worker_threads;
-  LOG(INFO) << "[server] Decrease worker threads to " << new_worker_threads;
+  LOG(INFO) << "[server] Decrease worker threads from " << worker_threads_.size() << " to " << new_worker_threads;
   decreaseWorkerThreads(delta);
 }
 
@@ -1721,17 +1722,26 @@ void Server::decreaseWorkerThreads(size_t delta) {
       auto target_worker = worker_threads_[iter.first % remain_worker_threads]->GetWorker();
       worker_thread->GetWorker()->MigrateConnection(target_worker, iter.second);
     }
-    worker_thread->Stop();
+    worker_thread->Stop(10 /* graceful timeout */);
     // Don't join the worker thread here, because it may join itself.
     recycle_worker_threads_.push(std::move(worker_thread));
   }
 }
 
-void Server::cleanupExitedWorkerThreads() {
+void Server::cleanupExitedWorkerThreads(bool force) {
   std::unique_ptr<WorkerThread> worker_thread = nullptr;
-  while (recycle_worker_threads_.try_pop(worker_thread)) {
-    worker_thread->Join();
-    worker_thread.reset();
+  auto total = recycle_worker_threads_.unsafe_size();
+  for (size_t i = 0; i < total; i++) {
+    if (!recycle_worker_threads_.try_pop(worker_thread)) {
+      break;
+    }
+    if (worker_thread->IsTerminated() || force) {
+      worker_thread->Join();
+      worker_thread.reset();
+    } else {
+      // Push the worker thread back to the queue if it's still running.
+      recycle_worker_threads_.push(std::move(worker_thread));
+    }
   }
 }
diff --git a/src/server/server.h b/src/server/server.h
index bdbaacad621..2acd0f5dbf1 100644
--- a/src/server/server.h
+++ b/src/server/server.h
@@ -305,7 +305,7 @@ class Server {
   void updateAllWatchedKeys();
   void increaseWorkerThreads(size_t delta);
   void decreaseWorkerThreads(size_t delta);
-  void cleanupExitedWorkerThreads();
+  void cleanupExitedWorkerThreads(bool force);
 
   std::atomic<bool> stop_ = false;
   std::atomic<bool> is_loading_ = false;
diff --git a/src/server/worker.cc b/src/server/worker.cc
index efc4ddc643e..47042d030b7 100644
--- a/src/server/worker.cc
+++ b/src/server/worker.cc
@@ -252,7 +252,8 @@ Status Worker::listenTCP(const std::string &host, uint32_t port, int backlog) {
     }
 
     evutil_make_socket_nonblocking(fd);
-    auto lev = NewEvconnlistener<&Worker::newTCPConnection>(base_, LEV_OPT_CLOSE_ON_FREE, backlog, fd);
+    auto lev =
+        NewEvconnlistener<&Worker::newTCPConnection>(base_, LEV_OPT_THREADSAFE | LEV_OPT_CLOSE_ON_FREE, backlog, fd);
     listen_events_.emplace_back(lev);
   }
 
@@ -292,14 +293,22 @@ void Worker::Run(std::thread::id tid) {
 
   if (event_base_dispatch(base_) != 0) {
     LOG(ERROR) << "[worker] Failed to run server, err: " << strerror(errno);
   }
+  is_terminated_ = true;
 }
 
-void Worker::Stop() {
-  event_base_loopbreak(base_);
+void Worker::Stop(uint32_t wait_seconds) {
   for (const auto &lev : listen_events_) {
     // It's unnecessary to close the listener fd since we have set the LEV_OPT_CLOSE_ON_FREE flag
    evconnlistener_free(lev);
   }
+
+  // wait_seconds == 0 means stop immediately; otherwise, wait up to wait_seconds
+  // seconds for the worker to process the remaining requests before stopping.
+  if (wait_seconds > 0) {
+    timeval tv = {wait_seconds, 0};
+    event_base_loopexit(base_, &tv);
+  } else {
+    event_base_loopbreak(base_);
+  }
 }
 
 Status Worker::AddConnection(redis::Connection *c) {
@@ -351,18 +360,24 @@ redis::Connection *Worker::removeConnection(int fd) {
 // blocked on a key or stream.
 void Worker::MigrateConnection(Worker *target, redis::Connection *conn) {
   if (!target || !conn) return;
-  if (conn->current_cmd != nullptr && conn->current_cmd->IsBlocking()) {
-    // don't need to close the connection since destroy worker thread will close it
+
+  auto bev = conn->GetBufferEvent();
+  // disable the read/write events to prevent the connection from being processed during the migration
+  bufferevent_disable(bev, EV_READ | EV_WRITE);
+  // We cannot migrate the connection if it has a running command,
+  // since the old worker may still be processing it and that would cause a data race.
+  if (!conn->CanMigrate()) {
+    // re-enable the read/write events since we disabled them above
+    bufferevent_enable(bev, EV_READ | EV_WRITE);
     return;
   }
 
+  // remove the connection from the current worker
+  DetachConnection(conn);
   if (!target->AddConnection(conn).IsOK()) {
-    // destroy worker thread will close the connection
+    conn->Close();
     return;
   }
 
-  // remove the connection from current worker
-  DetachConnection(conn);
-  auto bev = conn->GetBufferEvent();
   bufferevent_base_set(target->base_, bev);
   conn->SetCB(bev);
   bufferevent_enable(bev, EV_READ | EV_WRITE);
@@ -540,7 +555,7 @@ void WorkerThread::Start() {
   LOG(INFO) << "[worker] Thread #" << t_.get_id() << " started";
 }
 
-void WorkerThread::Stop() { worker_->Stop(); }
+void WorkerThread::Stop(uint32_t wait_seconds) { worker_->Stop(wait_seconds); }
 
 void WorkerThread::Join() {
   if (auto s = util::ThreadJoin(t_); !s) {
diff --git a/src/server/worker.h b/src/server/worker.h
index 96a6c7cddcb..a9f618e57f5 100644
--- a/src/server/worker.h
+++ b/src/server/worker.h
@@ -50,8 +50,9 @@ class Worker : EventCallbackBase<Worker>, EvconnlistenerBase<Worker> {
   Worker(Worker &&) = delete;
   Worker &operator=(const Worker &) = delete;
 
-  void Stop();
+  void Stop(uint32_t wait_seconds);
   void Run(std::thread::id tid);
+  bool IsTerminated() const { return is_terminated_; }
 
   void MigrateConnection(Worker *target, redis::Connection *conn);
   void DetachConnection(redis::Connection *conn);
@@ -94,6 +95,7 @@ class Worker : EventCallbackBase<Worker>, EvconnlistenerBase<Worker> {
   struct bufferevent_rate_limit_group *rate_limit_group_ = nullptr;
   struct ev_token_bucket_cfg *rate_limit_group_cfg_ = nullptr;
   lua_State *lua_;
+  std::atomic<bool> is_terminated_ = false;
 };
 
 class WorkerThread {
@@ -106,8 +108,9 @@ class WorkerThread {
   Worker *GetWorker() { return worker_.get(); }
 
   void Start();
-  void Stop();
+  void Stop(uint32_t wait_seconds);
   void Join();
+  bool IsTerminated() const { return worker_->IsTerminated(); }
 
  private:
   std::thread t_;
diff --git a/tests/gocase/unit/config/config_test.go b/tests/gocase/unit/config/config_test.go
index 20dcc612047..dd880367847 100644
--- a/tests/gocase/unit/config/config_test.go
+++ b/tests/gocase/unit/config/config_test.go
@@ -151,10 +151,7 @@ func TestDynamicChangeWorkerThread(t *testing.T) {
	defer srv.Close()
 
	ctx := context.Background()
-	rdb := srv.NewClientWithOption(&redis.Options{
-		MaxIdleConns: 20,
-		MaxRetries:   -1, // Disable retry to check connections are alive after config change
-	})
+	rdb := srv.NewClient()
	defer func() { require.NoError(t, rdb.Close()) }()
 
	t.Run("Test dynamic change worker thread", func(t *testing.T) {
@@ -217,12 +214,14 @@ func TestDynamicChangeWorkerThread(t *testing.T) {
		go func() {
			defer wg.Done()
			_ = rdb.XRead(ctx, &redis.XReadArgs{
-				Streams: []string{"s1", "s2", "s3"},
+				Streams: []string{"s1", "$"},
				Count:   1,
				Block:   blockingTimeout,
			})
		}()
 
+		// sleep a while to make sure all blocking requests are ready
+		time.Sleep(time.Second)
		require.NoError(t, rdb.Do(ctx, "CONFIG", "SET", "workers", "1").Err())
		wg.Wait()
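
The `is_running_` flag set in `Connection::OnRead` relies on a scope guard so that the flag is cleared on every exit path. Below is a minimal sketch of the pattern, in the spirit of the `MakeScopeExit` helper from kvrocks' `scope_exit.h`; the guard and function names here are illustrative and the real helper may differ in detail:

#include <atomic>
#include <type_traits>
#include <utility>

template <typename F>
class ScopeExit {
 public:
  explicit ScopeExit(F f) : f_(std::move(f)) {}
  ~ScopeExit() { f_(); }  // runs the callback on every exit path
  ScopeExit(const ScopeExit &) = delete;
  ScopeExit &operator=(const ScopeExit &) = delete;

 private:
  F f_;
};

template <typename F>
ScopeExit<std::decay_t<F>> MakeScopeExit(F &&f) {
  return ScopeExit<std::decay_t<F>>(std::forward<F>(f));  // C++17 guaranteed elision
}

std::atomic<bool> is_running = false;

void OnReadLike() {
  is_running = true;
  // The guard must be bound to a named variable: a discarded temporary would be
  // destroyed at the end of this statement and clear the flag immediately.
  auto guard = MakeScopeExit([] { is_running = false; });
  // ... tokenize input and execute commands; any early return still resets
  // is_running via the guard's destructor.
}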
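The two shutdown modes of `Worker::Stop` map directly onto libevent's two loop-termination calls. A sketch of the distinction, using the real libevent API but an illustrative wrapper name (`StopEventLoop` is a stand-in, not the kvrocks function):

#include <event2/event.h>

#include <cstdint>
#include <ctime>
#include <sys/time.h>

void StopEventLoop(event_base *base, uint32_t wait_seconds) {
  if (wait_seconds > 0) {
    // event_base_loopexit keeps dispatching pending events until the timeout
    // fires, then event_base_dispatch returns: a graceful drain.
    timeval tv = {static_cast<time_t>(wait_seconds), 0};
    event_base_loopexit(base, &tv);
  } else {
    // event_base_loopbreak exits right after the currently running callback,
    // leaving pending events unprocessed: an immediate stop.
    event_base_loopbreak(base);
  }
}

This is why the destructor path calls Stop(0) and forces a join, while decreaseWorkerThreads calls Stop(10) and defers the join to the recycle queue until IsTerminated() reports that the drained loop has returned.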
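The migration path itself boils down to quiescing the bufferevent, re-homing it on the target worker's event_base, and re-enabling it. A minimal sketch under two assumptions: the bufferevent is socket-based (a requirement of bufferevent_base_set) and the caller runs on the old worker's thread. Error handling and callback re-registration (conn->SetCB in the patch) are elided, and the helper name is hypothetical:

#include <event2/bufferevent.h>
#include <event2/event.h>

// Illustrative helper, not the kvrocks API.
bool MigrateBufferevent(event_base *target_base, bufferevent *bev) {
  // Quiesce the bufferevent so the old loop stops firing callbacks on it.
  bufferevent_disable(bev, EV_READ | EV_WRITE);

  // Re-home the bufferevent onto the target loop (socket bufferevents only).
  if (bufferevent_base_set(target_base, bev) != 0) {
    // Migration failed: resume delivery on the old loop.
    bufferevent_enable(bev, EV_READ | EV_WRITE);
    return false;
  }

  // Resume event delivery, now driven by the target worker's loop. The real
  // code re-installs the connection callbacks before enabling.
  bufferevent_enable(bev, EV_READ | EV_WRITE);
  return true;
}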