From 3d495ef23939b7a1183410c7dd3aa028a154b844 Mon Sep 17 00:00:00 2001 From: edward_xu Date: Sun, 28 Jul 2024 16:10:52 +0800 Subject: [PATCH 01/16] remove related mutex with tbb hashmap --- src/server/worker.cc | 205 +++++++++++++++++++++++-------------------- src/server/worker.h | 10 ++- 2 files changed, 116 insertions(+), 99 deletions(-) diff --git a/src/server/worker.cc b/src/server/worker.cc index 22054e1faf8..7ce09e699e6 100644 --- a/src/server/worker.cc +++ b/src/server/worker.cc @@ -76,18 +76,24 @@ Worker::Worker(Server *srv, Config *config) : srv(srv), base_(event_base_new()) } Worker::~Worker() { - std::vector conns; - conns.reserve(conns_.size() + monitor_conns_.size()); + // std::vector conns; + // conns.reserve(conns_.size() + monitor_conns_.size()); for (const auto &iter : conns_) { - conns.emplace_back(iter.second); + if (ConnMap::accessor accessor; conns_.find(accessor, iter.first)) { + // conns.emplace_back(accessor->second); + accessor->second->Close(); + } } for (const auto &iter : monitor_conns_) { - conns.emplace_back(iter.second); - } - for (const auto &iter : conns) { - iter->Close(); + if (ConnMap::accessor accessor; monitor_conns_.find(accessor, iter.first)) { + // conns.emplace_back(accessor->second); + accessor->second->Close(); + } } + // for (const auto &iter : conns) { + // iter->Close(); + // } timer_.reset(); if (rate_limit_group_) { @@ -311,9 +317,7 @@ void Worker::Stop(uint32_t wait_seconds) { } Status Worker::AddConnection(redis::Connection *c) { - std::unique_lock lock(conns_mu_); - auto iter = conns_.find(c->GetFD()); - if (iter != conns_.end()) { + if (ConnMap::const_accessor accessor; conns_.find(accessor, c->GetFD())) { return {Status::NotOK, "connection was exists"}; } @@ -323,7 +327,8 @@ Status Worker::AddConnection(redis::Connection *c) { return {Status::NotOK, "max number of clients reached"}; } - conns_.emplace(c->GetFD(), c); + ConnMap::accessor accessor; + conns_.insert(accessor, std::make_pair(c->GetFD(), c)); uint64_t id = srv->GetClientID(); c->SetID(id); @@ -333,18 +338,15 @@ Status Worker::AddConnection(redis::Connection *c) { redis::Connection *Worker::removeConnection(int fd) { redis::Connection *conn = nullptr; - std::unique_lock lock(conns_mu_); - auto iter = conns_.find(fd); - if (iter != conns_.end()) { - conn = iter->second; - conns_.erase(iter); + if (ConnMap::accessor accessor; conns_.find(accessor, fd)) { + conn = accessor->second; + conns_.erase(accessor); srv->DecrClientNum(); } - iter = monitor_conns_.find(fd); - if (iter != monitor_conns_.end()) { - conn = iter->second; - monitor_conns_.erase(iter); + if (ConnMap::accessor accessor; monitor_conns_.find(accessor, fd)) { + conn = accessor->second; + monitor_conns_.erase(accessor); srv->DecrClientNum(); srv->DecrMonitorClientNum(); } @@ -409,31 +411,25 @@ void Worker::FreeConnection(redis::Connection *conn) { } void Worker::FreeConnectionByID(int fd, uint64_t id) { - std::unique_lock lock(conns_mu_); - auto iter = conns_.find(fd); - if (iter != conns_.end() && iter->second->GetID() == id) { + if (ConnMap::accessor accessor; conns_.find(accessor, fd)) { if (rate_limit_group_ != nullptr) { - bufferevent_remove_from_rate_limit_group(iter->second->GetBufferEvent()); + bufferevent_remove_from_rate_limit_group(accessor->second->GetBufferEvent()); } - delete iter->second; - conns_.erase(iter); + delete accessor->second; + conns_.erase(accessor); srv->DecrClientNum(); } - - iter = monitor_conns_.find(fd); - if (iter != monitor_conns_.end() && iter->second->GetID() == id) { - delete iter->second; - monitor_conns_.erase(iter); + if (ConnMap::accessor accessor; monitor_conns_.find(accessor, fd)) { + delete accessor->second; + monitor_conns_.erase(accessor); srv->DecrClientNum(); srv->DecrMonitorClientNum(); } } Status Worker::EnableWriteEvent(int fd) { - std::unique_lock lock(conns_mu_); - auto iter = conns_.find(fd); - if (iter != conns_.end()) { - auto bev = iter->second->GetBufferEvent(); + if (ConnMap::const_accessor accessor; conns_.find(accessor, fd)) { + auto bev = accessor->second->GetBufferEvent(); bufferevent_enable(bev, EV_WRITE); return Status::OK(); } @@ -442,11 +438,9 @@ Status Worker::EnableWriteEvent(int fd) { } Status Worker::Reply(int fd, const std::string &reply) { - std::unique_lock lock(conns_mu_); - auto iter = conns_.find(fd); - if (iter != conns_.end()) { - iter->second->SetLastInteraction(); - redis::Reply(iter->second->Output(), reply); + if (ConnMap::accessor accessor; conns_.find(accessor, fd)) { + accessor->second->SetLastInteraction(); + redis::Reply(accessor->second->Output(), reply); return Status::OK(); } @@ -454,44 +448,51 @@ Status Worker::Reply(int fd, const std::string &reply) { } void Worker::BecomeMonitorConn(redis::Connection *conn) { - { - std::lock_guard guard(conns_mu_); - conns_.erase(conn->GetFD()); - monitor_conns_[conn->GetFD()] = conn; + if (ConnMap::accessor accessor; conns_.find(accessor, conn->GetFD())) { + conns_.erase(accessor); + accessor.release(); + if (ConnMap::accessor accessor; monitor_conns_.find(accessor, conn->GetFD())) { + accessor->second = conn; + } else { + monitor_conns_.insert(accessor, std::make_pair(conn->GetFD(), conn)); + } } srv->IncrMonitorClientNum(); conn->EnableFlag(redis::Connection::kMonitor); } void Worker::QuitMonitorConn(redis::Connection *conn) { - { - std::lock_guard guard(conns_mu_); - monitor_conns_.erase(conn->GetFD()); - conns_[conn->GetFD()] = conn; + if (ConnMap::accessor accessor; monitor_conns_.find(accessor, conn->GetFD())) { + monitor_conns_.erase(accessor); + accessor.release(); + if (ConnMap::accessor accessor; conns_.find(accessor, conn->GetFD())) { + accessor->second = conn; + } else { + conns_.insert(accessor, std::make_pair(conn->GetFD(), conn)); + } } srv->DecrMonitorClientNum(); conn->DisableFlag(redis::Connection::kMonitor); } void Worker::FeedMonitorConns(redis::Connection *conn, const std::string &response) { - std::unique_lock lock(conns_mu_); - - for (const auto &iter : monitor_conns_) { - if (conn == iter.second) continue; // skip the monitor command - - if (conn->GetNamespace() == iter.second->GetNamespace() || iter.second->GetNamespace() == kDefaultNamespace) { - iter.second->Reply(response); + for (const auto &[key, _] : monitor_conns_) { + if (ConnMap::accessor accessor; monitor_conns_.find(accessor, key)) { + const auto &value = accessor->second; + if (conn == value) continue; + if (conn->GetNamespace() == value->GetNamespace() || value->GetNamespace() == kDefaultNamespace) { + value->Reply(response); + } } } } std::string Worker::GetClientsStr() { - std::unique_lock lock(conns_mu_); - std::string output; - for (const auto &iter : conns_) { - redis::Connection *conn = iter.second; - output.append(conn->ToString()); + for (const auto &[key, _] : conns_) { + if (ConnMap::const_accessor accessor; conns_.find(accessor, key)) { + output.append(accessor->second->ToString()); + } } return output; @@ -499,27 +500,26 @@ std::string Worker::GetClientsStr() { void Worker::KillClient(redis::Connection *self, uint64_t id, const std::string &addr, uint64_t type, bool skipme, int64_t *killed) { - std::lock_guard guard(conns_mu_); - - for (const auto &iter : conns_) { - redis::Connection *conn = iter.second; - if (skipme && self == conn) continue; - - // no need to kill the client again if the kCloseAfterReply flag is set - if (conn->IsFlagEnabled(redis::Connection::kCloseAfterReply)) { - continue; - } - - if ((type & conn->GetClientType()) || - (!addr.empty() && (conn->GetAddr() == addr || conn->GetAnnounceAddr() == addr)) || - (id != 0 && conn->GetID() == id)) { - conn->EnableFlag(redis::Connection::kCloseAfterReply); - // enable write event to notify worker wake up ASAP, and remove the connection - if (!conn->IsFlagEnabled(redis::Connection::kSlave)) { // don't enable any event in slave connection - auto bev = conn->GetBufferEvent(); - bufferevent_enable(bev, EV_WRITE); + for (const auto &[key, _] : conns_) { + if (ConnMap::accessor accessor; conns_.find(accessor, key)) { + auto conn = accessor->second; + if (skipme && self == conn) continue; + + // no need to kill the client again if the kCloseAfterReply flag is set + if (conn->IsFlagEnabled(redis::Connection::kCloseAfterReply)) { + continue; + } + if ((type & conn->GetClientType()) || + (!addr.empty() && (conn->GetAddr() == addr || conn->GetAnnounceAddr() == addr)) || + (id != 0 && conn->GetID() == id)) { + conn->EnableFlag(redis::Connection::kCloseAfterReply); + // enable write event to notify worker wake up ASAP, and remove the connection + if (!conn->IsFlagEnabled(redis::Connection::kSlave)) { // don't enable any event in slave connection + auto bev = conn->GetBufferEvent(); + bufferevent_enable(bev, EV_WRITE); + } + (*killed)++; } - (*killed)++; } } } @@ -527,24 +527,29 @@ void Worker::KillClient(redis::Connection *self, uint64_t id, const std::string void Worker::KickoutIdleClients(int timeout) { std::vector> to_be_killed_conns; - { - std::lock_guard guard(conns_mu_); - if (conns_.empty()) { - return; - } + std::set fds; + for (const auto &[key, _] : conns_) { + fds.emplace(key); + } - int iterations = std::min(static_cast(conns_.size()), 50); - auto iter = conns_.upper_bound(last_iter_conn_fd_); - while (iterations--) { - if (iter == conns_.end()) iter = conns_.begin(); - if (static_cast(iter->second->GetIdleTime()) >= timeout) { - to_be_killed_conns.emplace_back(iter->first, iter->second->GetID()); - } - iter++; + if (fds.empty()) { + return; + } + + int iterations = std::min(static_cast(conns_.size()), 50); + auto iter = fds.upper_bound(last_iter_conn_fd_); + while (iterations--) { + if (iter == fds.end()) { + iter = fds.begin(); } - iter--; - last_iter_conn_fd_ = iter->first; + if (ConnMap::const_accessor accessor; + conns_.find(accessor, *iter) && static_cast(accessor->second->GetIdleTime()) >= timeout) { + to_be_killed_conns.emplace_back(accessor->first, accessor->second->GetID()); + } + iter++; } + iter--; + last_iter_conn_fd_ = *iter; for (const auto &conn : to_be_killed_conns) { FreeConnectionByID(conn.first, conn.second); @@ -564,6 +569,16 @@ void WorkerThread::Start() { LOG(INFO) << "[worker] Thread #" << t_.get_id() << " started"; } +std::map Worker::GetConnections() const { + std::map result; + for (auto [fd, _] : conns_) { + if (ConnMap::accessor accessor; conns_.find(accessor, fd)) { + result.emplace(accessor->first, accessor->second); + } + } + return result; +} + void WorkerThread::Stop(uint32_t wait_seconds) { worker_->Stop(wait_seconds); } void WorkerThread::Join() { diff --git a/src/server/worker.h b/src/server/worker.h index b6918ba9296..1297220d3d7 100644 --- a/src/server/worker.h +++ b/src/server/worker.h @@ -37,6 +37,7 @@ #include #include "event_util.h" +#include "oneapi/tbb/concurrent_hash_map.h" #include "redis_connection.h" #include "storage/storage.h" @@ -75,10 +76,12 @@ class Worker : EventCallbackBase, EvconnlistenerBase { void TimerCB(int, int16_t events); lua_State *Lua() { return lua_; } - std::map GetConnections() const { return conns_; } + std::map GetConnections() const; Server *srv; private: + using ConnMap = tbb::concurrent_hash_map; + Status listenTCP(const std::string &host, uint32_t port, int backlog); void newTCPConnection(evconnlistener *listener, evutil_socket_t fd, sockaddr *address, int socklen); void newUnixSocketConnection(evconnlistener *listener, evutil_socket_t fd, sockaddr *address, int socklen); @@ -88,9 +91,8 @@ class Worker : EventCallbackBase, EvconnlistenerBase { UniqueEvent timer_; std::thread::id tid_; std::vector listen_events_; - std::mutex conns_mu_; - std::map conns_; - std::map monitor_conns_; + ConnMap conns_; + ConnMap monitor_conns_; int last_iter_conn_fd_ = 0; // fd of last processed connection in previous cron struct bufferevent_rate_limit_group *rate_limit_group_ = nullptr; From f6192d429fde44f9ca209222e76ca658478546ce Mon Sep 17 00:00:00 2001 From: edward_xu Date: Tue, 30 Jul 2024 23:00:12 +0800 Subject: [PATCH 02/16] add mutex for erase and iteration --- src/server/worker.cc | 49 ++++++++++++++++++++++++++++++++------------ src/server/worker.h | 2 ++ 2 files changed, 38 insertions(+), 13 deletions(-) diff --git a/src/server/worker.cc b/src/server/worker.cc index 7ce09e699e6..4f36b2ffbe5 100644 --- a/src/server/worker.cc +++ b/src/server/worker.cc @@ -23,6 +23,7 @@ #include #include +#include #include #include @@ -78,17 +79,23 @@ Worker::Worker(Server *srv, Config *config) : srv(srv), base_(event_base_new()) Worker::~Worker() { // std::vector conns; // conns.reserve(conns_.size() + monitor_conns_.size()); - - for (const auto &iter : conns_) { - if (ConnMap::accessor accessor; conns_.find(accessor, iter.first)) { - // conns.emplace_back(accessor->second); - accessor->second->Close(); + { + std::lock_guard guard(conns_mu_); + for (const auto &iter : conns_) { + if (ConnMap::accessor accessor; conns_.find(accessor, iter.first)) { + // conns.emplace_back(accessor->second); + accessor->second->Close(); + } } } - for (const auto &iter : monitor_conns_) { - if (ConnMap::accessor accessor; monitor_conns_.find(accessor, iter.first)) { - // conns.emplace_back(accessor->second); - accessor->second->Close(); + + { + std::lock_guard guard(conns_mu_); + for (const auto &iter : monitor_conns_) { + if (ConnMap::accessor accessor; monitor_conns_.find(accessor, iter.first)) { + // conns.emplace_back(accessor->second); + accessor->second->Close(); + } } } // for (const auto &iter : conns) { @@ -338,9 +345,12 @@ Status Worker::AddConnection(redis::Connection *c) { redis::Connection *Worker::removeConnection(int fd) { redis::Connection *conn = nullptr; + std::lock_guard guard(conns_mu_); if (ConnMap::accessor accessor; conns_.find(accessor, fd)) { - conn = accessor->second; - conns_.erase(accessor); + { + conn = accessor->second; + conns_.erase(accessor); + } srv->DecrClientNum(); } @@ -411,12 +421,15 @@ void Worker::FreeConnection(redis::Connection *conn) { } void Worker::FreeConnectionByID(int fd, uint64_t id) { + std::lock_guard guard(conns_mu_); if (ConnMap::accessor accessor; conns_.find(accessor, fd)) { if (rate_limit_group_ != nullptr) { bufferevent_remove_from_rate_limit_group(accessor->second->GetBufferEvent()); } + delete accessor->second; conns_.erase(accessor); + srv->DecrClientNum(); } if (ConnMap::accessor accessor; monitor_conns_.find(accessor, fd)) { @@ -448,9 +461,11 @@ Status Worker::Reply(int fd, const std::string &reply) { } void Worker::BecomeMonitorConn(redis::Connection *conn) { + std::lock_guard guard(conns_mu_); if (ConnMap::accessor accessor; conns_.find(accessor, conn->GetFD())) { conns_.erase(accessor); accessor.release(); + if (ConnMap::accessor accessor; monitor_conns_.find(accessor, conn->GetFD())) { accessor->second = conn; } else { @@ -463,8 +478,11 @@ void Worker::BecomeMonitorConn(redis::Connection *conn) { void Worker::QuitMonitorConn(redis::Connection *conn) { if (ConnMap::accessor accessor; monitor_conns_.find(accessor, conn->GetFD())) { - monitor_conns_.erase(accessor); - accessor.release(); + { + std::lock_guard guard(conns_mu_); + monitor_conns_.erase(accessor); + accessor.release(); + } if (ConnMap::accessor accessor; conns_.find(accessor, conn->GetFD())) { accessor->second = conn; } else { @@ -476,6 +494,7 @@ void Worker::QuitMonitorConn(redis::Connection *conn) { } void Worker::FeedMonitorConns(redis::Connection *conn, const std::string &response) { + std::lock_guard guard(conns_mu_); for (const auto &[key, _] : monitor_conns_) { if (ConnMap::accessor accessor; monitor_conns_.find(accessor, key)) { const auto &value = accessor->second; @@ -489,6 +508,7 @@ void Worker::FeedMonitorConns(redis::Connection *conn, const std::string &respon std::string Worker::GetClientsStr() { std::string output; + std::lock_guard guard(conns_mu_); for (const auto &[key, _] : conns_) { if (ConnMap::const_accessor accessor; conns_.find(accessor, key)) { output.append(accessor->second->ToString()); @@ -500,6 +520,7 @@ std::string Worker::GetClientsStr() { void Worker::KillClient(redis::Connection *self, uint64_t id, const std::string &addr, uint64_t type, bool skipme, int64_t *killed) { + std::lock_guard guard(conns_mu_); for (const auto &[key, _] : conns_) { if (ConnMap::accessor accessor; conns_.find(accessor, key)) { auto conn = accessor->second; @@ -528,6 +549,7 @@ void Worker::KickoutIdleClients(int timeout) { std::vector> to_be_killed_conns; std::set fds; + std::lock_guard guard(conns_mu_); for (const auto &[key, _] : conns_) { fds.emplace(key); } @@ -570,6 +592,7 @@ void WorkerThread::Start() { } std::map Worker::GetConnections() const { + std::unique_lock guard(conns_mu_); std::map result; for (auto [fd, _] : conns_) { if (ConnMap::accessor accessor; conns_.find(accessor, fd)) { diff --git a/src/server/worker.h b/src/server/worker.h index 1297220d3d7..acf1e101bb5 100644 --- a/src/server/worker.h +++ b/src/server/worker.h @@ -91,6 +91,8 @@ class Worker : EventCallbackBase, EvconnlistenerBase { UniqueEvent timer_; std::thread::id tid_; std::vector listen_events_; + mutable std::mutex conns_mu_; // refer to https://github.com/oneapi-src/oneTBB/issues/183, traverse and erase should + // be protected by mutex ConnMap conns_; ConnMap monitor_conns_; int last_iter_conn_fd_ = 0; // fd of last processed connection in previous cron From 4416da72036104eda197777e3ec16a451e393280 Mon Sep 17 00:00:00 2001 From: edward_xu Date: Thu, 1 Aug 2024 20:24:45 +0800 Subject: [PATCH 03/16] remove useless code --- src/server/worker.cc | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/src/server/worker.cc b/src/server/worker.cc index 4f36b2ffbe5..ed0b63976b5 100644 --- a/src/server/worker.cc +++ b/src/server/worker.cc @@ -77,13 +77,10 @@ Worker::Worker(Server *srv, Config *config) : srv(srv), base_(event_base_new()) } Worker::~Worker() { - // std::vector conns; - // conns.reserve(conns_.size() + monitor_conns_.size()); { std::lock_guard guard(conns_mu_); for (const auto &iter : conns_) { if (ConnMap::accessor accessor; conns_.find(accessor, iter.first)) { - // conns.emplace_back(accessor->second); accessor->second->Close(); } } @@ -93,14 +90,10 @@ Worker::~Worker() { std::lock_guard guard(conns_mu_); for (const auto &iter : monitor_conns_) { if (ConnMap::accessor accessor; monitor_conns_.find(accessor, iter.first)) { - // conns.emplace_back(accessor->second); accessor->second->Close(); } } } - // for (const auto &iter : conns) { - // iter->Close(); - // } timer_.reset(); if (rate_limit_group_) { @@ -477,9 +470,9 @@ void Worker::BecomeMonitorConn(redis::Connection *conn) { } void Worker::QuitMonitorConn(redis::Connection *conn) { + std::lock_guard guard(conns_mu_); if (ConnMap::accessor accessor; monitor_conns_.find(accessor, conn->GetFD())) { { - std::lock_guard guard(conns_mu_); monitor_conns_.erase(accessor); accessor.release(); } From fe89194ec6811bcb717504b064d5145b77fd26c6 Mon Sep 17 00:00:00 2001 From: edward_xu Date: Sun, 4 Aug 2024 17:19:33 +0800 Subject: [PATCH 04/16] use tbb_parallel function rather than legacy for loop --- src/server/worker.cc | 125 ++++++++++++++++++++++++++++--------------- src/server/worker.h | 7 ++- 2 files changed, 88 insertions(+), 44 deletions(-) diff --git a/src/server/worker.cc b/src/server/worker.cc index ed0b63976b5..e1a2512cf38 100644 --- a/src/server/worker.cc +++ b/src/server/worker.cc @@ -48,6 +48,8 @@ #include #include +#include "oneapi/tbb/parallel_for.h" +#include "oneapi/tbb/parallel_reduce.h" #include "redis_connection.h" #include "redis_request.h" #include "server.h" @@ -77,22 +79,40 @@ Worker::Worker(Server *srv, Config *config) : srv(srv), base_(event_base_new()) } Worker::~Worker() { - { - std::lock_guard guard(conns_mu_); - for (const auto &iter : conns_) { - if (ConnMap::accessor accessor; conns_.find(accessor, iter.first)) { - accessor->second->Close(); - } - } + auto conns = tbb::parallel_reduce( + conns_.range(), std::vector{}, + [](const ConnMap::range_type &range, std::vector &&result) { + for (auto &it : range) { + result.push_back(it.second); + } + return result; + }, + [](const std::vector &lhs, const std::vector &rhs) { + std::vector result = lhs; + result.insert(result.end(), rhs.begin(), rhs.end()); + return result; + }); + + auto monitor_conns = tbb::parallel_reduce( + monitor_conns_.range(), std::vector{}, + [](const ConnMap::range_type &range, std::vector &&result) { + for (auto &it : range) { + result.push_back(it.second); + } + return result; + }, + [](const std::vector &lhs, const std::vector &rhs) { + std::vector result = lhs; + result.insert(result.end(), rhs.begin(), rhs.end()); + return result; + }); + + for (auto conn : conns) { + conn->Close(); } - { - std::lock_guard guard(conns_mu_); - for (const auto &iter : monitor_conns_) { - if (ConnMap::accessor accessor; monitor_conns_.find(accessor, iter.first)) { - accessor->second->Close(); - } - } + for (auto conn : monitor_conns) { + conn->Close(); } timer_.reset(); @@ -338,7 +358,6 @@ Status Worker::AddConnection(redis::Connection *c) { redis::Connection *Worker::removeConnection(int fd) { redis::Connection *conn = nullptr; - std::lock_guard guard(conns_mu_); if (ConnMap::accessor accessor; conns_.find(accessor, fd)) { { conn = accessor->second; @@ -414,7 +433,6 @@ void Worker::FreeConnection(redis::Connection *conn) { } void Worker::FreeConnectionByID(int fd, uint64_t id) { - std::lock_guard guard(conns_mu_); if (ConnMap::accessor accessor; conns_.find(accessor, fd)) { if (rate_limit_group_ != nullptr) { bufferevent_remove_from_rate_limit_group(accessor->second->GetBufferEvent()); @@ -454,7 +472,6 @@ Status Worker::Reply(int fd, const std::string &reply) { } void Worker::BecomeMonitorConn(redis::Connection *conn) { - std::lock_guard guard(conns_mu_); if (ConnMap::accessor accessor; conns_.find(accessor, conn->GetFD())) { conns_.erase(accessor); accessor.release(); @@ -470,7 +487,6 @@ void Worker::BecomeMonitorConn(redis::Connection *conn) { } void Worker::QuitMonitorConn(redis::Connection *conn) { - std::lock_guard guard(conns_mu_); if (ConnMap::accessor accessor; monitor_conns_.find(accessor, conn->GetFD())) { { monitor_conns_.erase(accessor); @@ -487,34 +503,36 @@ void Worker::QuitMonitorConn(redis::Connection *conn) { } void Worker::FeedMonitorConns(redis::Connection *conn, const std::string &response) { - std::lock_guard guard(conns_mu_); - for (const auto &[key, _] : monitor_conns_) { - if (ConnMap::accessor accessor; monitor_conns_.find(accessor, key)) { - const auto &value = accessor->second; + tbb::parallel_for(monitor_conns_.range(), [conn, response](const ConnMap::range_type &range) { + for (auto &it : range) { + const auto &value = it.second; if (conn == value) continue; if (conn->GetNamespace() == value->GetNamespace() || value->GetNamespace() == kDefaultNamespace) { value->Reply(response); } } - } + }); } std::string Worker::GetClientsStr() { - std::string output; - std::lock_guard guard(conns_mu_); - for (const auto &[key, _] : conns_) { - if (ConnMap::const_accessor accessor; conns_.find(accessor, key)) { - output.append(accessor->second->ToString()); - } - } - - return output; + return tbb::parallel_reduce( + conns_.range(), std::string{}, + [](const ConnMap::range_type &range, std::string &&result) { + for (auto &it : range) { + result.append(it.second->ToString()); + } + return result; + }, + [](const std::string &lhs, const std::string &rhs) { + std::string result = lhs; + result.append(rhs); + return result; + }); } void Worker::KillClient(redis::Connection *self, uint64_t id, const std::string &addr, uint64_t type, bool skipme, int64_t *killed) { - std::lock_guard guard(conns_mu_); - for (const auto &[key, _] : conns_) { + for (const auto key : getConnFds()) { if (ConnMap::accessor accessor; conns_.find(accessor, key)) { auto conn = accessor->second; if (skipme && self == conn) continue; @@ -542,8 +560,7 @@ void Worker::KickoutIdleClients(int timeout) { std::vector> to_be_killed_conns; std::set fds; - std::lock_guard guard(conns_mu_); - for (const auto &[key, _] : conns_) { + for (const auto key : getConnFds()) { fds.emplace(key); } @@ -571,6 +588,22 @@ void Worker::KickoutIdleClients(int timeout) { } } +std::vector Worker::getConnFds() const { + return tbb::parallel_reduce( + conns_.range(), std::vector{}, + [](const ConnMap::const_range_type &range, std::vector result) { + for (const auto &fd : range) { + result.emplace_back(fd.first); + } + return result; + }, + [](const std::vector &lhs, const std::vector &rhs) { + std::vector result = lhs; + result.insert(result.end(), rhs.begin(), rhs.end()); + return result; + }); +} + void WorkerThread::Start() { auto s = util::CreateThread("worker", [this] { this->worker_->Run(std::this_thread::get_id()); }); @@ -585,13 +618,21 @@ void WorkerThread::Start() { } std::map Worker::GetConnections() const { - std::unique_lock guard(conns_mu_); std::map result; - for (auto [fd, _] : conns_) { - if (ConnMap::accessor accessor; conns_.find(accessor, fd)) { - result.emplace(accessor->first, accessor->second); - } - } + result = tbb::parallel_reduce( + conns_.range(), result, + [](const ConnMap::const_range_type &range, std::map &&tmp_result) { + // std::map tmp_result; + for (auto &it : range) { + tmp_result.emplace(it.first, it.second); + } + return tmp_result; + }, + [](const std::map &lhs, const std::map &rhs) { + std::map result = lhs; + result.insert(rhs.cbegin(), rhs.cend()); + return result; + }); return result; } diff --git a/src/server/worker.h b/src/server/worker.h index acf1e101bb5..3349337da56 100644 --- a/src/server/worker.h +++ b/src/server/worker.h @@ -86,13 +86,16 @@ class Worker : EventCallbackBase, EvconnlistenerBase { void newTCPConnection(evconnlistener *listener, evutil_socket_t fd, sockaddr *address, int socklen); void newUnixSocketConnection(evconnlistener *listener, evutil_socket_t fd, sockaddr *address, int socklen); redis::Connection *removeConnection(int fd); + std::vector getConnFds() const; event_base *base_; UniqueEvent timer_; std::thread::id tid_; std::vector listen_events_; - mutable std::mutex conns_mu_; // refer to https://github.com/oneapi-src/oneTBB/issues/183, traverse and erase should - // be protected by mutex + + // must use tbb::parallel_for or tbb::parallel_reduce to traverse + // refer: + // https://github.com/oneapi-src/oneTBB/blob/v2021.13.0/include/oneapi/tbb/concurrent_hash_map.h#L1033-L1051 ConnMap conns_; ConnMap monitor_conns_; int last_iter_conn_fd_ = 0; // fd of last processed connection in previous cron From 1104241874a3d1ccb02401ab73d71577b5984497 Mon Sep 17 00:00:00 2001 From: edward_xu Date: Sun, 4 Aug 2024 21:12:53 +0800 Subject: [PATCH 05/16] remove useless headers --- src/server/worker.cc | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/src/server/worker.cc b/src/server/worker.cc index e1a2512cf38..6acf7f2a9f6 100644 --- a/src/server/worker.cc +++ b/src/server/worker.cc @@ -23,7 +23,6 @@ #include #include -#include #include #include @@ -506,7 +505,7 @@ void Worker::FeedMonitorConns(redis::Connection *conn, const std::string &respon tbb::parallel_for(monitor_conns_.range(), [conn, response](const ConnMap::range_type &range) { for (auto &it : range) { const auto &value = it.second; - if (conn == value) continue; + if (conn == value) continue; // skip the monitor command if (conn->GetNamespace() == value->GetNamespace() || value->GetNamespace() == kDefaultNamespace) { value->Reply(response); } @@ -559,15 +558,13 @@ void Worker::KillClient(redis::Connection *self, uint64_t id, const std::string void Worker::KickoutIdleClients(int timeout) { std::vector> to_be_killed_conns; - std::set fds; - for (const auto key : getConnFds()) { - fds.emplace(key); - } - - if (fds.empty()) { + auto fd_list = getConnFds(); + if (fd_list.empty()) { return; } + std::set fds(fd_list.cbegin(), fd_list.cend()); + int iterations = std::min(static_cast(conns_.size()), 50); auto iter = fds.upper_bound(last_iter_conn_fd_); while (iterations--) { From c0d9efbd24779d670103a8e75ec995060deb86c4 Mon Sep 17 00:00:00 2001 From: edward_xu Date: Wed, 7 Aug 2024 23:38:09 +0800 Subject: [PATCH 06/16] refactor duplicated code --- src/server/worker.cc | 43 ++++++++++++++++--------------------------- 1 file changed, 16 insertions(+), 27 deletions(-) diff --git a/src/server/worker.cc b/src/server/worker.cc index 6acf7f2a9f6..7f4d6327d6c 100644 --- a/src/server/worker.cc +++ b/src/server/worker.cc @@ -78,33 +78,22 @@ Worker::Worker(Server *srv, Config *config) : srv(srv), base_(event_base_new()) } Worker::~Worker() { - auto conns = tbb::parallel_reduce( - conns_.range(), std::vector{}, - [](const ConnMap::range_type &range, std::vector &&result) { - for (auto &it : range) { - result.push_back(it.second); - } - return result; - }, - [](const std::vector &lhs, const std::vector &rhs) { - std::vector result = lhs; - result.insert(result.end(), rhs.begin(), rhs.end()); - return result; - }); - - auto monitor_conns = tbb::parallel_reduce( - monitor_conns_.range(), std::vector{}, - [](const ConnMap::range_type &range, std::vector &&result) { - for (auto &it : range) { - result.push_back(it.second); - } - return result; - }, - [](const std::vector &lhs, const std::vector &rhs) { - std::vector result = lhs; - result.insert(result.end(), rhs.begin(), rhs.end()); - return result; - }); + auto collect_conns_fn = [](ConnMap &conns) { + return parallel_reduce( + conns.range(), std::vector{}, + [](const ConnMap::range_type &range, std::vector &&result) { + std::transform(range.begin(), range.end(), std::back_inserter(result), + [](const auto &it) { return it.second; }); + return result; + }, + [](const std::vector &lhs, const std::vector &rhs) { + std::vector result = lhs; + result.insert(result.end(), rhs.begin(), rhs.end()); + return result; + }); + }; + auto conns = collect_conns_fn(conns_); + auto monitor_conns = collect_conns_fn(monitor_conns_); for (auto conn : conns) { conn->Close(); From 9f1b7074c52c8868cf327763e8b83fb5fda8292e Mon Sep 17 00:00:00 2001 From: edward_xu Date: Fri, 9 Aug 2024 09:04:41 +0800 Subject: [PATCH 07/16] remove reduandant concurrency protection in dtor --- src/server/worker.cc | 26 ++++++-------------------- 1 file changed, 6 insertions(+), 20 deletions(-) diff --git a/src/server/worker.cc b/src/server/worker.cc index 7f4d6327d6c..0e985c9e410 100644 --- a/src/server/worker.cc +++ b/src/server/worker.cc @@ -78,28 +78,14 @@ Worker::Worker(Server *srv, Config *config) : srv(srv), base_(event_base_new()) } Worker::~Worker() { - auto collect_conns_fn = [](ConnMap &conns) { - return parallel_reduce( - conns.range(), std::vector{}, - [](const ConnMap::range_type &range, std::vector &&result) { - std::transform(range.begin(), range.end(), std::back_inserter(result), - [](const auto &it) { return it.second; }); - return result; - }, - [](const std::vector &lhs, const std::vector &rhs) { - std::vector result = lhs; - result.insert(result.end(), rhs.begin(), rhs.end()); - return result; - }); - }; - auto conns = collect_conns_fn(conns_); - auto monitor_conns = collect_conns_fn(monitor_conns_); + std::vector conns; + conns.reserve(conns_.size() + monitor_conns_.size()); - for (auto conn : conns) { - conn->Close(); - } + std::transform(conns_.cbegin(), conns_.cend(), std::back_inserter(conns), [](const auto &it) { return it.second; }); + std::transform(monitor_conns_.cbegin(), monitor_conns_.cend(), std::back_inserter(conns), + [](const auto &it) { return it.second; }); - for (auto conn : monitor_conns) { + for (auto conn : conns) { conn->Close(); } From f4bbded9029d85377d6d93194bd45448219e9bca Mon Sep 17 00:00:00 2001 From: edward_xu Date: Fri, 9 Aug 2024 20:17:07 +0800 Subject: [PATCH 08/16] fix sonar cloud analysis issue --- src/server/worker.cc | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/src/server/worker.cc b/src/server/worker.cc index 0e985c9e410..3044d710b98 100644 --- a/src/server/worker.cc +++ b/src/server/worker.cc @@ -81,9 +81,12 @@ Worker::~Worker() { std::vector conns; conns.reserve(conns_.size() + monitor_conns_.size()); - std::transform(conns_.cbegin(), conns_.cend(), std::back_inserter(conns), [](const auto &it) { return it.second; }); - std::transform(monitor_conns_.cbegin(), monitor_conns_.cend(), std::back_inserter(conns), - [](const auto &it) { return it.second; }); + for (const auto &[fd, conn] : conns_) { + conns.emplace_back(conn); + } + for (const auto &[fd, conn] : monitor_conns_) { + conns.emplace_back(conn); + } for (auto conn : conns) { conn->Close(); From 2027b03b5e6f59af79064b4322e9e577f60e9499 Mon Sep 17 00:00:00 2001 From: edward_xu Date: Fri, 9 Aug 2024 22:48:51 +0800 Subject: [PATCH 09/16] make value delete after accessor erased --- src/server/worker.cc | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/server/worker.cc b/src/server/worker.cc index 3044d710b98..3b5a15096c2 100644 --- a/src/server/worker.cc +++ b/src/server/worker.cc @@ -38,6 +38,8 @@ #include #endif +#include +#include #include #include #include @@ -47,8 +49,6 @@ #include #include -#include "oneapi/tbb/parallel_for.h" -#include "oneapi/tbb/parallel_reduce.h" #include "redis_connection.h" #include "redis_request.h" #include "server.h" @@ -415,14 +415,14 @@ void Worker::FreeConnectionByID(int fd, uint64_t id) { bufferevent_remove_from_rate_limit_group(accessor->second->GetBufferEvent()); } - delete accessor->second; conns_.erase(accessor); + delete accessor->second; srv->DecrClientNum(); } if (ConnMap::accessor accessor; monitor_conns_.find(accessor, fd)) { - delete accessor->second; monitor_conns_.erase(accessor); + delete accessor->second; srv->DecrClientNum(); srv->DecrMonitorClientNum(); } From 49d67924cb5b285647385b402a4e645c21050b82 Mon Sep 17 00:00:00 2001 From: edward_xu Date: Fri, 9 Aug 2024 22:52:25 +0800 Subject: [PATCH 10/16] make tbb header including use `<>` --- src/server/worker.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/server/worker.h b/src/server/worker.h index 3349337da56..a6b13391155 100644 --- a/src/server/worker.h +++ b/src/server/worker.h @@ -24,6 +24,7 @@ #include #include #include +#include #include #include @@ -37,7 +38,6 @@ #include #include "event_util.h" -#include "oneapi/tbb/concurrent_hash_map.h" #include "redis_connection.h" #include "storage/storage.h" From 8011d59a99c76a5d48b3d4f749260649e40ec736 Mon Sep 17 00:00:00 2001 From: edward_xu Date: Sun, 11 Aug 2024 22:23:51 +0800 Subject: [PATCH 11/16] fix `monitor_conns` inserting behavior and refactor useless code. --- src/server/worker.cc | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/src/server/worker.cc b/src/server/worker.cc index 3b5a15096c2..07e0a16972f 100644 --- a/src/server/worker.cc +++ b/src/server/worker.cc @@ -22,6 +22,8 @@ #include #include +#include +#include #include #include @@ -38,8 +40,6 @@ #include #endif -#include -#include #include #include #include @@ -451,13 +451,11 @@ Status Worker::Reply(int fd, const std::string &reply) { void Worker::BecomeMonitorConn(redis::Connection *conn) { if (ConnMap::accessor accessor; conns_.find(accessor, conn->GetFD())) { conns_.erase(accessor); - accessor.release(); - - if (ConnMap::accessor accessor; monitor_conns_.find(accessor, conn->GetFD())) { - accessor->second = conn; - } else { - monitor_conns_.insert(accessor, std::make_pair(conn->GetFD(), conn)); - } + } + if (ConnMap::accessor accessor; monitor_conns_.find(accessor, conn->GetFD())) { + accessor->second = conn; + } else { + monitor_conns_.insert(accessor, std::make_pair(conn->GetFD(), conn)); } srv->IncrMonitorClientNum(); conn->EnableFlag(redis::Connection::kMonitor); @@ -494,7 +492,7 @@ void Worker::FeedMonitorConns(redis::Connection *conn, const std::string &respon std::string Worker::GetClientsStr() { return tbb::parallel_reduce( conns_.range(), std::string{}, - [](const ConnMap::range_type &range, std::string &&result) { + [](const ConnMap::range_type &range, std::string result) { for (auto &it : range) { result.append(it.second->ToString()); } From 20678a57f740d95a6f2769f506757da1b2b4d668 Mon Sep 17 00:00:00 2001 From: edward_xu Date: Sun, 11 Aug 2024 22:28:45 +0800 Subject: [PATCH 12/16] make deleting item before accessor erasing --- src/server/worker.cc | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/server/worker.cc b/src/server/worker.cc index 07e0a16972f..e1992dcbcb0 100644 --- a/src/server/worker.cc +++ b/src/server/worker.cc @@ -415,14 +415,17 @@ void Worker::FreeConnectionByID(int fd, uint64_t id) { bufferevent_remove_from_rate_limit_group(accessor->second->GetBufferEvent()); } - conns_.erase(accessor); + // refer to https://github.com/oneapi-src/oneTBB/blob/v2021.13.0/include/oneapi/tbb/concurrent_hash_map.h#L826 and + // https://github.com/oneapi-src/oneTBB/blob/v2021.13.0/include/oneapi/tbb/concurrent_hash_map.h#L826, erase will + // release the accessor, so we should access the value before erase action. delete accessor->second; + conns_.erase(accessor); srv->DecrClientNum(); } if (ConnMap::accessor accessor; monitor_conns_.find(accessor, fd)) { - monitor_conns_.erase(accessor); delete accessor->second; + monitor_conns_.erase(accessor); srv->DecrClientNum(); srv->DecrMonitorClientNum(); } From 0f3f8bd677205fb5b4f1ca354e8da6b1e9cfb355 Mon Sep 17 00:00:00 2001 From: edward_xu Date: Mon, 12 Aug 2024 08:39:39 +0800 Subject: [PATCH 13/16] fix sonar issue: `Worker::~Worker()` can keep the original code --- src/server/worker.cc | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/src/server/worker.cc b/src/server/worker.cc index e1992dcbcb0..08e741101d3 100644 --- a/src/server/worker.cc +++ b/src/server/worker.cc @@ -81,15 +81,14 @@ Worker::~Worker() { std::vector conns; conns.reserve(conns_.size() + monitor_conns_.size()); - for (const auto &[fd, conn] : conns_) { - conns.emplace_back(conn); + for (const auto &iter : conns_) { + conns.emplace_back(iter.second); } - for (const auto &[fd, conn] : monitor_conns_) { - conns.emplace_back(conn); + for (const auto &iter : monitor_conns_) { + conns.emplace_back(iter.second); } - - for (auto conn : conns) { - conn->Close(); + for (const auto &iter : conns) { + iter->Close(); } timer_.reset(); From 1665dab3b7723a4633c528965a91fd823070a612 Mon Sep 17 00:00:00 2001 From: edward_xu Date: Thu, 15 Aug 2024 21:16:18 +0800 Subject: [PATCH 14/16] fix typo and remove useless debug comment --- src/server/worker.cc | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/server/worker.cc b/src/server/worker.cc index 9619e8f3608..f89642ecc13 100644 --- a/src/server/worker.cc +++ b/src/server/worker.cc @@ -415,7 +415,7 @@ void Worker::FreeConnectionByID(int fd, uint64_t id) { } // refer to https://github.com/oneapi-src/oneTBB/blob/v2021.13.0/include/oneapi/tbb/concurrent_hash_map.h#L826 and - // https://github.com/oneapi-src/oneTBB/blob/v2021.13.0/include/oneapi/tbb/concurrent_hash_map.h#L826, erase will + // https://github.com/oneapi-src/oneTBB/blob/v2021.13.0/include/oneapi/tbb/concurrent_hash_map.h#L1418, erase will // release the accessor, so we should access the value before erase action. delete accessor->second; conns_.erase(accessor); @@ -596,8 +596,7 @@ std::map Worker::GetConnections() const { std::map result; result = tbb::parallel_reduce( conns_.range(), result, - [](const ConnMap::const_range_type &range, std::map &&tmp_result) { - // std::map tmp_result; + [](const ConnMap::const_range_type &range, std::map tmp_result) { for (auto &it : range) { tmp_result.emplace(it.first, it.second); } From 261812d4ab7f46535d1a0c14c58d6db1c3edafd4 Mon Sep 17 00:00:00 2001 From: edward_xu Date: Mon, 2 Sep 2024 21:24:37 +0800 Subject: [PATCH 15/16] add thread constraint for tbb parallel function. --- src/server/worker.cc | 46 +++++++++++++++++++++++++------------------- 1 file changed, 26 insertions(+), 20 deletions(-) diff --git a/src/server/worker.cc b/src/server/worker.cc index f89642ecc13..bbf9d591c36 100644 --- a/src/server/worker.cc +++ b/src/server/worker.cc @@ -480,31 +480,37 @@ void Worker::QuitMonitorConn(redis::Connection *conn) { } void Worker::FeedMonitorConns(redis::Connection *conn, const std::string &response) { - tbb::parallel_for(monitor_conns_.range(), [conn, response](const ConnMap::range_type &range) { - for (auto &it : range) { - const auto &value = it.second; - if (conn == value) continue; // skip the monitor command - if (conn->GetNamespace() == value->GetNamespace() || value->GetNamespace() == kDefaultNamespace) { - value->Reply(response); + tbb::task_arena one_thread_arena(tbb::task_arena::constraints{}.set_max_concurrency(1)); + one_thread_arena.execute([this, conn, response]() { + tbb::parallel_for(monitor_conns_.range(), [conn, response](const ConnMap::range_type &range) { + for (auto &it : range) { + const auto &value = it.second; + if (conn == value) continue; // skip the monitor command + if (conn->GetNamespace() == value->GetNamespace() || value->GetNamespace() == kDefaultNamespace) { + value->Reply(response); + } } - } + }); }); } std::string Worker::GetClientsStr() { - return tbb::parallel_reduce( - conns_.range(), std::string{}, - [](const ConnMap::range_type &range, std::string result) { - for (auto &it : range) { - result.append(it.second->ToString()); - } - return result; - }, - [](const std::string &lhs, const std::string &rhs) { - std::string result = lhs; - result.append(rhs); - return result; - }); + tbb::task_arena one_thread_arena(tbb::task_arena::constraints{}.set_max_concurrency(1)); + return one_thread_arena.execute([this]() { + return tbb::parallel_reduce( + conns_.range(), std::string{}, + [](const ConnMap::range_type &range, std::string result) { + for (auto &it : range) { + result.append(it.second->ToString()); + } + return result; + }, + [](const std::string &lhs, const std::string &rhs) { + std::string result = lhs; + result.append(rhs); + return result; + }); + }); } void Worker::KillClient(redis::Connection *self, uint64_t id, const std::string &addr, uint64_t type, bool skipme, From ee88850a192b7baeae30599a60919d2c850bff34 Mon Sep 17 00:00:00 2001 From: edward_xu Date: Tue, 3 Sep 2024 09:08:23 +0800 Subject: [PATCH 16/16] fix free connection issue. --- src/server/worker.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/server/worker.cc b/src/server/worker.cc index 4233e28a4e6..155298f8438 100644 --- a/src/server/worker.cc +++ b/src/server/worker.cc @@ -411,7 +411,7 @@ void Worker::FreeConnection(redis::Connection *conn) { } void Worker::FreeConnectionByID(int fd, uint64_t id) { - if (ConnMap::accessor accessor; conns_.find(accessor, fd)) { + if (ConnMap::accessor accessor; conns_.find(accessor, fd) && accessor->second->GetID() == id) { if (rate_limit_group_ != nullptr) { bufferevent_remove_from_rate_limit_group(accessor->second->GetBufferEvent()); } @@ -424,7 +424,7 @@ void Worker::FreeConnectionByID(int fd, uint64_t id) { srv->DecrClientNum(); } - if (ConnMap::accessor accessor; monitor_conns_.find(accessor, fd)) { + if (ConnMap::accessor accessor; monitor_conns_.find(accessor, fd) && accessor->second->GetID() == id) { delete accessor->second; monitor_conns_.erase(accessor); srv->DecrClientNum();