diff --git a/src/server/server.cc b/src/server/server.cc index 5e20984dc3a..a8ba3e6324d 100644 --- a/src/server/server.cc +++ b/src/server/server.cc @@ -20,7 +20,6 @@ #include "server.h" -#include #include #include #include @@ -41,14 +40,12 @@ #include "fmt/format.h" #include "lua.h" #include "redis_connection.h" -#include "redis_request.h" #include "storage/compaction_checker.h" #include "storage/redis_db.h" #include "storage/scripting.h" #include "string_util.h" #include "thread_util.h" #include "time_util.h" -#include "tls_util.h" #include "version.h" #include "worker.h" @@ -529,7 +526,7 @@ void Server::UnblockOnKey(const std::string &key, redis::Connection *conn) { void Server::BlockOnStreams(const std::vector &keys, const std::vector &entry_ids, redis::Connection *conn) { - std::lock_guard guard(blocking_keys_mu_); + std::lock_guard guard(blocked_stream_consumers_mu_); IncrBlockedClientNum(); @@ -546,7 +543,7 @@ void Server::BlockOnStreams(const std::vector &keys, const std::vec } void Server::UnblockOnStreams(const std::vector &keys, redis::Connection *conn) { - std::lock_guard guard(blocking_keys_mu_); + std::lock_guard guard(blocked_stream_consumers_mu_); DecrBlockedClientNum(); @@ -590,7 +587,7 @@ void Server::WakeupBlockingConns(const std::string &key, size_t n_conns) { } void Server::OnEntryAddedToStream(const std::string &ns, const std::string &key, const redis::StreamEntryID &entry_id) { - std::lock_guard guard(blocking_keys_mu_); + std::lock_guard guard(blocked_stream_consumers_mu_); auto iter = blocked_stream_consumers_.find(key); if (iter == blocked_stream_consumers_.end() || iter->second.empty()) { diff --git a/src/server/server.h b/src/server/server.h index ee31df5ef94..1f56d64458e 100644 --- a/src/server/server.h +++ b/src/server/server.h @@ -287,7 +287,10 @@ class Server { std::mutex pubsub_channels_mu_; std::map> blocking_keys_; std::mutex blocking_keys_mu_; + std::atomic blocked_clients_{0}; + + std::mutex blocked_stream_consumers_mu_; std::map>> blocked_stream_consumers_; // threads