Skip to content

Commit

Permalink
refactor pop and send func
Browse files Browse the repository at this point in the history
  • Loading branch information
Yangsx-1 committed Jun 16, 2023
1 parent 6a14e88 commit 339c57b
Showing 1 changed file with 50 additions and 48 deletions.
98 changes: 50 additions & 48 deletions src/commands/cmd_zset.cc
Original file line number Diff line number Diff line change
Expand Up @@ -266,22 +266,15 @@ class CommandZPop : public Commander,
svr_ = svr;
conn_ = conn;

auto s = TryPopFromMultiZset();
std::string userkey;
std::vector<MemberScore> member_scores;
auto s = PopFromMultipleZsets(member_scores, userkey);
if (!s.ok()) {
return Status::OK(); // Error already output
}

if (!block_) { // ReplyForZPop
output->append(redis::MultiLen(member_scores_.size() * 2));
for (const auto &ms : member_scores_) {
output->append(redis::BulkString(ms.member));
output->append(redis::BulkString(util::Float2String(ms.score)));
}
return Status::OK();
return {Status::RedisExecErr, s.ToString()};
}

if (!member_scores_.empty()) {
ReplyForBZPop();
if (!block_ || !member_scores.empty()) {
SendMembersWithScores(member_scores, userkey);
return Status::OK();
}

Expand All @@ -307,47 +300,54 @@ class CommandZPop : public Commander,
return {Status::BlockingCmd};
}

rocksdb::Status TryPopFromMultiZset() {
rocksdb::Status PopFromMultipleZsets(std::vector<MemberScore> &member_scores, std::string &userkey) {
redis::ZSet zset_db(svr_->storage, conn_->GetNamespace());
rocksdb::Status s;
for (auto &user_key : keys_) {
s = zset_db.Pop(user_key, count_, min_, &member_scores_);
s = zset_db.Pop(user_key, count_, min_, &member_scores);
if (!s.ok()) {
// output here is necessary for block operation to reply error
conn_->Reply(redis::Error("ERR " + s.ToString()));
break;
}
if (member_scores_.empty() && block_) {
if (member_scores.empty() && block_) {
continue;
}
user_key_ = user_key;
userkey = user_key;
break;
}
return s;
}

void ReplyForBZPop() {
void SendMembersWithScores(const std::vector<MemberScore> &member_scores, const std::string &userkey) {
std::string output;
output.append(redis::MultiLen(member_scores_.size() * 2 + 1));
output.append(redis::BulkString(user_key_));
for (const auto &ms : member_scores_) {
if (block_)
output.append(redis::MultiLen(member_scores.size() * 2 + 1));
else
output.append(redis::MultiLen(member_scores.size() * 2));
if (block_) output.append(redis::BulkString(userkey));
for (const auto &ms : member_scores) {
output.append(redis::BulkString(ms.member));
output.append(redis::BulkString(util::Float2String(ms.score)));
}
conn_->Reply(output);
}

void OnWrite(bufferevent *bev) {
auto s = TryPopFromMultiZset();
if (member_scores_.empty()) {
std::string userkey;
std::vector<MemberScore> member_scores;
auto s = PopFromMultipleZsets(member_scores, userkey);
if (!s.ok()) {
conn_->Reply(redis::Error("ERR " + s.ToString()));
return;
}
if (member_scores.empty()) {
// The connection may be waked up but can't pop from list. For example,
// connection A is blocking on list and connection B push a new element
// then wake up the connection A, but this element may be token by other connection C.
// So we need to wait for the wake event again by disabling the WRITE event.
bufferevent_disable(bev, EV_WRITE);
return;
}
ReplyForBZPop();
SendMembersWithScores(member_scores, userkey);

if (timer_) {
timer_.reset();
Expand Down Expand Up @@ -390,8 +390,6 @@ class CommandZPop : public Commander,
Server *svr_ = nullptr;
Connection *conn_ = nullptr;
UniqueEvent timer_;
std::string user_key_;
std::vector<MemberScore> member_scores_;

void unBlockingAll() {
for (const auto &key : keys_) {
Expand Down Expand Up @@ -461,12 +459,14 @@ class CommandMPop : public Commander,
svr_ = svr;
conn_ = conn;

auto s = TryPopFromMultiZset();
std::string userkey;
std::vector<MemberScore> member_scores;
auto s = PopFromMultipleZsets(member_scores, userkey);
if (!s.ok()) {
return Status::OK(); // error has already output
return {Status::RedisExecErr, s.ToString()};
}
if (!block_ || !member_scores_.empty()) {
ReplyForMPop();
if (!block_ || !member_scores.empty()) {
SendMembersWithScores(member_scores, userkey);
return Status::OK();
}

Expand All @@ -492,51 +492,55 @@ class CommandMPop : public Commander,
return {Status::BlockingCmd};
}

rocksdb::Status TryPopFromMultiZset() {
rocksdb::Status PopFromMultipleZsets(std::vector<MemberScore> &member_scores, std::string &userkey) {
redis::ZSet zset_db(svr_->storage, conn_->GetNamespace());
rocksdb::Status s;
for (auto &user_key : keys_) {
s = zset_db.Pop(user_key, count_, flag_ == ZSET_MIN, &member_scores_);
s = zset_db.Pop(user_key, count_, flag_ == ZSET_MIN, &member_scores);
if (!s.ok()) {
// output here is necessary for block operation to reply error
conn_->Reply(redis::Error("ERR " + s.ToString()));
break;
}
if (member_scores_.empty()) {
if (member_scores.empty()) {
continue;
}
user_key_ = user_key;
userkey = user_key;
break;
}
return s;
}

void ReplyForMPop() {
void SendMembersWithScores(const std::vector<MemberScore> &member_scores, const std::string &userkey) {
std::string output;
output.append(redis::MultiLen(2));
output.append(redis::BulkString(user_key_));
output.append(redis::MultiLen(member_scores_.size() * 2));
for (const auto &ms : member_scores_) {
output.append(redis::BulkString(userkey));
output.append(redis::MultiLen(member_scores.size() * 2));
for (const auto &ms : member_scores) {
output.append(redis::BulkString(ms.member));
output.append(redis::BulkString(util::Float2String(ms.score)));
}
if (member_scores_.empty() && !block_) {
if (member_scores.empty() && !block_) {
output = redis::NilString();
}
conn_->Reply(output);
}

void OnWrite(bufferevent *bev) {
auto s = TryPopFromMultiZset();
if (member_scores_.empty()) {
std::string userkey;
std::vector<MemberScore> member_scores;
auto s = PopFromMultipleZsets(member_scores, userkey);
if (!s.ok()) {
conn_->Reply(redis::Error("ERR " + s.ToString()));
return;
}
if (member_scores.empty()) {
// The connection may be waked up but can't pop from list. For example,
// connection A is blocking on list and connection B push a new element
// then wake up the connection A, but this element may be token by other connection C.
// So we need to wait for the wake event again by disabling the WRITE event.
bufferevent_disable(bev, EV_WRITE);
return;
}
ReplyForMPop();
SendMembersWithScores(member_scores, userkey);

if (timer_) {
timer_.reset();
Expand Down Expand Up @@ -585,8 +589,6 @@ class CommandMPop : public Commander,
Server *svr_ = nullptr;
Connection *conn_ = nullptr;
UniqueEvent timer_;
std::string user_key_;
std::vector<MemberScore> member_scores_;

void unBlockingAll() {
for (const auto &key : keys_) {
Expand Down

0 comments on commit 339c57b

Please sign in to comment.