diff --git a/src/commands/cmd_zset.cc b/src/commands/cmd_zset.cc index 7a1a887aacb..e5c8018de82 100644 --- a/src/commands/cmd_zset.cc +++ b/src/commands/cmd_zset.cc @@ -223,39 +223,91 @@ class CommandZLexCount : public Commander { RangeLexSpec spec_; }; -class CommandZPop : public Commander, - private EvbufCallbackBase, - private EventCallbackBase { +class CommandZPop : public Commander { public: - explicit CommandZPop(bool min, bool block) : min_(min), block_(block) {} + explicit CommandZPop(bool min) : min_(min) {} Status Parse(const std::vector &args) override { - if (!block_) { - if (args.size() > 3) { - return {Status::RedisParseErr, errWrongNumOfArguments}; + if (args.size() > 3) { + return {Status::RedisParseErr, errWrongNumOfArguments}; + } + + if (args.size() == 3) { + auto parse_result = ParseInt(args[2], 10); + if (!parse_result) { + return {Status::RedisParseErr, errValueNotInteger}; } - if (args.size() == 3) { - auto parse_result = ParseInt(args[2], 10); - if (!parse_result) { - return {Status::RedisParseErr, errValueNotInteger}; - } + count_ = *parse_result; + } + return Commander::Parse(args); + } - count_ = *parse_result; - } - keys_.push_back(args[1]); - return Commander::Parse(args); + Status Execute(Server *svr, Connection *conn, std::string *output) override { + redis::ZSet zset_db(svr->storage, conn->GetNamespace()); + std::vector member_scores; + auto s = zset_db.Pop(args_[1], count_, min_, &member_scores); + if (!s.ok()) { + return {Status::RedisExecErr, s.ToString()}; } - auto parse_result = ParseInt(args[args.size() - 1], 10); + output->append(redis::MultiLen(member_scores.size() * 2)); + for (const auto &ms : member_scores) { + output->append(redis::BulkString(ms.member)); + output->append(redis::BulkString(util::Float2String(ms.score))); + } + + return Status::OK(); + } + + private: + bool min_; + int count_ = 1; +}; + +class CommandZPopMin : public CommandZPop { + public: + CommandZPopMin() : CommandZPop(true) {} +}; + +class CommandZPopMax : public CommandZPop { + public: + CommandZPopMax() : CommandZPop(false) {} +}; + +static rocksdb::Status PopFromMultipleZsets(std::vector &member_scores, std::string &userkey, Server *svr, + Connection *conn, const std::vector &keys, bool min, + int count) { + redis::ZSet zset_db(svr->storage, conn->GetNamespace()); + rocksdb::Status s; + for (auto &user_key : keys) { + s = zset_db.Pop(user_key, count, min, &member_scores); + if (!s.ok()) { + break; + } + if (member_scores.empty()) { + continue; + } + userkey = user_key; + break; + } + return s; +} + +class CommandBZPop : public Commander, + private EvbufCallbackBase, + private EventCallbackBase { + public: + explicit CommandBZPop(bool min) : min_(min) {} + + Status Parse(const std::vector &args) override { + auto parse_result = ParseInt(args[args.size() - 1], 10); if (!parse_result) { return {Status::RedisParseErr, "timeout is not an integer or out of range"}; } - if (*parse_result < 0) { return {Status::RedisParseErr, errTimeoutIsNegative}; } - timeout_ = *parse_result; keys_ = std::vector(args.begin() + 1, args.end() - 1); @@ -268,12 +320,11 @@ class CommandZPop : public Commander, std::string userkey; std::vector member_scores; - auto s = PopFromMultipleZsets(member_scores, userkey); + auto s = PopFromMultipleZsets(member_scores, userkey, svr, conn, keys_, min_, 1); if (!s.ok()) { return {Status::RedisExecErr, s.ToString()}; } - - if (!block_ || !member_scores.empty()) { + if (!member_scores.empty()) { SendMembersWithScores(member_scores, userkey); return Status::OK(); } @@ -300,31 +351,10 @@ class CommandZPop : public Commander, return {Status::BlockingCmd}; } - rocksdb::Status PopFromMultipleZsets(std::vector &member_scores, std::string &userkey) { - redis::ZSet zset_db(svr_->storage, conn_->GetNamespace()); - rocksdb::Status s; - for (auto &user_key : keys_) { - s = zset_db.Pop(user_key, count_, min_, &member_scores); - if (!s.ok()) { - break; - } - if (member_scores.empty() && block_) { - continue; - } - userkey = user_key; - break; - } - return s; - } - void SendMembersWithScores(const std::vector &member_scores, const std::string &userkey) { std::string output; - if (block_) { - output.append(redis::MultiLen(member_scores.size() * 2 + 1)); - output.append(redis::BulkString(userkey)); - } else { - output.append(redis::MultiLen(member_scores.size() * 2)); - } + output.append(redis::MultiLen(member_scores.size() * 2 + 1)); + output.append(redis::BulkString(userkey)); for (const auto &ms : member_scores) { output.append(redis::BulkString(ms.member)); output.append(redis::BulkString(util::Float2String(ms.score))); @@ -335,7 +365,7 @@ class CommandZPop : public Commander, void OnWrite(bufferevent *bev) { std::string userkey; std::vector member_scores; - auto s = PopFromMultipleZsets(member_scores, userkey); + auto s = PopFromMultipleZsets(member_scores, userkey, svr_, conn_, keys_, min_, 1); if (!s.ok()) { conn_->Reply(redis::Error("ERR " + s.ToString())); return; @@ -384,8 +414,6 @@ class CommandZPop : public Commander, private: bool min_; - bool block_; - int count_ = 1; int timeout_; std::vector keys_; Server *svr_ = nullptr; @@ -399,41 +427,98 @@ class CommandZPop : public Commander, } }; -class CommandZPopMin : public CommandZPop { +class CommandBZPopMin : public CommandBZPop { public: - CommandZPopMin() : CommandZPop(true, false) {} + CommandBZPopMin() : CommandBZPop(true) {} }; -class CommandZPopMax : public CommandZPop { +class CommandBZPopMax : public CommandBZPop { public: - CommandZPopMax() : CommandZPop(false, false) {} + CommandBZPopMax() : CommandBZPop(false) {} }; -class CommandBZPopMin : public CommandZPop { - public: - CommandBZPopMin() : CommandZPop(true, true) {} -}; - -class CommandBZPopMax : public CommandZPop { - public: - CommandBZPopMax() : CommandZPop(false, true) {} -}; +static void SendMembersWithScoresForZMpop(const std::vector &member_scores, const std::string &userkey, + Connection *conn) { + std::string output; + output.append(redis::MultiLen(2)); + output.append(redis::BulkString(userkey)); + output.append(redis::MultiLen(member_scores.size() * 2)); + for (const auto &ms : member_scores) { + output.append(redis::BulkString(ms.member)); + output.append(redis::BulkString(util::Float2String(ms.score))); + } + conn->Reply(output); +} -class CommandMPop : public Commander, - private EvbufCallbackBase, - private EventCallbackBase { +class CommandZMPop : public Commander { public: - explicit CommandMPop(bool block) : block_(block) {} + CommandZMPop() = default; Status Parse(const std::vector &args) override { CommandParser parser(args, 1); - if (block_) { - timeout_ = GET_OR_RET(parser.TakeInt(NumericRange{0, std::numeric_limits::max()})); - if (timeout_ < 0) { - return {Status::RedisParseErr, errTimeoutIsNegative}; + numkeys_ = GET_OR_RET(parser.TakeInt(NumericRange{1, std::numeric_limits::max()})); + for (int i = 0; i < numkeys_; ++i) { + keys_.emplace_back(GET_OR_RET(parser.TakeStr())); + } + + while (parser.Good()) { + if (parser.EatEqICase("min")) { + flag_ = ZSET_MIN; + } else if (parser.EatEqICase("max")) { + flag_ = ZSET_MAX; + } else if (parser.EatEqICase("count")) { + count_ = GET_OR_RET(parser.TakeInt(NumericRange{1, std::numeric_limits::max()})); + } else { + return parser.InvalidSyntax(); + } + } + if (flag_ == ZSET_NONE) { + return parser.InvalidSyntax(); + } + return Commander::Parse(args); + } + + Status Execute(Server *svr, Connection *conn, std::string *output) override { + redis::ZSet zset_db(svr->storage, conn->GetNamespace()); + for (auto &user_key : keys_) { + std::vector member_scores; + auto s = zset_db.Pop(user_key, count_, flag_ == ZSET_MIN, &member_scores); + if (!s.ok()) { + return {Status::RedisExecErr, s.ToString()}; + } + if (member_scores.empty()) { + continue; } + + SendMembersWithScoresForZMpop(member_scores, user_key, conn); + return Status::OK(); } + *output = redis::NilString(); + return Status::OK(); + } + + static CommandKeyRange Range(const std::vector &args) { + int num_key = *ParseInt(args[1], 10); + return {2, 1 + num_key, 1}; + } + + private: + int numkeys_; + std::vector keys_; + enum { ZSET_MIN, ZSET_MAX, ZSET_NONE } flag_ = ZSET_NONE; + int count_ = 1; +}; +class CommandBZMPop : public Commander, + private EvbufCallbackBase, + private EventCallbackBase { + public: + Status Parse(const std::vector &args) override { + CommandParser parser(args, 1); + timeout_ = GET_OR_RET(parser.TakeInt(NumericRange{0, std::numeric_limits::max()})); + if (timeout_ < 0) { + return {Status::RedisParseErr, errTimeoutIsNegative}; + } numkeys_ = GET_OR_RET(parser.TakeInt(NumericRange{1, std::numeric_limits::max()})); for (int i = 0; i < numkeys_; ++i) { keys_.emplace_back(GET_OR_RET(parser.TakeStr())); @@ -462,12 +547,12 @@ class CommandMPop : public Commander, std::string userkey; std::vector member_scores; - auto s = PopFromMultipleZsets(member_scores, userkey); + auto s = PopFromMultipleZsets(member_scores, userkey, svr, conn, keys_, flag_ == ZSET_MIN, count_); if (!s.ok()) { return {Status::RedisExecErr, s.ToString()}; } - if (!block_ || !member_scores.empty()) { - SendMembersWithScores(member_scores, userkey); + if (!member_scores.empty()) { + SendMembersWithScoresForZMpop(member_scores, userkey, conn_); return Status::OK(); } @@ -493,42 +578,10 @@ class CommandMPop : public Commander, return {Status::BlockingCmd}; } - rocksdb::Status PopFromMultipleZsets(std::vector &member_scores, std::string &userkey) { - redis::ZSet zset_db(svr_->storage, conn_->GetNamespace()); - rocksdb::Status s; - for (auto &user_key : keys_) { - s = zset_db.Pop(user_key, count_, flag_ == ZSET_MIN, &member_scores); - if (!s.ok()) { - break; - } - if (member_scores.empty()) { - continue; - } - userkey = user_key; - break; - } - return s; - } - - void SendMembersWithScores(const std::vector &member_scores, const std::string &userkey) { - std::string output; - output.append(redis::MultiLen(2)); - output.append(redis::BulkString(userkey)); - output.append(redis::MultiLen(member_scores.size() * 2)); - for (const auto &ms : member_scores) { - output.append(redis::BulkString(ms.member)); - output.append(redis::BulkString(util::Float2String(ms.score))); - } - if (member_scores.empty() && !block_) { - output = redis::NilString(); - } - conn_->Reply(output); - } - void OnWrite(bufferevent *bev) { std::string userkey; std::vector member_scores; - auto s = PopFromMultipleZsets(member_scores, userkey); + auto s = PopFromMultipleZsets(member_scores, userkey, svr_, conn_, keys_, flag_ == ZSET_MIN, count_); if (!s.ok()) { conn_->Reply(redis::Error("ERR " + s.ToString())); return; @@ -541,7 +594,7 @@ class CommandMPop : public Commander, bufferevent_disable(bev, EV_WRITE); return; } - SendMembersWithScores(member_scores, userkey); + SendMembersWithScoresForZMpop(member_scores, userkey, conn_); if (timer_) { timer_.reset(); @@ -586,7 +639,6 @@ class CommandMPop : public Commander, std::vector keys_; enum { ZSET_MIN, ZSET_MAX, ZSET_NONE } flag_ = ZSET_NONE; int count_ = 1; - bool block_; Server *svr_ = nullptr; Connection *conn_ = nullptr; UniqueEvent timer_; @@ -598,16 +650,6 @@ class CommandMPop : public Commander, } }; -class CommandZMPop : public CommandMPop { - public: - CommandZMPop() : CommandMPop(false) {} -}; - -class CommandBZMPop : public CommandMPop { - public: - CommandBZMPop() : CommandMPop(true) {} -}; - class CommandZRangeStore : public Commander { public: explicit CommandZRangeStore() : range_type_(kZRangeRank), direction_(kZRangeDirectionForward) {}