diff --git a/src/commands/blocking_commander.h b/src/commands/blocking_commander.h new file mode 100644 index 00000000000..537e770fd1b --- /dev/null +++ b/src/commands/blocking_commander.h @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#pragma once + +#include "commander.h" +#include "event_util.h" +#include "server/redis_connection.h" + +namespace redis { + +class BlockingCommander : public Commander, + private EvbufCallbackBase, + private EventCallbackBase { + public: + // method to reply when no operation happens + virtual std::string NoopReply() = 0; + + // method to block keys + virtual void BlockKeys() = 0; + + // method to unblock keys + virtual void UnblockKeys() = 0; + + // method to access database in write callback + // the return value indicates if the real database operation happens + // in other words, returning true indicates ending the blocking + virtual bool OnBlockingWrite() = 0; + + // to start the blocking process + // usually put to the end of the Execute method + Status StartBlocking(int64_t timeout, std::string *output) { + if (conn_->IsInExec()) { + *output = NoopReply(); + return Status::OK(); // no blocking in multi-exec + } + + BlockKeys(); + SetCB(conn_->GetBufferEvent()); + + if (timeout) { + InitTimer(timeout); + } + + return {Status::BlockingCmd}; + } + + void OnWrite(bufferevent *bev) { + bool done = OnBlockingWrite(); + + if (!done) { + // The connection may be waked up but can't pop from the datatype. + // For example, connection A is blocked on it and connection B added a new element; + // then connection A was unblocked, but this element may be taken by + // another connection C. So we need to block connection A again + // and wait for the element being added by disabling the WRITE event. + bufferevent_disable(bev, EV_WRITE); + return; + } + + if (timer_) { + timer_.reset(); + } + + UnblockKeys(); + conn_->SetCB(bev); + bufferevent_enable(bev, EV_READ); + // We need to manually trigger the read event since we will stop processing commands + // in connection after the blocking command, so there may have some commands to be processed. + // Related issue: https://github.com/apache/kvrocks/issues/831 + bufferevent_trigger(bev, EV_READ, BEV_TRIG_IGNORE_WATERMARKS); + } + + void OnEvent(bufferevent *bev, int16_t events) { + if (events & (BEV_EVENT_EOF | BEV_EVENT_ERROR)) { + if (timer_ != nullptr) { + timer_.reset(); + } + UnblockKeys(); + } + conn_->OnEvent(bev, events); + } + + // Usually put to the top of the Execute method + void InitConnection(Connection *conn) { conn_ = conn; } + + void InitTimer(int64_t timeout) { + auto bev = conn_->GetBufferEvent(); + timer_.reset(NewTimer(bufferevent_get_base(bev))); + int64_t timeout_second = timeout / 1000 / 1000; + int64_t timeout_microsecond = timeout % (1000 * 1000); + timeval tm = {timeout_second, static_cast(timeout_microsecond)}; + evtimer_add(timer_.get(), &tm); + } + + void TimerCB(int, int16_t) { + conn_->Reply(NoopReply()); + timer_.reset(); + UnblockKeys(); + auto bev = conn_->GetBufferEvent(); + conn_->SetCB(bev); + bufferevent_enable(bev, EV_READ); + } + + protected: + Connection *conn_ = nullptr; + UniqueEvent timer_; +}; + +} // namespace redis diff --git a/src/commands/cmd_list.cc b/src/commands/cmd_list.cc index 82e3e09188d..ad5a50dbd41 100644 --- a/src/commands/cmd_list.cc +++ b/src/commands/cmd_list.cc @@ -19,9 +19,11 @@ */ #include "commander.h" +#include "commands/blocking_commander.h" #include "commands/command_parser.h" #include "error_constants.h" #include "event_util.h" +#include "server/redis_reply.h" #include "server/server.h" #include "types/redis_list.h" @@ -232,9 +234,7 @@ class CommandLMPop : public Commander { std::vector keys_; }; -class CommandBPop : public Commander, - private EvbufCallbackBase, - private EventCallbackBase { +class CommandBPop : public BlockingCommander { public: explicit CommandBPop(bool left) : left_(left) {} @@ -261,34 +261,26 @@ class CommandBPop : public Commander, Status Execute(Server *svr, Connection *conn, std::string *output) override { svr_ = svr; - conn_ = conn; + InitConnection(conn); - auto bev = conn->GetBufferEvent(); auto s = TryPopFromList(); if (s.ok() || !s.IsNotFound()) { return Status::OK(); // error has already output in TryPopFromList } - if (conn->IsInExec()) { - *output = redis::MultiLen(-1); - return Status::OK(); // No blocking in multi-exec - } + return StartBlocking(timeout_, output); + } + void BlockKeys() override { for (const auto &key : keys_) { svr_->BlockOnKey(key, conn_); } + } - SetCB(bev); - - if (timeout_) { - timer_.reset(NewTimer(bufferevent_get_base(bev))); - int64_t timeout_second = timeout_ / 1000 / 1000; - int64_t timeout_microsecond = timeout_ % (1000 * 1000); - timeval tm = {timeout_second, static_cast(timeout_microsecond)}; - evtimer_add(timer_.get(), &tm); + void UnblockKeys() override { + for (const auto &key : keys_) { + svr_->UnblockOnKey(key, conn_); } - - return {Status::BlockingCmd}; } rocksdb::Status TryPopFromList() { @@ -318,62 +310,18 @@ class CommandBPop : public Commander, return s; } - void OnWrite(bufferevent *bev) { + bool OnBlockingWrite() override { auto s = TryPopFromList(); - if (s.IsNotFound()) { - // The connection may be waked up but can't pop from list. For example, - // connection A is blocking on list and connection B push a new element - // then wake up the connection A, but this element may be token by other connection C. - // So we need to wait for the wake event again by disabling the WRITE event. - bufferevent_disable(bev, EV_WRITE); - return; - } - - if (timer_) { - timer_.reset(); - } - - unBlockingAll(); - conn_->SetCB(bev); - bufferevent_enable(bev, EV_READ); - // We need to manually trigger the read event since we will stop processing commands - // in connection after the blocking command, so there may have some commands to be processed. - // Related issue: https://github.com/apache/kvrocks/issues/831 - bufferevent_trigger(bev, EV_READ, BEV_TRIG_IGNORE_WATERMARKS); + return !s.IsNotFound(); } - void OnEvent(bufferevent *bev, int16_t events) { - if (events & (BEV_EVENT_EOF | BEV_EVENT_ERROR)) { - if (timer_ != nullptr) { - timer_.reset(); - } - unBlockingAll(); - } - conn_->OnEvent(bev, events); - } - - void TimerCB(int, int16_t events) { - conn_->Reply(redis::NilString()); - timer_.reset(); - unBlockingAll(); - auto bev = conn_->GetBufferEvent(); - conn_->SetCB(bev); - bufferevent_enable(bev, EV_READ); - } + std::string NoopReply() override { return redis::NilString(); } private: bool left_ = false; int64_t timeout_ = 0; // microseconds std::vector keys_; Server *svr_ = nullptr; - Connection *conn_ = nullptr; - UniqueEvent timer_; - - void unBlockingAll() { - for (const auto &key : keys_) { - svr_->UnblockOnKey(key, conn_); - } - } }; class CommandBLPop : public CommandBPop { @@ -632,9 +580,7 @@ class CommandLMove : public Commander { bool dst_left_; }; -class CommandBLMove : public Commander, - private EvbufCallbackBase, - private EventCallbackBase { +class CommandBLMove : public BlockingCommander { public: Status Parse(const std::vector &args) override { auto arg_val = util::ToLower(args_[3]); @@ -663,7 +609,7 @@ class CommandBLMove : public Commander, Status Execute(Server *svr, Connection *conn, std::string *output) override { svr_ = svr; - conn_ = conn; + InitConnection(conn); redis::List list_db(svr->storage, conn->GetNamespace()); std::string elem; @@ -676,87 +622,37 @@ class CommandBLMove : public Commander, return Status::OK(); } - if (conn->IsInExec()) { - *output = redis::MultiLen(-1); - return Status::OK(); // no blocking in multi-exec - } - - svr_->BlockOnKey(args_[1], conn_); - auto bev = conn->GetBufferEvent(); - SetCB(bev); + return StartBlocking(timeout_, output); + } - if (timeout_) { - timer_.reset(NewTimer(bufferevent_get_base(bev))); - int64_t timeout_second = timeout_ / 1000 / 1000; - int64_t timeout_microsecond = timeout_ % (1000 * 1000); - timeval tm = {timeout_second, static_cast(timeout_microsecond)}; - evtimer_add(timer_.get(), &tm); - } + void BlockKeys() override { svr_->BlockOnKey(args_[1], conn_); } - return {Status::BlockingCmd}; - } + void UnblockKeys() override { svr_->UnblockOnKey(args_[1], conn_); } - void OnWrite(bufferevent *bev) { + bool OnBlockingWrite() override { redis::List list_db(svr_->storage, conn_->GetNamespace()); std::string elem; auto s = list_db.LMove(args_[1], args_[2], src_left_, dst_left_, &elem); if (!s.ok() && !s.IsNotFound()) { conn_->Reply(redis::Error("ERR " + s.ToString())); - return; + return true; } - if (elem.empty()) { - // The connection may be waked up but can't pop from a zset. For example, connection A is blocked on zset and - // connection B added a new element; then connection A was unblocked, but this element may be taken by - // another connection C. So we need to block connection A again and wait for the element being added - // by disabling the WRITE event. - bufferevent_disable(bev, EV_WRITE); - return; + bool empty = elem.empty(); + if (!empty) { + conn_->Reply(redis::BulkString(elem)); } - conn_->Reply(redis::BulkString(elem)); - - if (timer_) { - timer_.reset(); - } - - unblockOnSrc(); - conn_->SetCB(bev); - bufferevent_enable(bev, EV_READ); - // We need to manually trigger the read event since we will stop processing commands - // in connection after the blocking command, so there may have some commands to be processed. - // Related issue: https://github.com/apache/kvrocks/issues/831 - bufferevent_trigger(bev, EV_READ, BEV_TRIG_IGNORE_WATERMARKS); + return !empty; } - void OnEvent(bufferevent *bev, int16_t events) { - if (events & (BEV_EVENT_EOF | BEV_EVENT_ERROR)) { - if (timer_ != nullptr) { - timer_.reset(); - } - unblockOnSrc(); - } - conn_->OnEvent(bev, events); - } - - void TimerCB(int, int16_t) { - conn_->Reply(redis::MultiLen(-1)); - timer_.reset(); - unblockOnSrc(); - auto bev = conn_->GetBufferEvent(); - conn_->SetCB(bev); - bufferevent_enable(bev, EV_READ); - } + std::string NoopReply() override { return redis::MultiLen(-1); } private: bool src_left_; bool dst_left_; int64_t timeout_ = 0; // microseconds Server *svr_ = nullptr; - Connection *conn_ = nullptr; - UniqueEvent timer_; - - void unblockOnSrc() { svr_->UnblockOnKey(args_[1], conn_); } }; class CommandLPos : public Commander { diff --git a/src/commands/cmd_zset.cc b/src/commands/cmd_zset.cc index 52c9a3d4a05..41493b6e153 100644 --- a/src/commands/cmd_zset.cc +++ b/src/commands/cmd_zset.cc @@ -22,6 +22,7 @@ #include "command_parser.h" #include "commander.h" +#include "commands/blocking_commander.h" #include "commands/scan_base.h" #include "error_constants.h" #include "server/redis_reply.h" @@ -293,9 +294,7 @@ static rocksdb::Status PopFromMultipleZsets(redis::ZSet *zset_db, const std::vec return rocksdb::Status::OK(); } -class CommandBZPop : public Commander, - private EvbufCallbackBase, - private EventCallbackBase { +class CommandBZPop : public BlockingCommander { public: explicit CommandBZPop(bool min) : min_(min) {} @@ -315,7 +314,7 @@ class CommandBZPop : public Commander, Status Execute(Server *svr, Connection *conn, std::string *output) override { svr_ = svr; - conn_ = conn; + InitConnection(conn); std::string user_key; std::vector member_scores; @@ -331,28 +330,21 @@ class CommandBZPop : public Commander, return Status::OK(); } - // all sorted sets are empty - if (conn->IsInExec()) { - *output = redis::MultiLen(-1); - return Status::OK(); // no blocking in multi-exec - } + return StartBlocking(timeout_, output); + } + + std::string NoopReply() override { return redis::MultiLen(-1); } + void BlockKeys() override { for (const auto &key : keys_) { svr_->BlockOnKey(key, conn_); } + } - auto bev = conn->GetBufferEvent(); - SetCB(bev); - - if (timeout_) { - timer_.reset(NewTimer(bufferevent_get_base(bev))); - int64_t timeout_second = timeout_ / 1000 / 1000; - int64_t timeout_microsecond = timeout_ % (1000 * 1000); - timeval tm = {timeout_second, static_cast(timeout_microsecond)}; - evtimer_add(timer_.get(), &tm); + void UnblockKeys() override { + for (const auto &key : keys_) { + svr_->UnblockOnKey(key, conn_); } - - return {Status::BlockingCmd}; } void SendMembersWithScores(const std::vector &member_scores, const std::string &user_key) { @@ -366,7 +358,7 @@ class CommandBZPop : public Commander, conn_->Reply(output); } - void OnWrite(bufferevent *bev) { + bool OnBlockingWrite() override { std::string user_key; std::vector member_scores; @@ -374,50 +366,15 @@ class CommandBZPop : public Commander, auto s = PopFromMultipleZsets(&zset_db, keys_, min_, 1, &user_key, &member_scores); if (!s.ok()) { conn_->Reply(redis::Error("ERR " + s.ToString())); - return; - } - - if (member_scores.empty()) { - // The connection may be waked up but can't pop from a zset. For example, connection A is blocked on zset and - // connection B added a new element; then connection A was unblocked, but this element may be taken by - // another connection C. So we need to block connection A again and wait for the element being added - // by disabling the WRITE event. - bufferevent_disable(bev, EV_WRITE); - return; + return true; } - SendMembersWithScores(member_scores, user_key); - - if (timer_) { - timer_.reset(); - } - - unblockOnAllKeys(); - conn_->SetCB(bev); - bufferevent_enable(bev, EV_READ); - // We need to manually trigger the read event since we will stop processing commands - // in connection after the blocking command, so there may have some commands to be processed. - // Related issue: https://github.com/apache/kvrocks/issues/831 - bufferevent_trigger(bev, EV_READ, BEV_TRIG_IGNORE_WATERMARKS); - } - - void OnEvent(bufferevent *bev, int16_t events) { - if (events & (BEV_EVENT_EOF | BEV_EVENT_ERROR)) { - if (timer_ != nullptr) { - timer_.reset(); - } - unblockOnAllKeys(); + bool empty = member_scores.empty(); + if (!empty) { + SendMembersWithScores(member_scores, user_key); } - conn_->OnEvent(bev, events); - } - void TimerCB(int, int16_t) { - conn_->Reply(redis::MultiLen(-1)); - timer_.reset(); - unblockOnAllKeys(); - auto bev = conn_->GetBufferEvent(); - conn_->SetCB(bev); - bufferevent_enable(bev, EV_READ); + return !empty; } private: @@ -425,14 +382,6 @@ class CommandBZPop : public Commander, int64_t timeout_ = 0; // microseconds std::vector keys_; Server *svr_ = nullptr; - Connection *conn_ = nullptr; - UniqueEvent timer_; - - void unblockOnAllKeys() { - for (const auto &key : keys_) { - svr_->UnblockOnKey(key, conn_); - } - } }; class CommandBZPopMin : public CommandBZPop { @@ -518,9 +467,7 @@ class CommandZMPop : public Commander { int count_ = 0; }; -class CommandBZMPop : public Commander, - private EvbufCallbackBase, - private EventCallbackBase { +class CommandBZMPop : public BlockingCommander { public: Status Parse(const std::vector &args) override { CommandParser parser(args, 1); @@ -557,7 +504,7 @@ class CommandBZMPop : public Commander, Status Execute(Server *svr, Connection *conn, std::string *output) override { svr_ = svr; - conn_ = conn; + InitConnection(conn); std::string user_key; std::vector member_scores; @@ -573,31 +520,24 @@ class CommandBZMPop : public Commander, return Status::OK(); } - // all sorted sets are empty - if (conn->IsInExec()) { - *output = redis::MultiLen(-1); - return Status::OK(); // no blocking in multi-exec - } + return StartBlocking(timeout_, output); + } + void BlockKeys() override { for (const auto &key : keys_) { svr_->BlockOnKey(key, conn_); } + } - auto bev = conn->GetBufferEvent(); - SetCB(bev); - - if (timeout_) { - timer_.reset(NewTimer(bufferevent_get_base(bev))); - int64_t timeout_second = timeout_ / 1000 / 1000; - int64_t timeout_microsecond = timeout_ % (1000 * 1000); - timeval tm = {timeout_second, static_cast(timeout_microsecond)}; - evtimer_add(timer_.get(), &tm); + void UnblockKeys() override { + for (const auto &key : keys_) { + svr_->UnblockOnKey(key, conn_); } - - return {Status::BlockingCmd}; } - void OnWrite(bufferevent *bev) { + std::string NoopReply() override { return redis::NilString(); } + + bool OnBlockingWrite() override { std::string user_key; std::vector member_scores; @@ -605,50 +545,15 @@ class CommandBZMPop : public Commander, auto s = PopFromMultipleZsets(&zset_db, keys_, flag_ == ZSET_MIN, count_, &user_key, &member_scores); if (!s.ok()) { conn_->Reply(redis::Error("ERR " + s.ToString())); - return; - } - - if (member_scores.empty()) { - // The connection may be waked up but can't pop from a zset. For example, connection A is blocked on zset and - // connection B added a new element; then connection A was unblocked, but this element may be taken by - // another connection C. So we need to block connection A again and wait for the element being added - // by disabling the WRITE event. - bufferevent_disable(bev, EV_WRITE); - return; + return true; } - SendMembersWithScoresForZMpop(conn_, user_key, member_scores); - - if (timer_) { - timer_.reset(); - } - - unblockOnAllKeys(); - conn_->SetCB(bev); - bufferevent_enable(bev, EV_READ); - // We need to manually trigger the read event since we will stop processing commands - // in connection after the blocking command, so there may have some commands to be processed. - // Related issue: https://github.com/apache/kvrocks/issues/831 - bufferevent_trigger(bev, EV_READ, BEV_TRIG_IGNORE_WATERMARKS); - } - - void OnEvent(bufferevent *bev, int16_t events) { - if (events & (BEV_EVENT_EOF | BEV_EVENT_ERROR)) { - if (timer_ != nullptr) { - timer_.reset(); - } - unblockOnAllKeys(); + bool empty = member_scores.empty(); + if (!empty) { + SendMembersWithScoresForZMpop(conn_, user_key, member_scores); } - conn_->OnEvent(bev, events); - } - void TimerCB(int, int16_t events) { - conn_->Reply(redis::NilString()); - timer_.reset(); - unblockOnAllKeys(); - auto bev = conn_->GetBufferEvent(); - conn_->SetCB(bev); - bufferevent_enable(bev, EV_READ); + return !empty; } static CommandKeyRange Range(const std::vector &args) { @@ -663,14 +568,6 @@ class CommandBZMPop : public Commander, enum { ZSET_MIN, ZSET_MAX, ZSET_NONE } flag_ = ZSET_NONE; int count_ = 0; Server *svr_ = nullptr; - Connection *conn_ = nullptr; - UniqueEvent timer_; - - void unblockOnAllKeys() { - for (const auto &key : keys_) { - svr_->UnblockOnKey(key, conn_); - } - } }; class CommandZRangeStore : public Commander {