From 93048fa73e40773783d41252596d84e9009c53b6 Mon Sep 17 00:00:00 2001 From: Yaroslav Stepanchuk Date: Sat, 17 Jun 2023 19:32:14 +0300 Subject: [PATCH 1/2] Refactor the code related to BZMPOP --- src/commands/cmd_zset.cc | 129 ++++++++++++++++++++++----------------- src/common/status.h | 2 +- 2 files changed, 74 insertions(+), 57 deletions(-) diff --git a/src/commands/cmd_zset.cc b/src/commands/cmd_zset.cc index e5c8018de82..42c9553d873 100644 --- a/src/commands/cmd_zset.cc +++ b/src/commands/cmd_zset.cc @@ -275,23 +275,22 @@ class CommandZPopMax : public CommandZPop { CommandZPopMax() : CommandZPop(false) {} }; -static rocksdb::Status PopFromMultipleZsets(std::vector &member_scores, std::string &userkey, Server *svr, - Connection *conn, const std::vector &keys, bool min, - int count) { - redis::ZSet zset_db(svr->storage, conn->GetNamespace()); +static rocksdb::Status PopFromMultipleZsets(redis::ZSet *zset_db, const std::vector &keys, bool min, + int count, std::string *user_key, std::vector *member_scores) { rocksdb::Status s; - for (auto &user_key : keys) { - s = zset_db.Pop(user_key, count, min, &member_scores); + for (auto &key : keys) { + s = zset_db->Pop(key, count, min, member_scores); if (!s.ok()) { - break; + return s; } - if (member_scores.empty()) { - continue; + + if (!member_scores->empty()) { + *user_key = key; + break; } - userkey = user_key; - break; } - return s; + + return rocksdb::Status::OK(); } class CommandBZPop : public Commander, @@ -318,21 +317,24 @@ class CommandBZPop : public Commander, svr_ = svr; conn_ = conn; - std::string userkey; + std::string user_key; std::vector member_scores; - auto s = PopFromMultipleZsets(member_scores, userkey, svr, conn, keys_, min_, 1); + + redis::ZSet zset_db(svr->storage, conn->GetNamespace()); + auto s = PopFromMultipleZsets(&zset_db, keys_, min_, 1, &user_key, &member_scores); if (!s.ok()) { return {Status::RedisExecErr, s.ToString()}; } + if (!member_scores.empty()) { - SendMembersWithScores(member_scores, userkey); + SendMembersWithScores(member_scores, user_key); return Status::OK(); } - // All Empty + // all sorted sets are empty if (conn->IsInExec()) { *output = redis::MultiLen(-1); - return Status::OK(); // No blocking in multi-exec + return Status::OK(); // no blocking in multi-exec } for (const auto &key : keys_) { @@ -351,10 +353,10 @@ class CommandBZPop : public Commander, return {Status::BlockingCmd}; } - void SendMembersWithScores(const std::vector &member_scores, const std::string &userkey) { + void SendMembersWithScores(const std::vector &member_scores, const std::string &user_key) { std::string output; output.append(redis::MultiLen(member_scores.size() * 2 + 1)); - output.append(redis::BulkString(userkey)); + output.append(redis::BulkString(user_key)); for (const auto &ms : member_scores) { output.append(redis::BulkString(ms.member)); output.append(redis::BulkString(util::Float2String(ms.score))); @@ -363,28 +365,32 @@ class CommandBZPop : public Commander, } void OnWrite(bufferevent *bev) { - std::string userkey; + std::string user_key; std::vector member_scores; - auto s = PopFromMultipleZsets(member_scores, userkey, svr_, conn_, keys_, min_, 1); + + redis::ZSet zset_db(svr_->storage, conn_->GetNamespace()); + auto s = PopFromMultipleZsets(&zset_db, keys_, min_, 1, &user_key, &member_scores); if (!s.ok()) { conn_->Reply(redis::Error("ERR " + s.ToString())); return; } + if (member_scores.empty()) { - // The connection may be waked up but can't pop from list. For example, - // connection A is blocking on list and connection B push a new element - // then wake up the connection A, but this element may be token by other connection C. - // So we need to wait for the wake event again by disabling the WRITE event. + // The connection may be waked up but can't pop from a zset. For example, connection A is blocked on zset and + // connection B added a new element; then connection A was unblocked, but this element may be taken by + // another connection C. So we need to block connection A again and wait for the element being added + // by disabling the WRITE event. bufferevent_disable(bev, EV_WRITE); return; } - SendMembersWithScores(member_scores, userkey); + + SendMembersWithScores(member_scores, user_key); if (timer_) { timer_.reset(); } - unBlockingAll(); + unblockOnAllKeys(); conn_->SetCB(bev); bufferevent_enable(bev, EV_READ); // We need to manually trigger the read event since we will stop processing commands @@ -398,15 +404,15 @@ class CommandBZPop : public Commander, if (timer_ != nullptr) { timer_.reset(); } - unBlockingAll(); + unblockOnAllKeys(); } conn_->OnEvent(bev, events); } - void TimerCB(int, int16_t events) { - conn_->Reply(redis::NilString()); + void TimerCB(int, int16_t) { + conn_->Reply(redis::MultiLen(-1)); timer_.reset(); - unBlockingAll(); + unblockOnAllKeys(); auto bev = conn_->GetBufferEvent(); conn_->SetCB(bev); bufferevent_enable(bev, EV_READ); @@ -420,7 +426,7 @@ class CommandBZPop : public Commander, Connection *conn_ = nullptr; UniqueEvent timer_; - void unBlockingAll() { + void unblockOnAllKeys() { for (const auto &key : keys_) { svr_->UnblockOnKey(key, conn_); } @@ -437,11 +443,11 @@ class CommandBZPopMax : public CommandBZPop { CommandBZPopMax() : CommandBZPop(false) {} }; -static void SendMembersWithScoresForZMpop(const std::vector &member_scores, const std::string &userkey, - Connection *conn) { +static void SendMembersWithScoresForZMpop(Connection *conn, const std::string &user_key, + const std::vector &member_scores) { std::string output; output.append(redis::MultiLen(2)); - output.append(redis::BulkString(userkey)); + output.append(redis::BulkString(user_key)); output.append(redis::MultiLen(member_scores.size() * 2)); for (const auto &ms : member_scores) { output.append(redis::BulkString(ms.member)); @@ -490,10 +496,10 @@ class CommandZMPop : public Commander { continue; } - SendMembersWithScoresForZMpop(member_scores, user_key, conn); + SendMembersWithScoresForZMpop(conn, user_key, member_scores); return Status::OK(); } - *output = redis::NilString(); + *output = redis::MultiLen(-1); return Status::OK(); } @@ -515,12 +521,14 @@ class CommandBZMPop : public Commander, public: Status Parse(const std::vector &args) override { CommandParser parser(args, 1); + timeout_ = GET_OR_RET(parser.TakeInt(NumericRange{0, std::numeric_limits::max()})); if (timeout_ < 0) { return {Status::RedisParseErr, errTimeoutIsNegative}; } - numkeys_ = GET_OR_RET(parser.TakeInt(NumericRange{1, std::numeric_limits::max()})); - for (int i = 0; i < numkeys_; ++i) { + + num_keys_ = GET_OR_RET(parser.TakeInt(NumericRange{1, std::numeric_limits::max()})); + for (int i = 0; i < num_keys_; ++i) { keys_.emplace_back(GET_OR_RET(parser.TakeStr())); } @@ -535,9 +543,11 @@ class CommandBZMPop : public Commander, return parser.InvalidSyntax(); } } + if (flag_ == ZSET_NONE) { return parser.InvalidSyntax(); } + return Commander::Parse(args); } @@ -545,21 +555,24 @@ class CommandBZMPop : public Commander, svr_ = svr; conn_ = conn; - std::string userkey; + std::string user_key; std::vector member_scores; - auto s = PopFromMultipleZsets(member_scores, userkey, svr, conn, keys_, flag_ == ZSET_MIN, count_); + + redis::ZSet zset_db(svr->storage, conn->GetNamespace()); + auto s = PopFromMultipleZsets(&zset_db, keys_, flag_ == ZSET_MIN, count_, &user_key, &member_scores); if (!s.ok()) { return {Status::RedisExecErr, s.ToString()}; } + if (!member_scores.empty()) { - SendMembersWithScoresForZMpop(member_scores, userkey, conn_); + SendMembersWithScoresForZMpop(conn_, user_key, member_scores); return Status::OK(); } - // All Empty + // all sorted sets are empty if (conn->IsInExec()) { *output = redis::MultiLen(-1); - return Status::OK(); // No blocking in multi-exec + return Status::OK(); // no blocking in multi-exec } for (const auto &key : keys_) { @@ -579,28 +592,32 @@ class CommandBZMPop : public Commander, } void OnWrite(bufferevent *bev) { - std::string userkey; + std::string user_key; std::vector member_scores; - auto s = PopFromMultipleZsets(member_scores, userkey, svr_, conn_, keys_, flag_ == ZSET_MIN, count_); + + redis::ZSet zset_db(svr_->storage, conn_->GetNamespace()); + auto s = PopFromMultipleZsets(&zset_db, keys_, flag_ == ZSET_MIN, count_, &user_key, &member_scores); if (!s.ok()) { conn_->Reply(redis::Error("ERR " + s.ToString())); return; } + if (member_scores.empty()) { - // The connection may be waked up but can't pop from list. For example, - // connection A is blocking on list and connection B push a new element - // then wake up the connection A, but this element may be token by other connection C. - // So we need to wait for the wake event again by disabling the WRITE event. + // The connection may be waked up but can't pop from a zset. For example, connection A is blocked on zset and + // connection B added a new element; then connection A was unblocked, but this element may be taken by + // another connection C. So we need to block connection A again and wait for the element being added + // by disabling the WRITE event. bufferevent_disable(bev, EV_WRITE); return; } - SendMembersWithScoresForZMpop(member_scores, userkey, conn_); + + SendMembersWithScoresForZMpop(conn_, user_key, member_scores); if (timer_) { timer_.reset(); } - unBlockingAll(); + unblockOnAllKeys(); conn_->SetCB(bev); bufferevent_enable(bev, EV_READ); // We need to manually trigger the read event since we will stop processing commands @@ -614,7 +631,7 @@ class CommandBZMPop : public Commander, if (timer_ != nullptr) { timer_.reset(); } - unBlockingAll(); + unblockOnAllKeys(); } conn_->OnEvent(bev, events); } @@ -622,7 +639,7 @@ class CommandBZMPop : public Commander, void TimerCB(int, int16_t events) { conn_->Reply(redis::NilString()); timer_.reset(); - unBlockingAll(); + unblockOnAllKeys(); auto bev = conn_->GetBufferEvent(); conn_->SetCB(bev); bufferevent_enable(bev, EV_READ); @@ -635,7 +652,7 @@ class CommandBZMPop : public Commander, private: int timeout_ = 0; // seconds - int numkeys_; + int num_keys_; std::vector keys_; enum { ZSET_MIN, ZSET_MAX, ZSET_NONE } flag_ = ZSET_NONE; int count_ = 1; @@ -643,7 +660,7 @@ class CommandBZMPop : public Commander, Connection *conn_ = nullptr; UniqueEvent timer_; - void unBlockingAll() { + void unblockOnAllKeys() { for (const auto &key : keys_) { svr_->UnblockOnKey(key, conn_); } diff --git a/src/common/status.h b/src/common/status.h index 55cfcef1f9d..496447e56cd 100644 --- a/src/common/status.h +++ b/src/common/status.h @@ -174,7 +174,7 @@ struct StringInStatusOr> : StringInStatusOr(StringInStatusOr&& v) : BaseType(new std::string(*std::move(v))) {} // NOLINT template ::inplace, int> = 0> StringInStatusOr(StringInStatusOr&& v) // NOLINT - : BaseType((typename StringInStatusOr::BaseType &&)(std::move(v))) {} + : BaseType((typename StringInStatusOr::BaseType&&)(std::move(v))) {} StringInStatusOr(const StringInStatusOr& v) = delete; From a3acc7a6a6fcda15e7dd03c81a34a2225c7318c6 Mon Sep 17 00:00:00 2001 From: Yaroslav Stepanchuk Date: Sat, 17 Jun 2023 19:49:34 +0300 Subject: [PATCH 2/2] Fix lint --- src/common/status.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/common/status.h b/src/common/status.h index 496447e56cd..55cfcef1f9d 100644 --- a/src/common/status.h +++ b/src/common/status.h @@ -174,7 +174,7 @@ struct StringInStatusOr> : StringInStatusOr(StringInStatusOr&& v) : BaseType(new std::string(*std::move(v))) {} // NOLINT template ::inplace, int> = 0> StringInStatusOr(StringInStatusOr&& v) // NOLINT - : BaseType((typename StringInStatusOr::BaseType&&)(std::move(v))) {} + : BaseType((typename StringInStatusOr::BaseType &&)(std::move(v))) {} StringInStatusOr(const StringInStatusOr& v) = delete;