Skip to content

Commit

Permalink
split function and extract same code as function
Browse files Browse the repository at this point in the history
  • Loading branch information
Yangsx-1 committed Jun 16, 2023
1 parent 7cebbd5 commit 6805353
Showing 1 changed file with 159 additions and 117 deletions.
276 changes: 159 additions & 117 deletions src/commands/cmd_zset.cc
Original file line number Diff line number Diff line change
Expand Up @@ -223,39 +223,91 @@ class CommandZLexCount : public Commander {
RangeLexSpec spec_;
};

class CommandZPop : public Commander,
private EvbufCallbackBase<CommandZPop, false>,
private EventCallbackBase<CommandZPop> {
class CommandZPop : public Commander {
public:
explicit CommandZPop(bool min, bool block) : min_(min), block_(block) {}
explicit CommandZPop(bool min) : min_(min) {}

Status Parse(const std::vector<std::string> &args) override {
if (!block_) {
if (args.size() > 3) {
return {Status::RedisParseErr, errWrongNumOfArguments};
if (args.size() > 3) {
return {Status::RedisParseErr, errWrongNumOfArguments};
}

if (args.size() == 3) {
auto parse_result = ParseInt<int>(args[2], 10);
if (!parse_result) {
return {Status::RedisParseErr, errValueNotInteger};
}

if (args.size() == 3) {
auto parse_result = ParseInt<int>(args[2], 10);
if (!parse_result) {
return {Status::RedisParseErr, errValueNotInteger};
}
count_ = *parse_result;
}
return Commander::Parse(args);
}

count_ = *parse_result;
}
keys_.push_back(args[1]);
return Commander::Parse(args);
Status Execute(Server *svr, Connection *conn, std::string *output) override {
redis::ZSet zset_db(svr->storage, conn->GetNamespace());
std::vector<MemberScore> member_scores;
auto s = zset_db.Pop(args_[1], count_, min_, &member_scores);
if (!s.ok()) {
return {Status::RedisExecErr, s.ToString()};
}
auto parse_result = ParseInt<int>(args[args.size() - 1], 10);

output->append(redis::MultiLen(member_scores.size() * 2));
for (const auto &ms : member_scores) {
output->append(redis::BulkString(ms.member));
output->append(redis::BulkString(util::Float2String(ms.score)));
}

return Status::OK();
}

private:
bool min_;
int count_ = 1;
};

class CommandZPopMin : public CommandZPop {
public:
CommandZPopMin() : CommandZPop(true) {}
};

class CommandZPopMax : public CommandZPop {
public:
CommandZPopMax() : CommandZPop(false) {}
};

static rocksdb::Status PopFromMultipleZsets(std::vector<MemberScore> &member_scores, std::string &userkey, Server *svr,
Connection *conn, const std::vector<std::string> &keys, bool min,
int count) {
redis::ZSet zset_db(svr->storage, conn->GetNamespace());
rocksdb::Status s;
for (auto &user_key : keys) {
s = zset_db.Pop(user_key, count, min, &member_scores);
if (!s.ok()) {
break;
}
if (member_scores.empty()) {
continue;
}
userkey = user_key;
break;
}
return s;
}

class CommandBZPop : public Commander,
private EvbufCallbackBase<CommandBZPop, false>,
private EventCallbackBase<CommandBZPop> {
public:
explicit CommandBZPop(bool min) : min_(min) {}

Status Parse(const std::vector<std::string> &args) override {
auto parse_result = ParseInt<int>(args[args.size() - 1], 10);
if (!parse_result) {
return {Status::RedisParseErr, "timeout is not an integer or out of range"};
}

if (*parse_result < 0) {
return {Status::RedisParseErr, errTimeoutIsNegative};
}

timeout_ = *parse_result;

keys_ = std::vector<std::string>(args.begin() + 1, args.end() - 1);
Expand All @@ -268,12 +320,11 @@ class CommandZPop : public Commander,

std::string userkey;
std::vector<MemberScore> member_scores;
auto s = PopFromMultipleZsets(member_scores, userkey);
auto s = PopFromMultipleZsets(member_scores, userkey, svr, conn, keys_, min_, 1);
if (!s.ok()) {
return {Status::RedisExecErr, s.ToString()};
}

if (!block_ || !member_scores.empty()) {
if (!member_scores.empty()) {
SendMembersWithScores(member_scores, userkey);
return Status::OK();
}
Expand All @@ -300,31 +351,10 @@ class CommandZPop : public Commander,
return {Status::BlockingCmd};
}

rocksdb::Status PopFromMultipleZsets(std::vector<MemberScore> &member_scores, std::string &userkey) {
redis::ZSet zset_db(svr_->storage, conn_->GetNamespace());
rocksdb::Status s;
for (auto &user_key : keys_) {
s = zset_db.Pop(user_key, count_, min_, &member_scores);
if (!s.ok()) {
break;
}
if (member_scores.empty() && block_) {
continue;
}
userkey = user_key;
break;
}
return s;
}

void SendMembersWithScores(const std::vector<MemberScore> &member_scores, const std::string &userkey) {
std::string output;
if (block_) {
output.append(redis::MultiLen(member_scores.size() * 2 + 1));
output.append(redis::BulkString(userkey));
} else {
output.append(redis::MultiLen(member_scores.size() * 2));
}
output.append(redis::MultiLen(member_scores.size() * 2 + 1));
output.append(redis::BulkString(userkey));
for (const auto &ms : member_scores) {
output.append(redis::BulkString(ms.member));
output.append(redis::BulkString(util::Float2String(ms.score)));
Expand All @@ -335,7 +365,7 @@ class CommandZPop : public Commander,
void OnWrite(bufferevent *bev) {
std::string userkey;
std::vector<MemberScore> member_scores;
auto s = PopFromMultipleZsets(member_scores, userkey);
auto s = PopFromMultipleZsets(member_scores, userkey, svr_, conn_, keys_, min_, 1);
if (!s.ok()) {
conn_->Reply(redis::Error("ERR " + s.ToString()));
return;
Expand Down Expand Up @@ -384,8 +414,6 @@ class CommandZPop : public Commander,

private:
bool min_;
bool block_;
int count_ = 1;
int timeout_;
std::vector<std::string> keys_;
Server *svr_ = nullptr;
Expand All @@ -399,41 +427,98 @@ class CommandZPop : public Commander,
}
};

class CommandZPopMin : public CommandZPop {
class CommandBZPopMin : public CommandBZPop {
public:
CommandZPopMin() : CommandZPop(true, false) {}
CommandBZPopMin() : CommandBZPop(true) {}
};

class CommandZPopMax : public CommandZPop {
class CommandBZPopMax : public CommandBZPop {
public:
CommandZPopMax() : CommandZPop(false, false) {}
CommandBZPopMax() : CommandBZPop(false) {}
};

class CommandBZPopMin : public CommandZPop {
public:
CommandBZPopMin() : CommandZPop(true, true) {}
};

class CommandBZPopMax : public CommandZPop {
public:
CommandBZPopMax() : CommandZPop(false, true) {}
};
static void SendMembersWithScoresForZMpop(const std::vector<MemberScore> &member_scores, const std::string &userkey,
Connection *conn) {
std::string output;
output.append(redis::MultiLen(2));
output.append(redis::BulkString(userkey));
output.append(redis::MultiLen(member_scores.size() * 2));
for (const auto &ms : member_scores) {
output.append(redis::BulkString(ms.member));
output.append(redis::BulkString(util::Float2String(ms.score)));
}
conn->Reply(output);
}

class CommandMPop : public Commander,
private EvbufCallbackBase<CommandMPop, false>,
private EventCallbackBase<CommandMPop> {
class CommandZMPop : public Commander {
public:
explicit CommandMPop(bool block) : block_(block) {}
CommandZMPop() = default;

Status Parse(const std::vector<std::string> &args) override {
CommandParser parser(args, 1);
if (block_) {
timeout_ = GET_OR_RET(parser.TakeInt<int>(NumericRange<int>{0, std::numeric_limits<int>::max()}));
if (timeout_ < 0) {
return {Status::RedisParseErr, errTimeoutIsNegative};
numkeys_ = GET_OR_RET(parser.TakeInt<int>(NumericRange<int>{1, std::numeric_limits<int>::max()}));
for (int i = 0; i < numkeys_; ++i) {
keys_.emplace_back(GET_OR_RET(parser.TakeStr()));
}

while (parser.Good()) {
if (parser.EatEqICase("min")) {
flag_ = ZSET_MIN;
} else if (parser.EatEqICase("max")) {
flag_ = ZSET_MAX;
} else if (parser.EatEqICase("count")) {
count_ = GET_OR_RET(parser.TakeInt<int>(NumericRange<int>{1, std::numeric_limits<int>::max()}));
} else {
return parser.InvalidSyntax();
}
}
if (flag_ == ZSET_NONE) {
return parser.InvalidSyntax();
}
return Commander::Parse(args);
}

Status Execute(Server *svr, Connection *conn, std::string *output) override {
redis::ZSet zset_db(svr->storage, conn->GetNamespace());
for (auto &user_key : keys_) {
std::vector<MemberScore> member_scores;
auto s = zset_db.Pop(user_key, count_, flag_ == ZSET_MIN, &member_scores);
if (!s.ok()) {
return {Status::RedisExecErr, s.ToString()};
}
if (member_scores.empty()) {
continue;
}

SendMembersWithScoresForZMpop(member_scores, user_key, conn);
return Status::OK();
}
*output = redis::NilString();
return Status::OK();
}

static CommandKeyRange Range(const std::vector<std::string> &args) {
int num_key = *ParseInt<int>(args[1], 10);
return {2, 1 + num_key, 1};
}

private:
int numkeys_;
std::vector<std::string> keys_;
enum { ZSET_MIN, ZSET_MAX, ZSET_NONE } flag_ = ZSET_NONE;
int count_ = 1;
};

class CommandBZMPop : public Commander,
private EvbufCallbackBase<CommandBZMPop, false>,
private EventCallbackBase<CommandBZMPop> {
public:
Status Parse(const std::vector<std::string> &args) override {
CommandParser parser(args, 1);
timeout_ = GET_OR_RET(parser.TakeInt<int>(NumericRange<int>{0, std::numeric_limits<int>::max()}));
if (timeout_ < 0) {
return {Status::RedisParseErr, errTimeoutIsNegative};
}
numkeys_ = GET_OR_RET(parser.TakeInt<int>(NumericRange<int>{1, std::numeric_limits<int>::max()}));
for (int i = 0; i < numkeys_; ++i) {
keys_.emplace_back(GET_OR_RET(parser.TakeStr()));
Expand Down Expand Up @@ -462,12 +547,12 @@ class CommandMPop : public Commander,

std::string userkey;
std::vector<MemberScore> member_scores;
auto s = PopFromMultipleZsets(member_scores, userkey);
auto s = PopFromMultipleZsets(member_scores, userkey, svr, conn, keys_, flag_ == ZSET_MIN, count_);
if (!s.ok()) {
return {Status::RedisExecErr, s.ToString()};
}
if (!block_ || !member_scores.empty()) {
SendMembersWithScores(member_scores, userkey);
if (!member_scores.empty()) {
SendMembersWithScoresForZMpop(member_scores, userkey, conn_);
return Status::OK();
}

Expand All @@ -493,42 +578,10 @@ class CommandMPop : public Commander,
return {Status::BlockingCmd};
}

rocksdb::Status PopFromMultipleZsets(std::vector<MemberScore> &member_scores, std::string &userkey) {
redis::ZSet zset_db(svr_->storage, conn_->GetNamespace());
rocksdb::Status s;
for (auto &user_key : keys_) {
s = zset_db.Pop(user_key, count_, flag_ == ZSET_MIN, &member_scores);
if (!s.ok()) {
break;
}
if (member_scores.empty()) {
continue;
}
userkey = user_key;
break;
}
return s;
}

void SendMembersWithScores(const std::vector<MemberScore> &member_scores, const std::string &userkey) {
std::string output;
output.append(redis::MultiLen(2));
output.append(redis::BulkString(userkey));
output.append(redis::MultiLen(member_scores.size() * 2));
for (const auto &ms : member_scores) {
output.append(redis::BulkString(ms.member));
output.append(redis::BulkString(util::Float2String(ms.score)));
}
if (member_scores.empty() && !block_) {
output = redis::NilString();
}
conn_->Reply(output);
}

void OnWrite(bufferevent *bev) {
std::string userkey;
std::vector<MemberScore> member_scores;
auto s = PopFromMultipleZsets(member_scores, userkey);
auto s = PopFromMultipleZsets(member_scores, userkey, svr_, conn_, keys_, flag_ == ZSET_MIN, count_);
if (!s.ok()) {
conn_->Reply(redis::Error("ERR " + s.ToString()));
return;
Expand All @@ -541,7 +594,7 @@ class CommandMPop : public Commander,
bufferevent_disable(bev, EV_WRITE);
return;
}
SendMembersWithScores(member_scores, userkey);
SendMembersWithScoresForZMpop(member_scores, userkey, conn_);

if (timer_) {
timer_.reset();
Expand Down Expand Up @@ -586,7 +639,6 @@ class CommandMPop : public Commander,
std::vector<std::string> keys_;
enum { ZSET_MIN, ZSET_MAX, ZSET_NONE } flag_ = ZSET_NONE;
int count_ = 1;
bool block_;
Server *svr_ = nullptr;
Connection *conn_ = nullptr;
UniqueEvent timer_;
Expand All @@ -598,16 +650,6 @@ class CommandMPop : public Commander,
}
};

class CommandZMPop : public CommandMPop {
public:
CommandZMPop() : CommandMPop(false) {}
};

class CommandBZMPop : public CommandMPop {
public:
CommandBZMPop() : CommandMPop(true) {}
};

class CommandZRangeStore : public Commander {
public:
explicit CommandZRangeStore() : range_type_(kZRangeRank), direction_(kZRangeDirectionForward) {}
Expand Down

0 comments on commit 6805353

Please sign in to comment.