Skip to content

Commit

Permalink
refactor to avoid repeated code
Browse files Browse the repository at this point in the history
  • Loading branch information
Yangsx-1 committed Jun 11, 2023
1 parent 60949bf commit e85da52
Showing 1 changed file with 80 additions and 139 deletions.
219 changes: 80 additions & 139 deletions src/commands/cmd_zset.cc
Original file line number Diff line number Diff line change
Expand Up @@ -222,85 +222,55 @@ class CommandZLexCount : public Commander {
RangeLexSpec spec_;
};

class CommandZPop : public Commander {
class CommandZPop : public Commander,
private EvbufCallbackBase<CommandZPop, false>,
private EventCallbackBase<CommandZPop> {
public:
explicit CommandZPop(bool min) : min_(min) {}
explicit CommandZPop(bool min, bool block) : min_(min), block_(block) {}

Status Parse(const std::vector<std::string> &args) override {
if (args.size() > 3) {
return {Status::RedisParseErr, errWrongNumOfArguments};
}

if (args.size() == 3) {
auto parse_result = ParseInt<int>(args[2], 10);
if (!parse_result) {
return {Status::RedisParseErr, errValueNotInteger};
if (!block_) {
if (args.size() > 3) {
return {Status::RedisParseErr, errWrongNumOfArguments};
}

count_ = *parse_result;
}
return Commander::Parse(args);
}

Status Execute(Server *svr, Connection *conn, std::string *output) override {
redis::ZSet zset_db(svr->storage, conn->GetNamespace());
std::vector<MemberScore> member_scores;
auto s = zset_db.Pop(args_[1], count_, min_, &member_scores);
if (!s.ok()) {
return {Status::RedisExecErr, s.ToString()};
}

output->append(redis::MultiLen(member_scores.size() * 2));
for (const auto &ms : member_scores) {
output->append(redis::BulkString(ms.member));
output->append(redis::BulkString(util::Float2String(ms.score)));
}

return Status::OK();
}

private:
bool min_;
int count_ = 1;
};

class CommandZPopMin : public CommandZPop {
public:
CommandZPopMin() : CommandZPop(true) {}
};
if (args.size() == 3) {
auto parse_result = ParseInt<int>(args[2], 10);
if (!parse_result) {
return {Status::RedisParseErr, errValueNotInteger};
}

class CommandZPopMax : public CommandZPop {
public:
CommandZPopMax() : CommandZPop(false) {}
};
count_ = *parse_result;
}
keys_.push_back(args[1]);
return Commander::Parse(args);
} else {
auto parse_result = ParseInt<int>(args[args.size() - 1], 10);

class CommandBZPop : public Commander,
private EvbufCallbackBase<CommandBZPop, false>,
private EventCallbackBase<CommandBZPop> {
public:
explicit CommandBZPop(bool min) : min_(min) {}
if (!parse_result) {
return {Status::RedisParseErr, "timeout is not an integer or out of range"};
}

Status Parse(const std::vector<std::string> &args) override {
auto parse_result = ParseInt<int>(args[args.size() - 1], 10);
if (*parse_result < 0) {
return {Status::RedisParseErr, "timeout should not be negative"};
}

if (!parse_result) {
return {Status::RedisParseErr, "timeout is not an integer or out of range"};
}
timeout_ = *parse_result;

if (*parse_result < 0) {
return {Status::RedisParseErr, "timeout should not be negative"};
keys_ = std::vector<std::string>(args.begin() + 1, args.end() - 1);
return Commander::Parse(args);
}

timeout_ = *parse_result;

keys_ = std::vector<std::string>(args.begin() + 1, args.end() - 1);
return Commander::Parse(args);
}

Status Execute(Server *svr, Connection *conn, std::string *output) override {
svr_ = svr;
conn_ = conn;

if (!block_) {
auto s = TryPopFromZset();
return Status::OK();
}

auto bev = conn->GetBufferEvent();

auto s = TryPopFromZset();
Expand Down Expand Up @@ -340,12 +310,16 @@ class CommandBZPop : public Commander,
conn_->Reply(redis::Error("ERR " + s.ToString()));
break;
}
if (member_scores.empty()) {
if (member_scores.empty() && block_) {
continue;
}
std::string output;
output.append(redis::MultiLen(member_scores.size() * 2 + 1));
output.append(redis::BulkString(user_key));
if (!block_) {
output.append(redis::MultiLen(member_scores.size() * 2));
} else {
output.append(redis::MultiLen(member_scores.size() * 2 + 1));
output.append(redis::BulkString(user_key));
}
for (const auto &ms : member_scores) {
output.append(redis::BulkString(ms.member));
output.append(redis::BulkString(util::Float2String(ms.score)));
Expand Down Expand Up @@ -402,6 +376,7 @@ class CommandBZPop : public Commander,

private:
bool min_;
bool block_;
int count_ = 1;
int timeout_;
std::vector<std::string> keys_;
Expand All @@ -417,92 +392,39 @@ class CommandBZPop : public Commander,
}
};

class CommandBZPopMin : public CommandBZPop {
class CommandZPopMin : public CommandZPop {
public:
CommandBZPopMin() : CommandBZPop(true) {}
CommandZPopMin() : CommandZPop(true, false) {}
};

class CommandBZPopMax : public CommandBZPop {
class CommandZPopMax : public CommandZPop {
public:
CommandBZPopMax() : CommandBZPop(false) {}
CommandZPopMax() : CommandZPop(false, false) {}
};

class CommandZMPop : public Commander {
class CommandBZPopMin : public CommandZPop {
public:
CommandZMPop() = default;

Status Parse(const std::vector<std::string> &args) override {
CommandParser parser(args, 1);
numkeys_ = GET_OR_RET(parser.TakeInt<int>(NumericRange<int>{1, std::numeric_limits<int>::max()}));
for (int i = 0; i < numkeys_; ++i) {
keys_.emplace_back(GET_OR_RET(parser.TakeStr()));
}

while (parser.Good()) {
if (parser.EatEqICase("min")) {
flag_ = ZSET_MIN;
} else if (parser.EatEqICase("max")) {
flag_ = ZSET_MAX;
} else if (parser.EatEqICase("count")) {
count_ = GET_OR_RET(parser.TakeInt<int>(NumericRange<int>{1, std::numeric_limits<int>::max()}));
} else {
return parser.InvalidSyntax();
}
}
if (flag_ == ZSET_NONE) {
return parser.InvalidSyntax();
}
return Commander::Parse(args);
}

Status Execute(Server *svr, Connection *conn, std::string *output) override {
redis::ZSet zset_db(svr->storage, conn->GetNamespace());
for (auto &user_key : keys_) {
std::vector<MemberScore> member_scores;
auto s = zset_db.Pop(user_key, count_, flag_ == ZSET_MIN, &member_scores);
if (!s.ok()) {
return {Status::RedisExecErr, s.ToString()};
}
if (member_scores.empty()) {
continue;
}

output->append(redis::MultiLen(2));
output->append(redis::BulkString(user_key));
output->append(redis::MultiLen(member_scores.size() * 2));
for (const auto &ms : member_scores) {
output->append(redis::BulkString(ms.member));
output->append(redis::BulkString(util::Float2String(ms.score)));
}
return Status::OK();
}
*output = redis::NilString();
return Status::OK();
}

static CommandKeyRange Range(const std::vector<std::string> &args) {
int num_key = *ParseInt<int>(args[1], 10);
return {2, 1 + num_key, 1};
}
CommandBZPopMin() : CommandZPop(true, true) {}
};

private:
int numkeys_;
std::vector<std::string> keys_;
enum { ZSET_MIN, ZSET_MAX, ZSET_NONE } flag_ = ZSET_NONE;
int count_ = 1;
class CommandBZPopMax : public CommandZPop {
public:
CommandBZPopMax() : CommandZPop(false, true) {}
};

class CommandBZMPop : public Commander,
private EvbufCallbackBase<CommandBZMPop, false>,
private EventCallbackBase<CommandBZMPop> {
class CommandMPop : public Commander,
private EvbufCallbackBase<CommandMPop, false>,
private EventCallbackBase<CommandMPop> {
public:
CommandBZMPop() = default;
explicit CommandMPop(bool block) : block_(block) {}

Status Parse(const std::vector<std::string> &args) override {
CommandParser parser(args, 1);
timeout_ = GET_OR_RET(parser.TakeInt<int>(NumericRange<int>{0, std::numeric_limits<int>::max()}));
if (timeout_ < 0) {
return {Status::RedisParseErr, "timeout should not be negative"};
if (block_) {
timeout_ = GET_OR_RET(parser.TakeInt<int>(NumericRange<int>{0, std::numeric_limits<int>::max()}));
if (timeout_ < 0) {
return {Status::RedisParseErr, "timeout should not be negative"};
}
}

numkeys_ = GET_OR_RET(parser.TakeInt<int>(NumericRange<int>{1, std::numeric_limits<int>::max()}));
Expand Down Expand Up @@ -531,6 +453,11 @@ class CommandBZMPop : public Commander,
svr_ = svr;
conn_ = conn;

if (!block_) {
auto s = TryPopFromZset();
return Status::OK();
}

auto bev = conn->GetBufferEvent();

auto s = TryPopFromZset();
Expand Down Expand Up @@ -563,6 +490,7 @@ class CommandBZMPop : public Commander,
rocksdb::Status TryPopFromZset() {
redis::ZSet zset_db(svr_->storage, conn_->GetNamespace());
rocksdb::Status s;
std::string output;
for (auto &user_key : keys_) {
std::vector<MemberScore> member_scores;
s = zset_db.Pop(user_key, count_, flag_ == ZSET_MIN, &member_scores);
Expand All @@ -573,18 +501,20 @@ class CommandBZMPop : public Commander,
if (member_scores.empty()) {
continue;
}
std::string output;
output.append(redis::MultiLen(2));
output.append(redis::BulkString(user_key));
output.append(redis::MultiLen(member_scores.size() * 2));
for (const auto &ms : member_scores) {
output.append(redis::BulkString(ms.member));
output.append(redis::BulkString(util::Float2String(ms.score)));
}
conn_->Reply(output);
reply_flag_ = true;
break;
}
if (output.empty() && !block_) {
output = redis::NilString();
}
conn_->Reply(output);
return s;
}

Expand Down Expand Up @@ -642,6 +572,7 @@ class CommandBZMPop : public Commander,
std::vector<std::string> keys_;
enum { ZSET_MIN, ZSET_MAX, ZSET_NONE } flag_ = ZSET_NONE;
int count_ = 1;
bool block_;
Server *svr_ = nullptr;
Connection *conn_ = nullptr;
UniqueEvent timer_;
Expand All @@ -654,6 +585,16 @@ class CommandBZMPop : public Commander,
}
};

class CommandZMPop : public CommandMPop {
public:
CommandZMPop() : CommandMPop(false) {}
};

class CommandBZMPop : public CommandMPop {
public:
CommandBZMPop() : CommandMPop(true) {}
};

class CommandZRangeStore : public Commander {
public:
explicit CommandZRangeStore() : range_type_(kZRangeRank), direction_(kZRangeDirectionForward) {}
Expand Down

0 comments on commit e85da52

Please sign in to comment.