Skip to content

Commit

Permalink
Add the support of the BZMPOP command (#1490)
Browse files Browse the repository at this point in the history
This PR adds support for bzmpop, bzpopmax, bzpopmin commands like Redis.
And I also add some tests for these three commands.
  • Loading branch information
Yangsx-1 authored Jun 18, 2023
1 parent 1dc5e9b commit 7016bb6
Show file tree
Hide file tree
Showing 2 changed files with 431 additions and 10 deletions.
348 changes: 340 additions & 8 deletions src/commands/cmd_zset.cc
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ class CommandZAdd : public Commander {
return {Status::RedisExecErr, s.ToString()};
}

svr->WakeupBlockingConns(args_[1], member_scores_.size());

if (flags_.HasIncr()) {
auto new_score = member_scores_[0].score;
if ((flags_.HasNX() || flags_.HasXX() || flags_.HasLT() || flags_.HasGT()) && old_score == new_score &&
Expand Down Expand Up @@ -273,6 +275,187 @@ class CommandZPopMax : public CommandZPop {
CommandZPopMax() : CommandZPop(false) {}
};

static rocksdb::Status PopFromMultipleZsets(redis::ZSet *zset_db, const std::vector<std::string> &keys, bool min,
int count, std::string *user_key, std::vector<MemberScore> *member_scores) {
rocksdb::Status s;
for (auto &key : keys) {
s = zset_db->Pop(key, count, min, member_scores);
if (!s.ok()) {
return s;
}

if (!member_scores->empty()) {
*user_key = key;
break;
}
}

return rocksdb::Status::OK();
}

class CommandBZPop : public Commander,
private EvbufCallbackBase<CommandBZPop, false>,
private EventCallbackBase<CommandBZPop> {
public:
explicit CommandBZPop(bool min) : min_(min) {}

Status Parse(const std::vector<std::string> &args) override {
auto parse_result = ParseInt<int>(args[args.size() - 1], 10);
if (!parse_result) {
return {Status::RedisParseErr, "timeout is not an integer or out of range"};
}
if (*parse_result < 0) {
return {Status::RedisParseErr, errTimeoutIsNegative};
}
timeout_ = *parse_result;

keys_ = std::vector<std::string>(args.begin() + 1, args.end() - 1);
return Commander::Parse(args);
}

Status Execute(Server *svr, Connection *conn, std::string *output) override {
svr_ = svr;
conn_ = conn;

std::string user_key;
std::vector<MemberScore> member_scores;

redis::ZSet zset_db(svr->storage, conn->GetNamespace());
auto s = PopFromMultipleZsets(&zset_db, keys_, min_, 1, &user_key, &member_scores);
if (!s.ok()) {
return {Status::RedisExecErr, s.ToString()};
}

if (!member_scores.empty()) {
SendMembersWithScores(member_scores, user_key);
return Status::OK();
}

// all sorted sets are empty
if (conn->IsInExec()) {
*output = redis::MultiLen(-1);
return Status::OK(); // no blocking in multi-exec
}

for (const auto &key : keys_) {
svr_->BlockOnKey(key, conn_);
}

auto bev = conn->GetBufferEvent();
SetCB(bev);

if (timeout_) {
timer_.reset(NewTimer(bufferevent_get_base(bev)));
timeval tm = {timeout_, 0};
evtimer_add(timer_.get(), &tm);
}

return {Status::BlockingCmd};
}

void SendMembersWithScores(const std::vector<MemberScore> &member_scores, const std::string &user_key) {
std::string output;
output.append(redis::MultiLen(member_scores.size() * 2 + 1));
output.append(redis::BulkString(user_key));
for (const auto &ms : member_scores) {
output.append(redis::BulkString(ms.member));
output.append(redis::BulkString(util::Float2String(ms.score)));
}
conn_->Reply(output);
}

void OnWrite(bufferevent *bev) {
std::string user_key;
std::vector<MemberScore> member_scores;

redis::ZSet zset_db(svr_->storage, conn_->GetNamespace());
auto s = PopFromMultipleZsets(&zset_db, keys_, min_, 1, &user_key, &member_scores);
if (!s.ok()) {
conn_->Reply(redis::Error("ERR " + s.ToString()));
return;
}

if (member_scores.empty()) {
// The connection may be waked up but can't pop from a zset. For example, connection A is blocked on zset and
// connection B added a new element; then connection A was unblocked, but this element may be taken by
// another connection C. So we need to block connection A again and wait for the element being added
// by disabling the WRITE event.
bufferevent_disable(bev, EV_WRITE);
return;
}

SendMembersWithScores(member_scores, user_key);

if (timer_) {
timer_.reset();
}

unblockOnAllKeys();
conn_->SetCB(bev);
bufferevent_enable(bev, EV_READ);
// We need to manually trigger the read event since we will stop processing commands
// in connection after the blocking command, so there may have some commands to be processed.
// Related issue: https://github.com/apache/incubator-kvrocks/issues/831
bufferevent_trigger(bev, EV_READ, BEV_TRIG_IGNORE_WATERMARKS);
}

void OnEvent(bufferevent *bev, int16_t events) {
if (events & (BEV_EVENT_EOF | BEV_EVENT_ERROR)) {
if (timer_ != nullptr) {
timer_.reset();
}
unblockOnAllKeys();
}
conn_->OnEvent(bev, events);
}

void TimerCB(int, int16_t) {
conn_->Reply(redis::MultiLen(-1));
timer_.reset();
unblockOnAllKeys();
auto bev = conn_->GetBufferEvent();
conn_->SetCB(bev);
bufferevent_enable(bev, EV_READ);
}

private:
bool min_;
int timeout_;
std::vector<std::string> keys_;
Server *svr_ = nullptr;
Connection *conn_ = nullptr;
UniqueEvent timer_;

void unblockOnAllKeys() {
for (const auto &key : keys_) {
svr_->UnblockOnKey(key, conn_);
}
}
};

class CommandBZPopMin : public CommandBZPop {
public:
CommandBZPopMin() : CommandBZPop(true) {}
};

class CommandBZPopMax : public CommandBZPop {
public:
CommandBZPopMax() : CommandBZPop(false) {}
};

static void SendMembersWithScoresForZMpop(Connection *conn, const std::string &user_key,
const std::vector<MemberScore> &member_scores) {
std::string output;
output.append(redis::MultiLen(2));
output.append(redis::BulkString(user_key));
output.append(redis::MultiLen(member_scores.size() * 2));
for (const auto &ms : member_scores) {
output.append(redis::BulkString(ms.member));
output.append(redis::BulkString(util::Float2String(ms.score)));
}
conn->Reply(output);
}

class CommandZMPop : public Commander {
public:
CommandZMPop() = default;
Expand Down Expand Up @@ -313,16 +496,10 @@ class CommandZMPop : public Commander {
continue;
}

output->append(redis::MultiLen(2));
output->append(redis::BulkString(user_key));
output->append(redis::MultiLen(member_scores.size() * 2));
for (const auto &ms : member_scores) {
output->append(redis::BulkString(ms.member));
output->append(redis::BulkString(util::Float2String(ms.score)));
}
SendMembersWithScoresForZMpop(conn, user_key, member_scores);
return Status::OK();
}
*output = redis::NilString();
*output = redis::MultiLen(-1);
return Status::OK();
}

Expand All @@ -338,6 +515,158 @@ class CommandZMPop : public Commander {
int count_ = 1;
};

class CommandBZMPop : public Commander,
private EvbufCallbackBase<CommandBZMPop, false>,
private EventCallbackBase<CommandBZMPop> {
public:
Status Parse(const std::vector<std::string> &args) override {
CommandParser parser(args, 1);

timeout_ = GET_OR_RET(parser.TakeInt<int>(NumericRange<int>{0, std::numeric_limits<int>::max()}));
if (timeout_ < 0) {
return {Status::RedisParseErr, errTimeoutIsNegative};
}

num_keys_ = GET_OR_RET(parser.TakeInt<int>(NumericRange<int>{1, std::numeric_limits<int>::max()}));
for (int i = 0; i < num_keys_; ++i) {
keys_.emplace_back(GET_OR_RET(parser.TakeStr()));
}

while (parser.Good()) {
if (parser.EatEqICase("min")) {
flag_ = ZSET_MIN;
} else if (parser.EatEqICase("max")) {
flag_ = ZSET_MAX;
} else if (parser.EatEqICase("count")) {
count_ = GET_OR_RET(parser.TakeInt<int>(NumericRange<int>{1, std::numeric_limits<int>::max()}));
} else {
return parser.InvalidSyntax();
}
}

if (flag_ == ZSET_NONE) {
return parser.InvalidSyntax();
}

return Commander::Parse(args);
}

Status Execute(Server *svr, Connection *conn, std::string *output) override {
svr_ = svr;
conn_ = conn;

std::string user_key;
std::vector<MemberScore> member_scores;

redis::ZSet zset_db(svr->storage, conn->GetNamespace());
auto s = PopFromMultipleZsets(&zset_db, keys_, flag_ == ZSET_MIN, count_, &user_key, &member_scores);
if (!s.ok()) {
return {Status::RedisExecErr, s.ToString()};
}

if (!member_scores.empty()) {
SendMembersWithScoresForZMpop(conn_, user_key, member_scores);
return Status::OK();
}

// all sorted sets are empty
if (conn->IsInExec()) {
*output = redis::MultiLen(-1);
return Status::OK(); // no blocking in multi-exec
}

for (const auto &key : keys_) {
svr_->BlockOnKey(key, conn_);
}

auto bev = conn->GetBufferEvent();
SetCB(bev);

if (timeout_) {
timer_.reset(NewTimer(bufferevent_get_base(bev)));
timeval tm = {timeout_, 0};
evtimer_add(timer_.get(), &tm);
}

return {Status::BlockingCmd};
}

void OnWrite(bufferevent *bev) {
std::string user_key;
std::vector<MemberScore> member_scores;

redis::ZSet zset_db(svr_->storage, conn_->GetNamespace());
auto s = PopFromMultipleZsets(&zset_db, keys_, flag_ == ZSET_MIN, count_, &user_key, &member_scores);
if (!s.ok()) {
conn_->Reply(redis::Error("ERR " + s.ToString()));
return;
}

if (member_scores.empty()) {
// The connection may be waked up but can't pop from a zset. For example, connection A is blocked on zset and
// connection B added a new element; then connection A was unblocked, but this element may be taken by
// another connection C. So we need to block connection A again and wait for the element being added
// by disabling the WRITE event.
bufferevent_disable(bev, EV_WRITE);
return;
}

SendMembersWithScoresForZMpop(conn_, user_key, member_scores);

if (timer_) {
timer_.reset();
}

unblockOnAllKeys();
conn_->SetCB(bev);
bufferevent_enable(bev, EV_READ);
// We need to manually trigger the read event since we will stop processing commands
// in connection after the blocking command, so there may have some commands to be processed.
// Related issue: https://github.com/apache/incubator-kvrocks/issues/831
bufferevent_trigger(bev, EV_READ, BEV_TRIG_IGNORE_WATERMARKS);
}

void OnEvent(bufferevent *bev, int16_t events) {
if (events & (BEV_EVENT_EOF | BEV_EVENT_ERROR)) {
if (timer_ != nullptr) {
timer_.reset();
}
unblockOnAllKeys();
}
conn_->OnEvent(bev, events);
}

void TimerCB(int, int16_t events) {
conn_->Reply(redis::NilString());
timer_.reset();
unblockOnAllKeys();
auto bev = conn_->GetBufferEvent();
conn_->SetCB(bev);
bufferevent_enable(bev, EV_READ);
}

static CommandKeyRange Range(const std::vector<std::string> &args) {
int num_key = *ParseInt<int>(args[2], 10);
return {3, 1 + num_key, 1};
}

private:
int timeout_ = 0; // seconds
int num_keys_;
std::vector<std::string> keys_;
enum { ZSET_MIN, ZSET_MAX, ZSET_NONE } flag_ = ZSET_NONE;
int count_ = 1;
Server *svr_ = nullptr;
Connection *conn_ = nullptr;
UniqueEvent timer_;

void unblockOnAllKeys() {
for (const auto &key : keys_) {
svr_->UnblockOnKey(key, conn_);
}
}
};

class CommandZRangeStore : public Commander {
public:
explicit CommandZRangeStore() : range_type_(kZRangeRank), direction_(kZRangeDirectionForward) {}
Expand Down Expand Up @@ -936,7 +1265,10 @@ REDIS_REGISTER_COMMANDS(MakeCmdAttr<CommandZAdd>("zadd", -4, "write", 1, 1, 1),
MakeCmdAttr<CommandZLexCount>("zlexcount", 4, "read-only", 1, 1, 1),
MakeCmdAttr<CommandZPopMax>("zpopmax", -2, "write", 1, 1, 1),
MakeCmdAttr<CommandZPopMin>("zpopmin", -2, "write", 1, 1, 1),
MakeCmdAttr<CommandBZPopMax>("bzpopmax", -3, "write", 1, -2, 1),
MakeCmdAttr<CommandBZPopMin>("bzpopmin", -3, "write", 1, -2, 1),
MakeCmdAttr<CommandZMPop>("zmpop", -4, "write", CommandZMPop::Range),
MakeCmdAttr<CommandBZMPop>("bzmpop", -5, "write", CommandBZMPop::Range),
MakeCmdAttr<CommandZRangeStore>("zrangestore", -4, "write", 1, 1, 1),
MakeCmdAttr<CommandZRange>("zrange", -4, "read-only", 1, 1, 1),
MakeCmdAttr<CommandZRevRange>("zrevrange", -4, "read-only", 1, 1, 1),
Expand Down
Loading

0 comments on commit 7016bb6

Please sign in to comment.