Skip to content

Commit

Permalink
Merge pull request #1 from torwig/review_bzmpop
Browse files Browse the repository at this point in the history
Refactor the code related to BZMPOP
  • Loading branch information
Yangsx-1 authored Jun 17, 2023
2 parents 6805353 + a3acc7a commit 0147279
Showing 1 changed file with 73 additions and 56 deletions.
129 changes: 73 additions & 56 deletions src/commands/cmd_zset.cc
Original file line number Diff line number Diff line change
Expand Up @@ -275,23 +275,22 @@ class CommandZPopMax : public CommandZPop {
CommandZPopMax() : CommandZPop(false) {}
};

static rocksdb::Status PopFromMultipleZsets(std::vector<MemberScore> &member_scores, std::string &userkey, Server *svr,
Connection *conn, const std::vector<std::string> &keys, bool min,
int count) {
redis::ZSet zset_db(svr->storage, conn->GetNamespace());
static rocksdb::Status PopFromMultipleZsets(redis::ZSet *zset_db, const std::vector<std::string> &keys, bool min,
int count, std::string *user_key, std::vector<MemberScore> *member_scores) {
rocksdb::Status s;
for (auto &user_key : keys) {
s = zset_db.Pop(user_key, count, min, &member_scores);
for (auto &key : keys) {
s = zset_db->Pop(key, count, min, member_scores);
if (!s.ok()) {
break;
return s;
}
if (member_scores.empty()) {
continue;

if (!member_scores->empty()) {
*user_key = key;
break;
}
userkey = user_key;
break;
}
return s;

return rocksdb::Status::OK();
}

class CommandBZPop : public Commander,
Expand All @@ -318,21 +317,24 @@ class CommandBZPop : public Commander,
svr_ = svr;
conn_ = conn;

std::string userkey;
std::string user_key;
std::vector<MemberScore> member_scores;
auto s = PopFromMultipleZsets(member_scores, userkey, svr, conn, keys_, min_, 1);

redis::ZSet zset_db(svr->storage, conn->GetNamespace());
auto s = PopFromMultipleZsets(&zset_db, keys_, min_, 1, &user_key, &member_scores);
if (!s.ok()) {
return {Status::RedisExecErr, s.ToString()};
}

if (!member_scores.empty()) {
SendMembersWithScores(member_scores, userkey);
SendMembersWithScores(member_scores, user_key);
return Status::OK();
}

// All Empty
// all sorted sets are empty
if (conn->IsInExec()) {
*output = redis::MultiLen(-1);
return Status::OK(); // No blocking in multi-exec
return Status::OK(); // no blocking in multi-exec
}

for (const auto &key : keys_) {
Expand All @@ -351,10 +353,10 @@ class CommandBZPop : public Commander,
return {Status::BlockingCmd};
}

void SendMembersWithScores(const std::vector<MemberScore> &member_scores, const std::string &userkey) {
void SendMembersWithScores(const std::vector<MemberScore> &member_scores, const std::string &user_key) {
std::string output;
output.append(redis::MultiLen(member_scores.size() * 2 + 1));
output.append(redis::BulkString(userkey));
output.append(redis::BulkString(user_key));
for (const auto &ms : member_scores) {
output.append(redis::BulkString(ms.member));
output.append(redis::BulkString(util::Float2String(ms.score)));
Expand All @@ -363,28 +365,32 @@ class CommandBZPop : public Commander,
}

void OnWrite(bufferevent *bev) {
std::string userkey;
std::string user_key;
std::vector<MemberScore> member_scores;
auto s = PopFromMultipleZsets(member_scores, userkey, svr_, conn_, keys_, min_, 1);

redis::ZSet zset_db(svr_->storage, conn_->GetNamespace());
auto s = PopFromMultipleZsets(&zset_db, keys_, min_, 1, &user_key, &member_scores);
if (!s.ok()) {
conn_->Reply(redis::Error("ERR " + s.ToString()));
return;
}

if (member_scores.empty()) {
// The connection may be waked up but can't pop from list. For example,
// connection A is blocking on list and connection B push a new element
// then wake up the connection A, but this element may be token by other connection C.
// So we need to wait for the wake event again by disabling the WRITE event.
// The connection may be waked up but can't pop from a zset. For example, connection A is blocked on zset and
// connection B added a new element; then connection A was unblocked, but this element may be taken by
// another connection C. So we need to block connection A again and wait for the element being added
// by disabling the WRITE event.
bufferevent_disable(bev, EV_WRITE);
return;
}
SendMembersWithScores(member_scores, userkey);

SendMembersWithScores(member_scores, user_key);

if (timer_) {
timer_.reset();
}

unBlockingAll();
unblockOnAllKeys();
conn_->SetCB(bev);
bufferevent_enable(bev, EV_READ);
// We need to manually trigger the read event since we will stop processing commands
Expand All @@ -398,15 +404,15 @@ class CommandBZPop : public Commander,
if (timer_ != nullptr) {
timer_.reset();
}
unBlockingAll();
unblockOnAllKeys();
}
conn_->OnEvent(bev, events);
}

void TimerCB(int, int16_t events) {
conn_->Reply(redis::NilString());
void TimerCB(int, int16_t) {
conn_->Reply(redis::MultiLen(-1));
timer_.reset();
unBlockingAll();
unblockOnAllKeys();
auto bev = conn_->GetBufferEvent();
conn_->SetCB(bev);
bufferevent_enable(bev, EV_READ);
Expand All @@ -420,7 +426,7 @@ class CommandBZPop : public Commander,
Connection *conn_ = nullptr;
UniqueEvent timer_;

void unBlockingAll() {
void unblockOnAllKeys() {
for (const auto &key : keys_) {
svr_->UnblockOnKey(key, conn_);
}
Expand All @@ -437,11 +443,11 @@ class CommandBZPopMax : public CommandBZPop {
CommandBZPopMax() : CommandBZPop(false) {}
};

static void SendMembersWithScoresForZMpop(const std::vector<MemberScore> &member_scores, const std::string &userkey,
Connection *conn) {
static void SendMembersWithScoresForZMpop(Connection *conn, const std::string &user_key,
const std::vector<MemberScore> &member_scores) {
std::string output;
output.append(redis::MultiLen(2));
output.append(redis::BulkString(userkey));
output.append(redis::BulkString(user_key));
output.append(redis::MultiLen(member_scores.size() * 2));
for (const auto &ms : member_scores) {
output.append(redis::BulkString(ms.member));
Expand Down Expand Up @@ -490,10 +496,10 @@ class CommandZMPop : public Commander {
continue;
}

SendMembersWithScoresForZMpop(member_scores, user_key, conn);
SendMembersWithScoresForZMpop(conn, user_key, member_scores);
return Status::OK();
}
*output = redis::NilString();
*output = redis::MultiLen(-1);
return Status::OK();
}

Expand All @@ -515,12 +521,14 @@ class CommandBZMPop : public Commander,
public:
Status Parse(const std::vector<std::string> &args) override {
CommandParser parser(args, 1);

timeout_ = GET_OR_RET(parser.TakeInt<int>(NumericRange<int>{0, std::numeric_limits<int>::max()}));
if (timeout_ < 0) {
return {Status::RedisParseErr, errTimeoutIsNegative};
}
numkeys_ = GET_OR_RET(parser.TakeInt<int>(NumericRange<int>{1, std::numeric_limits<int>::max()}));
for (int i = 0; i < numkeys_; ++i) {

num_keys_ = GET_OR_RET(parser.TakeInt<int>(NumericRange<int>{1, std::numeric_limits<int>::max()}));
for (int i = 0; i < num_keys_; ++i) {
keys_.emplace_back(GET_OR_RET(parser.TakeStr()));
}

Expand All @@ -535,31 +543,36 @@ class CommandBZMPop : public Commander,
return parser.InvalidSyntax();
}
}

if (flag_ == ZSET_NONE) {
return parser.InvalidSyntax();
}

return Commander::Parse(args);
}

Status Execute(Server *svr, Connection *conn, std::string *output) override {
svr_ = svr;
conn_ = conn;

std::string userkey;
std::string user_key;
std::vector<MemberScore> member_scores;
auto s = PopFromMultipleZsets(member_scores, userkey, svr, conn, keys_, flag_ == ZSET_MIN, count_);

redis::ZSet zset_db(svr->storage, conn->GetNamespace());
auto s = PopFromMultipleZsets(&zset_db, keys_, flag_ == ZSET_MIN, count_, &user_key, &member_scores);
if (!s.ok()) {
return {Status::RedisExecErr, s.ToString()};
}

if (!member_scores.empty()) {
SendMembersWithScoresForZMpop(member_scores, userkey, conn_);
SendMembersWithScoresForZMpop(conn_, user_key, member_scores);
return Status::OK();
}

// All Empty
// all sorted sets are empty
if (conn->IsInExec()) {
*output = redis::MultiLen(-1);
return Status::OK(); // No blocking in multi-exec
return Status::OK(); // no blocking in multi-exec
}

for (const auto &key : keys_) {
Expand All @@ -579,28 +592,32 @@ class CommandBZMPop : public Commander,
}

void OnWrite(bufferevent *bev) {
std::string userkey;
std::string user_key;
std::vector<MemberScore> member_scores;
auto s = PopFromMultipleZsets(member_scores, userkey, svr_, conn_, keys_, flag_ == ZSET_MIN, count_);

redis::ZSet zset_db(svr_->storage, conn_->GetNamespace());
auto s = PopFromMultipleZsets(&zset_db, keys_, flag_ == ZSET_MIN, count_, &user_key, &member_scores);
if (!s.ok()) {
conn_->Reply(redis::Error("ERR " + s.ToString()));
return;
}

if (member_scores.empty()) {
// The connection may be waked up but can't pop from list. For example,
// connection A is blocking on list and connection B push a new element
// then wake up the connection A, but this element may be token by other connection C.
// So we need to wait for the wake event again by disabling the WRITE event.
// The connection may be waked up but can't pop from a zset. For example, connection A is blocked on zset and
// connection B added a new element; then connection A was unblocked, but this element may be taken by
// another connection C. So we need to block connection A again and wait for the element being added
// by disabling the WRITE event.
bufferevent_disable(bev, EV_WRITE);
return;
}
SendMembersWithScoresForZMpop(member_scores, userkey, conn_);

SendMembersWithScoresForZMpop(conn_, user_key, member_scores);

if (timer_) {
timer_.reset();
}

unBlockingAll();
unblockOnAllKeys();
conn_->SetCB(bev);
bufferevent_enable(bev, EV_READ);
// We need to manually trigger the read event since we will stop processing commands
Expand All @@ -614,15 +631,15 @@ class CommandBZMPop : public Commander,
if (timer_ != nullptr) {
timer_.reset();
}
unBlockingAll();
unblockOnAllKeys();
}
conn_->OnEvent(bev, events);
}

void TimerCB(int, int16_t events) {
conn_->Reply(redis::NilString());
timer_.reset();
unBlockingAll();
unblockOnAllKeys();
auto bev = conn_->GetBufferEvent();
conn_->SetCB(bev);
bufferevent_enable(bev, EV_READ);
Expand All @@ -635,15 +652,15 @@ class CommandBZMPop : public Commander,

private:
int timeout_ = 0; // seconds
int numkeys_;
int num_keys_;
std::vector<std::string> keys_;
enum { ZSET_MIN, ZSET_MAX, ZSET_NONE } flag_ = ZSET_NONE;
int count_ = 1;
Server *svr_ = nullptr;
Connection *conn_ = nullptr;
UniqueEvent timer_;

void unBlockingAll() {
void unblockOnAllKeys() {
for (const auto &key : keys_) {
svr_->UnblockOnKey(key, conn_);
}
Expand Down

0 comments on commit 0147279

Please sign in to comment.