Skip to content

Commit

Permalink
Add BlockingCommander to refactor all blocking commands (#1757)
Browse files Browse the repository at this point in the history
  • Loading branch information
PragmaTwice authored Sep 13, 2023
1 parent 4aa95fa commit 5c9137c
Show file tree
Hide file tree
Showing 3 changed files with 189 additions and 269 deletions.
127 changes: 127 additions & 0 deletions src/commands/blocking_commander.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/

#pragma once

#include "commander.h"
#include "event_util.h"
#include "server/redis_connection.h"

namespace redis {

class BlockingCommander : public Commander,
private EvbufCallbackBase<BlockingCommander, false>,
private EventCallbackBase<BlockingCommander> {
public:
// method to reply when no operation happens
virtual std::string NoopReply() = 0;

// method to block keys
virtual void BlockKeys() = 0;

// method to unblock keys
virtual void UnblockKeys() = 0;

// method to access database in write callback
// the return value indicates if the real database operation happens
// in other words, returning true indicates ending the blocking
virtual bool OnBlockingWrite() = 0;

// to start the blocking process
// usually put to the end of the Execute method
Status StartBlocking(int64_t timeout, std::string *output) {
if (conn_->IsInExec()) {
*output = NoopReply();
return Status::OK(); // no blocking in multi-exec
}

BlockKeys();
SetCB(conn_->GetBufferEvent());

if (timeout) {
InitTimer(timeout);
}

return {Status::BlockingCmd};
}

void OnWrite(bufferevent *bev) {
bool done = OnBlockingWrite();

if (!done) {
// The connection may be waked up but can't pop from the datatype.
// For example, connection A is blocked on it and connection B added a new element;
// then connection A was unblocked, but this element may be taken by
// another connection C. So we need to block connection A again
// and wait for the element being added by disabling the WRITE event.
bufferevent_disable(bev, EV_WRITE);
return;
}

if (timer_) {
timer_.reset();
}

UnblockKeys();
conn_->SetCB(bev);
bufferevent_enable(bev, EV_READ);
// We need to manually trigger the read event since we will stop processing commands
// in connection after the blocking command, so there may have some commands to be processed.
// Related issue: https://github.com/apache/kvrocks/issues/831
bufferevent_trigger(bev, EV_READ, BEV_TRIG_IGNORE_WATERMARKS);
}

void OnEvent(bufferevent *bev, int16_t events) {
if (events & (BEV_EVENT_EOF | BEV_EVENT_ERROR)) {
if (timer_ != nullptr) {
timer_.reset();
}
UnblockKeys();
}
conn_->OnEvent(bev, events);
}

// Usually put to the top of the Execute method
void InitConnection(Connection *conn) { conn_ = conn; }

void InitTimer(int64_t timeout) {
auto bev = conn_->GetBufferEvent();
timer_.reset(NewTimer(bufferevent_get_base(bev)));
int64_t timeout_second = timeout / 1000 / 1000;
int64_t timeout_microsecond = timeout % (1000 * 1000);
timeval tm = {timeout_second, static_cast<int>(timeout_microsecond)};
evtimer_add(timer_.get(), &tm);
}

void TimerCB(int, int16_t) {
conn_->Reply(NoopReply());
timer_.reset();
UnblockKeys();
auto bev = conn_->GetBufferEvent();
conn_->SetCB(bev);
bufferevent_enable(bev, EV_READ);
}

protected:
Connection *conn_ = nullptr;
UniqueEvent timer_;
};

} // namespace redis
158 changes: 27 additions & 131 deletions src/commands/cmd_list.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@
*/

#include "commander.h"
#include "commands/blocking_commander.h"
#include "commands/command_parser.h"
#include "error_constants.h"
#include "event_util.h"
#include "server/redis_reply.h"
#include "server/server.h"
#include "types/redis_list.h"

Expand Down Expand Up @@ -232,9 +234,7 @@ class CommandLMPop : public Commander {
std::vector<std::string> keys_;
};

class CommandBPop : public Commander,
private EvbufCallbackBase<CommandBPop, false>,
private EventCallbackBase<CommandBPop> {
class CommandBPop : public BlockingCommander {
public:
explicit CommandBPop(bool left) : left_(left) {}

Expand All @@ -261,34 +261,26 @@ class CommandBPop : public Commander,

Status Execute(Server *svr, Connection *conn, std::string *output) override {
svr_ = svr;
conn_ = conn;
InitConnection(conn);

auto bev = conn->GetBufferEvent();
auto s = TryPopFromList();
if (s.ok() || !s.IsNotFound()) {
return Status::OK(); // error has already output in TryPopFromList
}

if (conn->IsInExec()) {
*output = redis::MultiLen(-1);
return Status::OK(); // No blocking in multi-exec
}
return StartBlocking(timeout_, output);
}

void BlockKeys() override {
for (const auto &key : keys_) {
svr_->BlockOnKey(key, conn_);
}
}

SetCB(bev);

if (timeout_) {
timer_.reset(NewTimer(bufferevent_get_base(bev)));
int64_t timeout_second = timeout_ / 1000 / 1000;
int64_t timeout_microsecond = timeout_ % (1000 * 1000);
timeval tm = {timeout_second, static_cast<int>(timeout_microsecond)};
evtimer_add(timer_.get(), &tm);
void UnblockKeys() override {
for (const auto &key : keys_) {
svr_->UnblockOnKey(key, conn_);
}

return {Status::BlockingCmd};
}

rocksdb::Status TryPopFromList() {
Expand Down Expand Up @@ -318,62 +310,18 @@ class CommandBPop : public Commander,
return s;
}

void OnWrite(bufferevent *bev) {
bool OnBlockingWrite() override {
auto s = TryPopFromList();
if (s.IsNotFound()) {
// The connection may be waked up but can't pop from list. For example,
// connection A is blocking on list and connection B push a new element
// then wake up the connection A, but this element may be token by other connection C.
// So we need to wait for the wake event again by disabling the WRITE event.
bufferevent_disable(bev, EV_WRITE);
return;
}

if (timer_) {
timer_.reset();
}

unBlockingAll();
conn_->SetCB(bev);
bufferevent_enable(bev, EV_READ);
// We need to manually trigger the read event since we will stop processing commands
// in connection after the blocking command, so there may have some commands to be processed.
// Related issue: https://github.com/apache/kvrocks/issues/831
bufferevent_trigger(bev, EV_READ, BEV_TRIG_IGNORE_WATERMARKS);
return !s.IsNotFound();
}

void OnEvent(bufferevent *bev, int16_t events) {
if (events & (BEV_EVENT_EOF | BEV_EVENT_ERROR)) {
if (timer_ != nullptr) {
timer_.reset();
}
unBlockingAll();
}
conn_->OnEvent(bev, events);
}

void TimerCB(int, int16_t events) {
conn_->Reply(redis::NilString());
timer_.reset();
unBlockingAll();
auto bev = conn_->GetBufferEvent();
conn_->SetCB(bev);
bufferevent_enable(bev, EV_READ);
}
std::string NoopReply() override { return redis::NilString(); }

private:
bool left_ = false;
int64_t timeout_ = 0; // microseconds
std::vector<std::string> keys_;
Server *svr_ = nullptr;
Connection *conn_ = nullptr;
UniqueEvent timer_;

void unBlockingAll() {
for (const auto &key : keys_) {
svr_->UnblockOnKey(key, conn_);
}
}
};

class CommandBLPop : public CommandBPop {
Expand Down Expand Up @@ -632,9 +580,7 @@ class CommandLMove : public Commander {
bool dst_left_;
};

class CommandBLMove : public Commander,
private EvbufCallbackBase<CommandBLMove, false>,
private EventCallbackBase<CommandBLMove> {
class CommandBLMove : public BlockingCommander {
public:
Status Parse(const std::vector<std::string> &args) override {
auto arg_val = util::ToLower(args_[3]);
Expand Down Expand Up @@ -663,7 +609,7 @@ class CommandBLMove : public Commander,

Status Execute(Server *svr, Connection *conn, std::string *output) override {
svr_ = svr;
conn_ = conn;
InitConnection(conn);

redis::List list_db(svr->storage, conn->GetNamespace());
std::string elem;
Expand All @@ -676,87 +622,37 @@ class CommandBLMove : public Commander,
return Status::OK();
}

if (conn->IsInExec()) {
*output = redis::MultiLen(-1);
return Status::OK(); // no blocking in multi-exec
}

svr_->BlockOnKey(args_[1], conn_);
auto bev = conn->GetBufferEvent();
SetCB(bev);
return StartBlocking(timeout_, output);
}

if (timeout_) {
timer_.reset(NewTimer(bufferevent_get_base(bev)));
int64_t timeout_second = timeout_ / 1000 / 1000;
int64_t timeout_microsecond = timeout_ % (1000 * 1000);
timeval tm = {timeout_second, static_cast<int>(timeout_microsecond)};
evtimer_add(timer_.get(), &tm);
}
void BlockKeys() override { svr_->BlockOnKey(args_[1], conn_); }

return {Status::BlockingCmd};
}
void UnblockKeys() override { svr_->UnblockOnKey(args_[1], conn_); }

void OnWrite(bufferevent *bev) {
bool OnBlockingWrite() override {
redis::List list_db(svr_->storage, conn_->GetNamespace());
std::string elem;
auto s = list_db.LMove(args_[1], args_[2], src_left_, dst_left_, &elem);
if (!s.ok() && !s.IsNotFound()) {
conn_->Reply(redis::Error("ERR " + s.ToString()));
return;
return true;
}

if (elem.empty()) {
// The connection may be waked up but can't pop from a zset. For example, connection A is blocked on zset and
// connection B added a new element; then connection A was unblocked, but this element may be taken by
// another connection C. So we need to block connection A again and wait for the element being added
// by disabling the WRITE event.
bufferevent_disable(bev, EV_WRITE);
return;
bool empty = elem.empty();
if (!empty) {
conn_->Reply(redis::BulkString(elem));
}

conn_->Reply(redis::BulkString(elem));

if (timer_) {
timer_.reset();
}

unblockOnSrc();
conn_->SetCB(bev);
bufferevent_enable(bev, EV_READ);
// We need to manually trigger the read event since we will stop processing commands
// in connection after the blocking command, so there may have some commands to be processed.
// Related issue: https://github.com/apache/kvrocks/issues/831
bufferevent_trigger(bev, EV_READ, BEV_TRIG_IGNORE_WATERMARKS);
return !empty;
}

void OnEvent(bufferevent *bev, int16_t events) {
if (events & (BEV_EVENT_EOF | BEV_EVENT_ERROR)) {
if (timer_ != nullptr) {
timer_.reset();
}
unblockOnSrc();
}
conn_->OnEvent(bev, events);
}

void TimerCB(int, int16_t) {
conn_->Reply(redis::MultiLen(-1));
timer_.reset();
unblockOnSrc();
auto bev = conn_->GetBufferEvent();
conn_->SetCB(bev);
bufferevent_enable(bev, EV_READ);
}
std::string NoopReply() override { return redis::MultiLen(-1); }

private:
bool src_left_;
bool dst_left_;
int64_t timeout_ = 0; // microseconds
Server *svr_ = nullptr;
Connection *conn_ = nullptr;
UniqueEvent timer_;

void unblockOnSrc() { svr_->UnblockOnKey(args_[1], conn_); }
};

class CommandLPos : public Commander {
Expand Down
Loading

0 comments on commit 5c9137c

Please sign in to comment.