
Commit

…rocks into issues/1080
jihuayu committed Nov 17, 2023
2 parents ce8e241 + 6096801 commit 24099a5
Showing 11 changed files with 77 additions and 37 deletions.
1 change: 0 additions & 1 deletion src/commands/blocking_commander.h
@@ -44,7 +44,6 @@ class BlockingCommander : public Commander,
// in other words, returning true indicates ending the blocking
virtual bool OnBlockingWrite() = 0;

bool IsBlocking() const override { return true; }
// to start the blocking process
// usually put to the end of the Execute method
Status StartBlocking(int64_t timeout, std::string *output) {
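
The comments above spell out the contract for a blocking command: implement OnBlockingWrite(), where returning true ends the blocking, and call StartBlocking() as the last step of Execute(). A hypothetical minimal subclass, sketched only to illustrate that contract; the TryReply helper, the timeout_ member, and the output buffering are assumptions, not code from this commit:

```cpp
// Hypothetical sketch against the interfaces shown above; not part of this commit.
class CommandBlockingExample : public BlockingCommander {
 public:
  Status Execute(Server *srv, Connection *conn, std::string *output) override {
    // Fast path: if the awaited data is already available, reply without blocking.
    if (TryReply(output)) return Status::OK();
    // Otherwise suspend the connection; StartBlocking is usually the last call in Execute.
    return StartBlocking(timeout_, output);
  }

  // Invoked when the awaited data may have arrived; returning true ends the blocking.
  bool OnBlockingWrite() override { return TryReply(&buffered_output_); }

 private:
  bool TryReply(std::string *output);  // assumed helper that polls the blocked-on key
  int64_t timeout_ = 0;
  std::string buffered_output_;
};
```
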
2 changes: 0 additions & 2 deletions src/commands/cmd_pubsub.cc
@@ -82,7 +82,6 @@ void SubscribeCommandReply(std::string *output, const std::string &name, const s

class CommandSubscribe : public Commander {
public:
bool IsBlocking() const override { return true; }
Status Execute(Server *srv, Connection *conn, std::string *output) override {
for (unsigned i = 1; i < args_.size(); i++) {
conn->SubscribeChannel(args_[i]);
@@ -112,7 +111,6 @@ class CommandUnSubscribe : public Commander {

class CommandPSubscribe : public Commander {
public:
bool IsBlocking() const override { return true; }
Status Execute(Server *srv, Connection *conn, std::string *output) override {
for (size_t i = 1; i < args_.size(); i++) {
conn->PSubscribeChannel(args_[i]);
1 change: 0 additions & 1 deletion src/commands/cmd_stream.cc
@@ -750,7 +750,6 @@ class CommandXRead : public Commander,
private EvbufCallbackBase<CommandXRead, false>,
private EventCallbackBase<CommandXRead> {
public:
bool IsBlocking() const override { return true; }
Status Parse(const std::vector<std::string> &args) override {
size_t streams_word_idx = 0;

1 change: 0 additions & 1 deletion src/commands/commander.h
@@ -70,7 +70,6 @@ class Commander {
void SetAttributes(const CommandAttributes *attributes) { attributes_ = attributes; }
const CommandAttributes *GetAttributes() const { return attributes_; }
void SetArgs(const std::vector<std::string> &args) { args_ = args; }
virtual bool IsBlocking() const { return false; }
virtual Status Parse() { return Parse(args_); }
virtual Status Parse(const std::vector<std::string> &args) { return Status::OK(); }
virtual Status Execute(Server *srv, Connection *conn, std::string *output) {
19 changes: 17 additions & 2 deletions src/server/redis_connection.cc
@@ -34,6 +34,7 @@

#include "commands/blocking_commander.h"
#include "redis_connection.h"
#include "scope_exit.h"
#include "server.h"
#include "time_util.h"
#include "tls_util.h"
@@ -75,8 +76,9 @@ void Connection::Close() {

void Connection::Detach() { owner_->DetachConnection(this); }

void Connection::OnRead(bufferevent *bev) {
DLOG(INFO) << "[connection] on read: " << bufferevent_getfd(bev);
void Connection::OnRead(struct bufferevent *bev) {
is_running_ = true;
MakeScopeExit([this] { is_running_ = false; });

SetLastInteraction();
auto s = req_.Tokenize(Input());
@@ -177,6 +179,13 @@ void Connection::DisableFlag(Flag flag) { flags_ &= (~flag); }

bool Connection::IsFlagEnabled(Flag flag) const { return (flags_ & flag) > 0; }

bool Connection::CanMigrate() const {
return !is_running_ // reading or writing
&& !IsFlagEnabled(redis::Connection::kCloseAfterReply) // close after reply
&& saved_current_command_ == nullptr // not executing blocking command like BLPOP
&& subscribe_channels_.empty() && subscribe_patterns_.empty(); // not subscribing any channel
}

void Connection::SubscribeChannel(const std::string &channel) {
for (const auto &chan : subscribe_channels_) {
if (channel == chan) return;
@@ -302,6 +311,7 @@ void Connection::ExecuteCommands(std::deque<CommandTokens> *to_process_cmds) {
bool is_multi_exec = IsFlagEnabled(Connection::kMultiExec);
if (IsFlagEnabled(redis::Connection::kCloseAfterReply) && !is_multi_exec) break;

std::unique_ptr<Commander> current_cmd;
auto s = srv_->LookupAndCreateCommand(cmd_tokens.front(), &current_cmd);
if (!s.IsOK()) {
if (is_multi_exec) multi_error_ = true;
@@ -424,6 +434,11 @@ void Connection::ExecuteCommands(std::deque<CommandTokens> *to_process_cmds) {
// Break the execution loop when encountering a blocking command like BLPOP or BRPOP;
// it will suspend the connection and wait for the wakeup signal.
if (s.Is<Status::BlockingCmd>()) {
// A blocking command reuses its Commander object when the connection resumes from the
// suspended state, so we need to save the command for the next execution.
// Connection migration also checks saved_current_command_ to determine whether
// the connection can be migrated or not.
saved_current_command_ = std::move(current_cmd);
break;
}

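
OnRead now marks the connection as running for the duration of the read handler through the scope-exit helper included from scope_exit.h, and the new CanMigrate()/saved_current_command_ pair tells the worker whether the connection is safe to hand over. For reference on the idiom only, here is a minimal, self-contained scope-exit guard; kvrocks ships its own implementation, so the code below is an illustrative assumption rather than the project's scope_exit.h:

```cpp
#include <atomic>
#include <utility>

// Minimal scope-exit guard: runs the stored callback when the guard object
// goes out of scope, on every exit path (normal return, early return, exception).
template <typename F>
class ScopeExit {
 public:
  explicit ScopeExit(F f) : f_(std::move(f)) {}
  ScopeExit(ScopeExit &&other) : f_(std::move(other.f_)), active_(other.active_) {
    other.active_ = false;
  }
  ScopeExit(const ScopeExit &) = delete;
  ScopeExit &operator=(const ScopeExit &) = delete;
  ~ScopeExit() {
    if (active_) f_();
  }

 private:
  F f_;
  bool active_ = true;
};

template <typename F>
ScopeExit<F> MakeScopeExit(F &&f) {
  return ScopeExit<F>(std::forward<F>(f));
}

// Usage mirroring the is_running_ bookkeeping in OnRead: bind the guard to a
// named variable so it lives until the end of the handler.
void HandleRead(std::atomic<bool> &is_running) {
  is_running = true;
  auto guard = MakeScopeExit([&] { is_running = false; });
  // ... tokenize input and execute commands; is_running is cleared on any exit path ...
}
```

The guard only fires when the named object goes out of scope, so the idiom relies on binding the result of the factory function to a local variable that lives for the whole handler.
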
5 changes: 4 additions & 1 deletion src/server/redis_connection.h
@@ -119,6 +119,7 @@ class Connection : public EvbufCallbackBase<Connection> {
void RecordProfilingSampleIfNeed(const std::string &cmd, uint64_t duration);
void SetImporting() { importing_ = true; }
bool IsImporting() const { return importing_; }
bool CanMigrate() const;

// Multi exec
void SetInExec() { in_exec_ = true; }
@@ -127,7 +128,6 @@ class Connection : public EvbufCallbackBase<Connection> {
void ResetMultiExec();
std::deque<redis::CommandTokens> *GetMultiExecCommands() { return &multi_cmds_; }

std::unique_ptr<Commander> current_cmd;
std::function<void(int)> close_cb = nullptr;

std::set<std::string> watched_keys;
@@ -152,12 +152,15 @@ class Connection : public EvbufCallbackBase<Connection> {
bufferevent *bev_;
Request req_;
Worker *owner_;
std::unique_ptr<Commander> saved_current_command_;

std::vector<std::string> subscribe_channels_;
std::vector<std::string> subscribe_patterns_;

Server *srv_;
bool in_exec_ = false;
bool multi_error_ = false;
std::atomic<bool> is_running_ = false;
std::deque<redis::CommandTokens> multi_cmds_;

bool importing_ = false;
32 changes: 21 additions & 11 deletions src/server/server.cc
@@ -115,7 +115,7 @@ Server::~Server() {
for (auto &worker_thread : worker_threads_) {
worker_thread.reset();
}
cleanupExitedWorkerThreads();
cleanupExitedWorkerThreads(true /* force */);
CleanupExitedSlaves();

lua::DestroyState(lua_);
@@ -226,7 +226,7 @@ void Server::Stop() {
slaveof_mu_.unlock();

for (const auto &worker : worker_threads_) {
worker->Stop();
worker->Stop(0 /* immediately terminate */);
}

rocksdb::CancelAllBackgroundWork(storage->GetDB(), true);
@@ -739,8 +739,9 @@ void Server::cron() {
storage->SetDBInRetryableIOError(false);
}

if (counter != 0 && counter % 10 == 0) {
cleanupExitedWorkerThreads();
// check if we need to clean up exited worker threads every 5s
if (counter != 0 && counter % 50 == 0) {
cleanupExitedWorkerThreads(false);
}

CleanupExitedSlaves();
@@ -1685,12 +1686,12 @@ void Server::AdjustWorkerThreads() {
if (new_worker_threads > worker_threads_.size()) {
delta = new_worker_threads - worker_threads_.size();
increaseWorkerThreads(delta);
LOG(INFO) << "[server] Increase worker threads to " << new_worker_threads;
LOG(INFO) << "[server] Increase worker threads from " << worker_threads_.size() << " to " << new_worker_threads;
return;
}

delta = worker_threads_.size() - new_worker_threads;
LOG(INFO) << "[server] Decrease worker threads to " << new_worker_threads;
LOG(INFO) << "[server] Decrease worker threads from " << worker_threads_.size() << " to " << new_worker_threads;
decreaseWorkerThreads(delta);
}

@@ -1721,17 +1722,26 @@ void Server::decreaseWorkerThreads(size_t delta) {
auto target_worker = worker_threads_[iter.first % remain_worker_threads]->GetWorker();
worker_thread->GetWorker()->MigrateConnection(target_worker, iter.second);
}
worker_thread->Stop();
worker_thread->Stop(10 /* graceful timeout */);
// Don't join the worker thread here, because it may join itself.
recycle_worker_threads_.push(std::move(worker_thread));
}
}

void Server::cleanupExitedWorkerThreads() {
void Server::cleanupExitedWorkerThreads(bool force) {
std::unique_ptr<WorkerThread> worker_thread = nullptr;
while (recycle_worker_threads_.try_pop(worker_thread)) {
worker_thread->Join();
worker_thread.reset();
auto total = recycle_worker_threads_.unsafe_size();
for (size_t i = 0; i < total; i++) {
if (!recycle_worker_threads_.try_pop(worker_thread)) {
break;
}
if (worker_thread->IsTerminated() || force) {
worker_thread->Join();
worker_thread.reset();
} else {
// Push the worker thread back to the queue if it's still running.
recycle_worker_threads_.push(std::move(worker_thread));
}
}
}

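
decreaseWorkerThreads parks retired worker threads in recycle_worker_threads_ instead of joining them inline, since a thread cannot join itself, and cleanupExitedWorkerThreads later joins only the ones that have terminated unless force is set on the shutdown path. Below is a standalone sketch of that deferred-join pattern using a mutex-guarded deque in place of the concurrent queue the server actually uses; all names are illustrative:

```cpp
#include <atomic>
#include <deque>
#include <memory>
#include <mutex>
#include <thread>

// A retired thread plus a flag the thread sets right before its loop exits.
struct RetiredThread {
  std::thread t;
  std::atomic<bool> terminated{false};
};

class RetiredThreadPool {
 public:
  // Park a thread that is being retired; it may still be draining work, and it
  // may even be the thread executing this call, so never join it here.
  void Park(std::unique_ptr<RetiredThread> rt) {
    std::lock_guard<std::mutex> lk(mu_);
    parked_.push_back(std::move(rt));
  }

  // Called periodically (and with force=true at shutdown): join threads that
  // have terminated, push the still-running ones back for a later pass.
  void Cleanup(bool force) {
    std::lock_guard<std::mutex> lk(mu_);
    std::deque<std::unique_ptr<RetiredThread>> still_running;
    while (!parked_.empty()) {
      auto rt = std::move(parked_.front());
      parked_.pop_front();
      if (force || rt->terminated) {
        if (rt->t.joinable()) rt->t.join();
      } else {
        still_running.push_back(std::move(rt));
      }
    }
    parked_.swap(still_running);
  }

 private:
  std::mutex mu_;
  std::deque<std::unique_ptr<RetiredThread>> parked_;
};
```
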
2 changes: 1 addition & 1 deletion src/server/server.h
@@ -305,7 +305,7 @@ class Server {
void updateAllWatchedKeys();
void increaseWorkerThreads(size_t delta);
void decreaseWorkerThreads(size_t delta);
void cleanupExitedWorkerThreads();
void cleanupExitedWorkerThreads(bool force);

std::atomic<bool> stop_ = false;
std::atomic<bool> is_loading_ = false;
35 changes: 25 additions & 10 deletions src/server/worker.cc
@@ -252,7 +252,8 @@ Status Worker::listenTCP(const std::string &host, uint32_t port, int backlog) {
}

evutil_make_socket_nonblocking(fd);
auto lev = NewEvconnlistener<&Worker::newTCPConnection>(base_, LEV_OPT_CLOSE_ON_FREE, backlog, fd);
auto lev =
NewEvconnlistener<&Worker::newTCPConnection>(base_, LEV_OPT_THREADSAFE | LEV_OPT_CLOSE_ON_FREE, backlog, fd);
listen_events_.emplace_back(lev);
}

@@ -292,14 +293,22 @@ void Worker::Run(std::thread::id tid) {
if (event_base_dispatch(base_) != 0) {
LOG(ERROR) << "[worker] Failed to run server, err: " << strerror(errno);
}
is_terminated_ = true;
}

void Worker::Stop() {
event_base_loopbreak(base_);
void Worker::Stop(uint32_t wait_seconds) {
for (const auto &lev : listen_events_) {
// It's unnecessary to close the listener fd since we have set the LEV_OPT_CLOSE_ON_FREE flag
evconnlistener_free(lev);
}
// wait_seconds == 0 means stop immediately; otherwise it waits N seconds
// for the worker to process the remaining requests before stopping.
if (wait_seconds > 0) {
timeval tv = {wait_seconds, 0};
event_base_loopexit(base_, &tv);
} else {
event_base_loopbreak(base_);
}
}

Status Worker::AddConnection(redis::Connection *c) {
@@ -351,18 +360,24 @@ redis::Connection *Worker::removeConnection(int fd) {
// blocked on a key or stream.
void Worker::MigrateConnection(Worker *target, redis::Connection *conn) {
if (!target || !conn) return;
if (conn->current_cmd != nullptr && conn->current_cmd->IsBlocking()) {
// don't need to close the connection since destroy worker thread will close it

auto bev = conn->GetBufferEvent();
// disable read/write event to prevent the connection from being processed during migration
bufferevent_disable(bev, EV_READ | EV_WRITE);
// We cannot migrate the connection if it has a running command,
// since the old worker may still be processing it, which would cause a data race.
if (!conn->CanMigrate()) {
// Need to enable read/write event again since we disabled them before
bufferevent_enable(bev, EV_READ | EV_WRITE);
return;
}

// remove the connection from current worker
DetachConnection(conn);
if (!target->AddConnection(conn).IsOK()) {
// destroy worker thread will close the connection
conn->Close();
return;
}
// remove the connection from current worker
DetachConnection(conn);
auto bev = conn->GetBufferEvent();
bufferevent_base_set(target->base_, bev);
conn->SetCB(bev);
bufferevent_enable(bev, EV_READ | EV_WRITE);
@@ -540,7 +555,7 @@ void WorkerThread::Start() {
LOG(INFO) << "[worker] Thread #" << t_.get_id() << " started";
}

void WorkerThread::Stop() { worker_->Stop(); }
void WorkerThread::Stop(uint32_t wait_seconds) { worker_->Stop(wait_seconds); }

void WorkerThread::Join() {
if (auto s = util::ThreadJoin(t_); !s) {
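
Worker::Stop now frees the listeners first so no new connections are accepted, then either breaks the libevent loop immediately or schedules a delayed exit: event_base_loopbreak() returns from the loop as soon as the current callback completes, while event_base_loopexit() with a timeval keeps dispatching events until the timeout fires. A small standalone sketch of the same decision, with a hypothetical helper name:

```cpp
#include <cstdint>
#include <event2/event.h>

// Hypothetical helper mirroring the wait_seconds semantics of Worker::Stop():
// 0 stops the loop right away; a positive value lets in-flight requests drain
// for up to that many seconds before the loop exits.
void StopEventLoop(struct event_base *base, uint32_t wait_seconds) {
  if (wait_seconds > 0) {
    struct timeval tv = {static_cast<time_t>(wait_seconds), 0};
    event_base_loopexit(base, &tv);  // exit after the timeout elapses
  } else {
    event_base_loopbreak(base);  // exit as soon as the current callback returns
  }
}
```
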
7 changes: 5 additions & 2 deletions src/server/worker.h
@@ -50,8 +50,9 @@ class Worker : EventCallbackBase<Worker>, EvconnlistenerBase<Worker> {
Worker(Worker &&) = delete;
Worker &operator=(const Worker &) = delete;

void Stop();
void Stop(uint32_t wait_seconds);
void Run(std::thread::id tid);
bool IsTerminated() const { return is_terminated_; }

void MigrateConnection(Worker *target, redis::Connection *conn);
void DetachConnection(redis::Connection *conn);
@@ -94,6 +95,7 @@ class Worker : EventCallbackBase<Worker>, EvconnlistenerBase<Worker> {
struct bufferevent_rate_limit_group *rate_limit_group_ = nullptr;
struct ev_token_bucket_cfg *rate_limit_group_cfg_ = nullptr;
lua_State *lua_;
std::atomic<bool> is_terminated_ = false;
};

class WorkerThread {
@@ -106,8 +108,9 @@ class WorkerThread {

Worker *GetWorker() { return worker_.get(); }
void Start();
void Stop();
void Stop(uint32_t wait_seconds);
void Join();
bool IsTerminated() const { return worker_->IsTerminated(); }

private:
std::thread t_;
9 changes: 4 additions & 5 deletions tests/gocase/unit/config/config_test.go
@@ -151,10 +151,7 @@ func TestDynamicChangeWorkerThread(t *testing.T) {
defer srv.Close()

ctx := context.Background()
rdb := srv.NewClientWithOption(&redis.Options{
MaxIdleConns: 20,
MaxRetries: -1, // Disable retry to check connections are alive after config change
})
rdb := srv.NewClient()
defer func() { require.NoError(t, rdb.Close()) }()

t.Run("Test dynamic change worker thread", func(t *testing.T) {
@@ -217,12 +214,14 @@ func TestDynamicChangeWorkerThread(t *testing.T) {
go func() {
defer wg.Done()
_ = rdb.XRead(ctx, &redis.XReadArgs{
Streams: []string{"s1", "s2", "s3"},
Streams: []string{"s1", "$"},
Count: 1,
Block: blockingTimeout,
})
}()

// sleep a while to make sure all blocking requests are ready
time.Sleep(time.Second)
require.NoError(t, rdb.Do(ctx, "CONFIG", "SET", "workers", "1").Err())
wg.Wait()

