From 5c9137c56703528bb164d1cc0f9bbcd5d0079ec1 Mon Sep 17 00:00:00 2001
From: Twice <twice.mliu@gmail.com>
Date: Wed, 13 Sep 2023 14:37:27 +0900
Subject: [PATCH] Add BlockingCommander to refactor all blocking commands
 (#1757)

---
 src/commands/blocking_commander.h | 127 ++++++++++++++++++++++
 src/commands/cmd_list.cc          | 158 +++++----------------------
 src/commands/cmd_zset.cc          | 173 ++++++------------------------
 3 files changed, 189 insertions(+), 269 deletions(-)
 create mode 100644 src/commands/blocking_commander.h

diff --git a/src/commands/blocking_commander.h b/src/commands/blocking_commander.h
new file mode 100644
index 00000000000..537e770fd1b
--- /dev/null
+++ b/src/commands/blocking_commander.h
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#pragma once
+
+#include "commander.h"
+#include "event_util.h"
+#include "server/redis_connection.h"
+
+namespace redis {
+
+class BlockingCommander : public Commander,
+                          private EvbufCallbackBase<BlockingCommander, false>,
+                          private EventCallbackBase<BlockingCommander> {
+ public:
+  // method to reply when no operation happens
+  virtual std::string NoopReply() = 0;
+
+  // method to block keys
+  virtual void BlockKeys() = 0;
+
+  // method to unblock keys
+  virtual void UnblockKeys() = 0;
+
+  // method to access database in write callback
+  // the return value indicates if the real database operation happens
+  // in other words, returning true indicates ending the blocking
+  virtual bool OnBlockingWrite() = 0;
+
+  // to start the blocking process
+  // usually put to the end of the Execute method
+  Status StartBlocking(int64_t timeout, std::string *output) {
+    if (conn_->IsInExec()) {
+      *output = NoopReply();
+      return Status::OK();  // no blocking in multi-exec
+    }
+
+    BlockKeys();
+    SetCB(conn_->GetBufferEvent());
+
+    if (timeout) {
+      InitTimer(timeout);
+    }
+
+    return {Status::BlockingCmd};
+  }
+
+  void OnWrite(bufferevent *bev) {
+    bool done = OnBlockingWrite();
+
+    if (!done) {
+      // The connection may be waked up but can't pop from the datatype.
+      // For example, connection A is blocked on it and connection B added a new element;
+      // then connection A was unblocked, but this element may be taken by
+      // another connection C. So we need to block connection A again
+      // and wait for the element being added by disabling the WRITE event.
+      bufferevent_disable(bev, EV_WRITE);
+      return;
+    }
+
+    if (timer_) {
+      timer_.reset();
+    }
+
+    UnblockKeys();
+    conn_->SetCB(bev);
+    bufferevent_enable(bev, EV_READ);
+    // We need to manually trigger the read event since we will stop processing commands
+    // in connection after the blocking command, so there may have some commands to be processed.
+    // Related issue: https://github.com/apache/kvrocks/issues/831
+    bufferevent_trigger(bev, EV_READ, BEV_TRIG_IGNORE_WATERMARKS);
+  }
+
+  void OnEvent(bufferevent *bev, int16_t events) {
+    if (events & (BEV_EVENT_EOF | BEV_EVENT_ERROR)) {
+      if (timer_ != nullptr) {
+        timer_.reset();
+      }
+      UnblockKeys();
+    }
+    conn_->OnEvent(bev, events);
+  }
+
+  // Usually put to the top of the Execute method
+  void InitConnection(Connection *conn) { conn_ = conn; }
+
+  void InitTimer(int64_t timeout) {
+    auto bev = conn_->GetBufferEvent();
+    timer_.reset(NewTimer(bufferevent_get_base(bev)));
+    int64_t timeout_second = timeout / 1000 / 1000;
+    int64_t timeout_microsecond = timeout % (1000 * 1000);
+    timeval tm = {timeout_second, static_cast<int>(timeout_microsecond)};
+    evtimer_add(timer_.get(), &tm);
+  }
+
+  void TimerCB(int, int16_t) {
+    conn_->Reply(NoopReply());
+    timer_.reset();
+    UnblockKeys();
+    auto bev = conn_->GetBufferEvent();
+    conn_->SetCB(bev);
+    bufferevent_enable(bev, EV_READ);
+  }
+
+ protected:
+  Connection *conn_ = nullptr;
+  UniqueEvent timer_;
+};
+
+}  // namespace redis
diff --git a/src/commands/cmd_list.cc b/src/commands/cmd_list.cc
index 82e3e09188d..ad5a50dbd41 100644
--- a/src/commands/cmd_list.cc
+++ b/src/commands/cmd_list.cc
@@ -19,9 +19,11 @@
  */
 
 #include "commander.h"
+#include "commands/blocking_commander.h"
 #include "commands/command_parser.h"
 #include "error_constants.h"
 #include "event_util.h"
+#include "server/redis_reply.h"
 #include "server/server.h"
 #include "types/redis_list.h"
 
@@ -232,9 +234,7 @@ class CommandLMPop : public Commander {
   std::vector<std::string> keys_;
 };
 
-class CommandBPop : public Commander,
-                    private EvbufCallbackBase<CommandBPop, false>,
-                    private EventCallbackBase<CommandBPop> {
+class CommandBPop : public BlockingCommander {
  public:
   explicit CommandBPop(bool left) : left_(left) {}
 
@@ -261,34 +261,26 @@ class CommandBPop : public Commander,
 
   Status Execute(Server *svr, Connection *conn, std::string *output) override {
     svr_ = svr;
-    conn_ = conn;
+    InitConnection(conn);
 
-    auto bev = conn->GetBufferEvent();
     auto s = TryPopFromList();
     if (s.ok() || !s.IsNotFound()) {
       return Status::OK();  // error has already output in TryPopFromList
     }
 
-    if (conn->IsInExec()) {
-      *output = redis::MultiLen(-1);
-      return Status::OK();  // No blocking in multi-exec
-    }
+    return StartBlocking(timeout_, output);
+  }
 
+  void BlockKeys() override {
     for (const auto &key : keys_) {
       svr_->BlockOnKey(key, conn_);
     }
+  }
 
-    SetCB(bev);
-
-    if (timeout_) {
-      timer_.reset(NewTimer(bufferevent_get_base(bev)));
-      int64_t timeout_second = timeout_ / 1000 / 1000;
-      int64_t timeout_microsecond = timeout_ % (1000 * 1000);
-      timeval tm = {timeout_second, static_cast<int>(timeout_microsecond)};
-      evtimer_add(timer_.get(), &tm);
+  void UnblockKeys() override {
+    for (const auto &key : keys_) {
+      svr_->UnblockOnKey(key, conn_);
     }
-
-    return {Status::BlockingCmd};
   }
 
   rocksdb::Status TryPopFromList() {
@@ -318,62 +310,18 @@ class CommandBPop : public Commander,
     return s;
   }
 
-  void OnWrite(bufferevent *bev) {
+  bool OnBlockingWrite() override {
     auto s = TryPopFromList();
-    if (s.IsNotFound()) {
-      // The connection may be waked up but can't pop from list. For example,
-      // connection A is blocking on list and connection B push a new element
-      // then wake up the connection A, but this element may be token by other connection C.
-      // So we need to wait for the wake event again by disabling the WRITE event.
-      bufferevent_disable(bev, EV_WRITE);
-      return;
-    }
-
-    if (timer_) {
-      timer_.reset();
-    }
-
-    unBlockingAll();
-    conn_->SetCB(bev);
-    bufferevent_enable(bev, EV_READ);
-    // We need to manually trigger the read event since we will stop processing commands
-    // in connection after the blocking command, so there may have some commands to be processed.
-    // Related issue: https://github.com/apache/kvrocks/issues/831
-    bufferevent_trigger(bev, EV_READ, BEV_TRIG_IGNORE_WATERMARKS);
+    return !s.IsNotFound();
   }
 
-  void OnEvent(bufferevent *bev, int16_t events) {
-    if (events & (BEV_EVENT_EOF | BEV_EVENT_ERROR)) {
-      if (timer_ != nullptr) {
-        timer_.reset();
-      }
-      unBlockingAll();
-    }
-    conn_->OnEvent(bev, events);
-  }
-
-  void TimerCB(int, int16_t events) {
-    conn_->Reply(redis::NilString());
-    timer_.reset();
-    unBlockingAll();
-    auto bev = conn_->GetBufferEvent();
-    conn_->SetCB(bev);
-    bufferevent_enable(bev, EV_READ);
-  }
+  std::string NoopReply() override { return redis::NilString(); }
 
  private:
   bool left_ = false;
   int64_t timeout_ = 0;  // microseconds
   std::vector<std::string> keys_;
   Server *svr_ = nullptr;
-  Connection *conn_ = nullptr;
-  UniqueEvent timer_;
-
-  void unBlockingAll() {
-    for (const auto &key : keys_) {
-      svr_->UnblockOnKey(key, conn_);
-    }
-  }
 };
 
 class CommandBLPop : public CommandBPop {
@@ -632,9 +580,7 @@ class CommandLMove : public Commander {
   bool dst_left_;
 };
 
-class CommandBLMove : public Commander,
-                      private EvbufCallbackBase<CommandBLMove, false>,
-                      private EventCallbackBase<CommandBLMove> {
+class CommandBLMove : public BlockingCommander {
  public:
   Status Parse(const std::vector<std::string> &args) override {
     auto arg_val = util::ToLower(args_[3]);
@@ -663,7 +609,7 @@ class CommandBLMove : public Commander,
 
   Status Execute(Server *svr, Connection *conn, std::string *output) override {
     svr_ = svr;
-    conn_ = conn;
+    InitConnection(conn);
 
     redis::List list_db(svr->storage, conn->GetNamespace());
     std::string elem;
@@ -676,87 +622,37 @@ class CommandBLMove : public Commander,
       return Status::OK();
     }
 
-    if (conn->IsInExec()) {
-      *output = redis::MultiLen(-1);
-      return Status::OK();  // no blocking in multi-exec
-    }
-
-    svr_->BlockOnKey(args_[1], conn_);
-    auto bev = conn->GetBufferEvent();
-    SetCB(bev);
+    return StartBlocking(timeout_, output);
+  }
 
-    if (timeout_) {
-      timer_.reset(NewTimer(bufferevent_get_base(bev)));
-      int64_t timeout_second = timeout_ / 1000 / 1000;
-      int64_t timeout_microsecond = timeout_ % (1000 * 1000);
-      timeval tm = {timeout_second, static_cast<int>(timeout_microsecond)};
-      evtimer_add(timer_.get(), &tm);
-    }
+  void BlockKeys() override { svr_->BlockOnKey(args_[1], conn_); }
 
-    return {Status::BlockingCmd};
-  }
+  void UnblockKeys() override { svr_->UnblockOnKey(args_[1], conn_); }
 
-  void OnWrite(bufferevent *bev) {
+  bool OnBlockingWrite() override {
     redis::List list_db(svr_->storage, conn_->GetNamespace());
     std::string elem;
     auto s = list_db.LMove(args_[1], args_[2], src_left_, dst_left_, &elem);
     if (!s.ok() && !s.IsNotFound()) {
       conn_->Reply(redis::Error("ERR " + s.ToString()));
-      return;
+      return true;
     }
 
-    if (elem.empty()) {
-      // The connection may be waked up but can't pop from a zset. For example, connection A is blocked on zset and
-      // connection B added a new element; then connection A was unblocked, but this element may be taken by
-      // another connection C. So we need to block connection A again and wait for the element being added
-      // by disabling the WRITE event.
-      bufferevent_disable(bev, EV_WRITE);
-      return;
+    bool empty = elem.empty();
+    if (!empty) {
+      conn_->Reply(redis::BulkString(elem));
     }
 
-    conn_->Reply(redis::BulkString(elem));
-
-    if (timer_) {
-      timer_.reset();
-    }
-
-    unblockOnSrc();
-    conn_->SetCB(bev);
-    bufferevent_enable(bev, EV_READ);
-    // We need to manually trigger the read event since we will stop processing commands
-    // in connection after the blocking command, so there may have some commands to be processed.
-    // Related issue: https://github.com/apache/kvrocks/issues/831
-    bufferevent_trigger(bev, EV_READ, BEV_TRIG_IGNORE_WATERMARKS);
+    return !empty;
   }
 
-  void OnEvent(bufferevent *bev, int16_t events) {
-    if (events & (BEV_EVENT_EOF | BEV_EVENT_ERROR)) {
-      if (timer_ != nullptr) {
-        timer_.reset();
-      }
-      unblockOnSrc();
-    }
-    conn_->OnEvent(bev, events);
-  }
-
-  void TimerCB(int, int16_t) {
-    conn_->Reply(redis::MultiLen(-1));
-    timer_.reset();
-    unblockOnSrc();
-    auto bev = conn_->GetBufferEvent();
-    conn_->SetCB(bev);
-    bufferevent_enable(bev, EV_READ);
-  }
+  std::string NoopReply() override { return redis::MultiLen(-1); }
 
  private:
   bool src_left_;
   bool dst_left_;
   int64_t timeout_ = 0;  // microseconds
   Server *svr_ = nullptr;
-  Connection *conn_ = nullptr;
-  UniqueEvent timer_;
-
-  void unblockOnSrc() { svr_->UnblockOnKey(args_[1], conn_); }
 };
 
 class CommandLPos : public Commander {
diff --git a/src/commands/cmd_zset.cc b/src/commands/cmd_zset.cc
index 52c9a3d4a05..41493b6e153 100644
--- a/src/commands/cmd_zset.cc
+++ b/src/commands/cmd_zset.cc
@@ -22,6 +22,7 @@
 
 #include "command_parser.h"
 #include "commander.h"
+#include "commands/blocking_commander.h"
 #include "commands/scan_base.h"
 #include "error_constants.h"
 #include "server/redis_reply.h"
@@ -293,9 +294,7 @@ static rocksdb::Status PopFromMultipleZsets(redis::ZSet *zset_db, const std::vec
   return rocksdb::Status::OK();
 }
 
-class CommandBZPop : public Commander,
-                     private EvbufCallbackBase<CommandBZPop, false>,
-                     private EventCallbackBase<CommandBZPop> {
+class CommandBZPop : public BlockingCommander {
  public:
   explicit CommandBZPop(bool min) : min_(min) {}
 
@@ -315,7 +314,7 @@ class CommandBZPop : public Commander,
 
   Status Execute(Server *svr, Connection *conn, std::string *output) override {
     svr_ = svr;
-    conn_ = conn;
+    InitConnection(conn);
 
     std::string user_key;
     std::vector<MemberScore> member_scores;
@@ -331,28 +330,21 @@ class CommandBZPop : public Commander,
       return Status::OK();
     }
 
-    // all sorted sets are empty
-    if (conn->IsInExec()) {
-      *output = redis::MultiLen(-1);
-      return Status::OK();  // no blocking in multi-exec
-    }
+    return StartBlocking(timeout_, output);
+  }
+
+  std::string NoopReply() override { return redis::MultiLen(-1); }
 
+  void BlockKeys() override {
     for (const auto &key : keys_) {
       svr_->BlockOnKey(key, conn_);
     }
+  }
 
-    auto bev = conn->GetBufferEvent();
-    SetCB(bev);
-
-    if (timeout_) {
-      timer_.reset(NewTimer(bufferevent_get_base(bev)));
-      int64_t timeout_second = timeout_ / 1000 / 1000;
-      int64_t timeout_microsecond = timeout_ % (1000 * 1000);
-      timeval tm = {timeout_second, static_cast<int>(timeout_microsecond)};
-      evtimer_add(timer_.get(), &tm);
+  void UnblockKeys() override {
+    for (const auto &key : keys_) {
+      svr_->UnblockOnKey(key, conn_);
     }
-
-    return {Status::BlockingCmd};
   }
 
   void SendMembersWithScores(const std::vector<MemberScore> &member_scores, const std::string &user_key) {
@@ -366,7 +358,7 @@ class CommandBZPop : public Commander,
     conn_->Reply(output);
   }
 
-  void OnWrite(bufferevent *bev) {
+  bool OnBlockingWrite() override {
     std::string user_key;
     std::vector<MemberScore> member_scores;
 
@@ -374,50 +366,15 @@ class CommandBZPop : public Commander,
     auto s = PopFromMultipleZsets(&zset_db, keys_, min_, 1, &user_key, &member_scores);
     if (!s.ok()) {
       conn_->Reply(redis::Error("ERR " + s.ToString()));
-      return;
-    }
-
-    if (member_scores.empty()) {
-      // The connection may be waked up but can't pop from a zset. For example, connection A is blocked on zset and
-      // connection B added a new element; then connection A was unblocked, but this element may be taken by
-      // another connection C. So we need to block connection A again and wait for the element being added
-      // by disabling the WRITE event.
-      bufferevent_disable(bev, EV_WRITE);
-      return;
+      return true;
     }
 
-    SendMembersWithScores(member_scores, user_key);
-
-    if (timer_) {
-      timer_.reset();
-    }
-
-    unblockOnAllKeys();
-    conn_->SetCB(bev);
-    bufferevent_enable(bev, EV_READ);
-    // We need to manually trigger the read event since we will stop processing commands
-    // in connection after the blocking command, so there may have some commands to be processed.
-    // Related issue: https://github.com/apache/kvrocks/issues/831
-    bufferevent_trigger(bev, EV_READ, BEV_TRIG_IGNORE_WATERMARKS);
-  }
-
-  void OnEvent(bufferevent *bev, int16_t events) {
-    if (events & (BEV_EVENT_EOF | BEV_EVENT_ERROR)) {
-      if (timer_ != nullptr) {
-        timer_.reset();
-      }
-      unblockOnAllKeys();
+    bool empty = member_scores.empty();
+    if (!empty) {
+      SendMembersWithScores(member_scores, user_key);
     }
-    conn_->OnEvent(bev, events);
-  }
 
-  void TimerCB(int, int16_t) {
-    conn_->Reply(redis::MultiLen(-1));
-    timer_.reset();
-    unblockOnAllKeys();
-    auto bev = conn_->GetBufferEvent();
-    conn_->SetCB(bev);
-    bufferevent_enable(bev, EV_READ);
+    return !empty;
   }
 
  private:
@@ -425,14 +382,6 @@ class CommandBZPop : public Commander,
   int64_t timeout_ = 0;  // microseconds
   std::vector<std::string> keys_;
   Server *svr_ = nullptr;
-  Connection *conn_ = nullptr;
-  UniqueEvent timer_;
-
-  void unblockOnAllKeys() {
-    for (const auto &key : keys_) {
-      svr_->UnblockOnKey(key, conn_);
-    }
-  }
 };
 
 class CommandBZPopMin : public CommandBZPop {
@@ -518,9 +467,7 @@ class CommandZMPop : public Commander {
   int count_ = 0;
 };
 
-class CommandBZMPop : public Commander,
-                      private EvbufCallbackBase<CommandBZMPop, false>,
-                      private EventCallbackBase<CommandBZMPop> {
+class CommandBZMPop : public BlockingCommander {
  public:
   Status Parse(const std::vector<std::string> &args) override {
     CommandParser parser(args, 1);
@@ -557,7 +504,7 @@ class CommandBZMPop : public Commander,
 
   Status Execute(Server *svr, Connection *conn, std::string *output) override {
     svr_ = svr;
-    conn_ = conn;
+    InitConnection(conn);
 
     std::string user_key;
     std::vector<MemberScore> member_scores;
@@ -573,31 +520,24 @@ class CommandBZMPop : public Commander,
       return Status::OK();
     }
 
-    // all sorted sets are empty
-    if (conn->IsInExec()) {
-      *output = redis::MultiLen(-1);
-      return Status::OK();  // no blocking in multi-exec
-    }
+    return StartBlocking(timeout_, output);
+  }
 
+  void BlockKeys() override {
     for (const auto &key : keys_) {
       svr_->BlockOnKey(key, conn_);
     }
+  }
 
-    auto bev = conn->GetBufferEvent();
-    SetCB(bev);
-
-    if (timeout_) {
-      timer_.reset(NewTimer(bufferevent_get_base(bev)));
-      int64_t timeout_second = timeout_ / 1000 / 1000;
-      int64_t timeout_microsecond = timeout_ % (1000 * 1000);
-      timeval tm = {timeout_second, static_cast<int>(timeout_microsecond)};
-      evtimer_add(timer_.get(), &tm);
+  void UnblockKeys() override {
+    for (const auto &key : keys_) {
+      svr_->UnblockOnKey(key, conn_);
     }
-
-    return {Status::BlockingCmd};
   }
 
-  void OnWrite(bufferevent *bev) {
+  std::string NoopReply() override { return redis::NilString(); }
+
+  bool OnBlockingWrite() override {
     std::string user_key;
     std::vector<MemberScore> member_scores;
 
@@ -605,50 +545,15 @@ class CommandBZMPop : public Commander,
     auto s = PopFromMultipleZsets(&zset_db, keys_, flag_ == ZSET_MIN, count_, &user_key, &member_scores);
     if (!s.ok()) {
       conn_->Reply(redis::Error("ERR " + s.ToString()));
-      return;
-    }
-
-    if (member_scores.empty()) {
-      // The connection may be waked up but can't pop from a zset. For example, connection A is blocked on zset and
-      // connection B added a new element; then connection A was unblocked, but this element may be taken by
-      // another connection C. So we need to block connection A again and wait for the element being added
-      // by disabling the WRITE event.
-      bufferevent_disable(bev, EV_WRITE);
-      return;
+      return true;
     }
 
-    SendMembersWithScoresForZMpop(conn_, user_key, member_scores);
-
-    if (timer_) {
-      timer_.reset();
-    }
-
-    unblockOnAllKeys();
-    conn_->SetCB(bev);
-    bufferevent_enable(bev, EV_READ);
-    // We need to manually trigger the read event since we will stop processing commands
-    // in connection after the blocking command, so there may have some commands to be processed.
-    // Related issue: https://github.com/apache/kvrocks/issues/831
-    bufferevent_trigger(bev, EV_READ, BEV_TRIG_IGNORE_WATERMARKS);
-  }
-
-  void OnEvent(bufferevent *bev, int16_t events) {
-    if (events & (BEV_EVENT_EOF | BEV_EVENT_ERROR)) {
-      if (timer_ != nullptr) {
-        timer_.reset();
-      }
-      unblockOnAllKeys();
+    bool empty = member_scores.empty();
+    if (!empty) {
+      SendMembersWithScoresForZMpop(conn_, user_key, member_scores);
     }
-    conn_->OnEvent(bev, events);
-  }
 
-  void TimerCB(int, int16_t events) {
-    conn_->Reply(redis::NilString());
-    timer_.reset();
-    unblockOnAllKeys();
-    auto bev = conn_->GetBufferEvent();
-    conn_->SetCB(bev);
-    bufferevent_enable(bev, EV_READ);
+    return !empty;
   }
 
   static CommandKeyRange Range(const std::vector<std::string> &args) {
@@ -663,14 +568,6 @@ class CommandBZMPop : public Commander,
   enum { ZSET_MIN, ZSET_MAX, ZSET_NONE } flag_ = ZSET_NONE;
   int count_ = 0;
   Server *svr_ = nullptr;
-  Connection *conn_ = nullptr;
-  UniqueEvent timer_;
-
-  void unblockOnAllKeys() {
-    for (const auto &key : keys_) {
-      svr_->UnblockOnKey(key, conn_);
-    }
-  }
 };
 
 class CommandZRangeStore : public Commander {