Skip to content

Commit

Permalink
refactor to adapt to BlockingCommander
Browse files Browse the repository at this point in the history
  • Loading branch information
HolyLow committed Sep 22, 2023
1 parent 6289557 commit 0b4df62
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 123 deletions.
53 changes: 36 additions & 17 deletions src/commands/cmd_list.cc
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ class CommandBRPop : public CommandBPop {
CommandBRPop() : CommandBPop(false) {}
};

class CommandBLMPop : public BlockedPopCommander {
class CommandBLMPop : public BlockingCommander {
public:
CommandBLMPop() = default;
CommandBLMPop(const CommandBLMPop &) = delete;
Expand All @@ -347,7 +347,7 @@ class CommandBLMPop : public BlockedPopCommander {
CommandParser parser(args, 1);

auto timeout = GET_OR_RET(parser.TakeFloat());
setTimeout(static_cast<int64_t>(timeout * 1000 * 1000));
timeout_ = static_cast<int64_t>(timeout * 1000 * 1000);

auto num_keys = GET_OR_RET(parser.TakeInt<uint32_t>());
keys_.clear();
Expand Down Expand Up @@ -379,18 +379,19 @@ class CommandBLMPop : public BlockedPopCommander {
return Status::OK();
}

static const inline CommandKeyRangeGen keyRangeGen = [](const std::vector<std::string> &args) {
CommandKeyRange range;
range.first_key = 3;
range.key_step = 1;
// This parsing would always succeed as this cmd has been parsed before.
auto num_key = *ParseInt<int32_t>(args[2], 10);
range.last_key = range.first_key + num_key - 1;
return range;
};
Status Execute(Server *svr, Connection *conn, std::string *output) override {
svr_ = svr;
InitConnection(conn);

private:
rocksdb::Status executeUnblocked() override {
auto s = ExecuteUnblocked();
if (s.ok() || !s.IsNotFound()) {
return Status::OK(); // error has already been output
}

return StartBlocking(timeout_, output);
}

rocksdb::Status ExecuteUnblocked() {
redis::List list_db(svr_->storage, conn_->GetNamespace());
std::vector<std::string> elems;
std::string chosen_key;
Expand Down Expand Up @@ -419,23 +420,41 @@ class CommandBLMPop : public BlockedPopCommander {
return s;
}

std::string emptyOutput() override { return redis::NilString(); }

void blockAllKeys() override {
void BlockKeys() override {
for (const auto &key : keys_) {
svr_->BlockOnKey(key, conn_);
}
}

void unblockAllKeys() override {
void UnblockKeys() override {
for (const auto &key : keys_) {
svr_->UnblockOnKey(key, conn_);
}
}

bool OnBlockingWrite() override {
auto s = ExecuteUnblocked();
return !s.IsNotFound();
}

std::string NoopReply() override { return redis::NilString(); }

static const inline CommandKeyRangeGen keyRangeGen = [](const std::vector<std::string> &args) {
CommandKeyRange range;
range.first_key = 3;
range.key_step = 1;
// This parsing would always succeed as this cmd has been parsed before.
auto num_key = *ParseInt<int32_t>(args[2], 10);
range.last_key = range.first_key + num_key - 1;
return range;
};

private:
bool left_;
uint32_t count_ = -1;
int64_t timeout_ = 0; // microseconds
std::vector<std::string> keys_;
Server *svr_ = nullptr;
};

class CommandLRem : public Commander {
Expand Down
74 changes: 0 additions & 74 deletions src/commands/commander.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
#include "commander.h"

#include "cluster/cluster_defs.h"
#include "server/redis_connection.h"

namespace redis {

Expand All @@ -33,79 +32,6 @@ RegisterToCommandTable::RegisterToCommandTable(std::initializer_list<CommandAttr
}
}

Status BlockedPopCommander::Execute(Server *svr, Connection *conn, std::string *output) {
svr_ = svr;
conn_ = conn;

auto bev = conn->GetBufferEvent();
auto s = executeUnblocked();
if (s.ok() || !s.IsNotFound()) {
return Status::OK(); // error has already output in executeUnblocked
}

if (conn->IsInExec()) {
*output = emptyOutput();
return Status::OK(); // No blocking in multi-exec
}

blockAllKeys();

SetCB(bev);

if (timeout_) {
timer_.reset(NewTimer(bufferevent_get_base(bev)));
int64_t timeout_second = timeout_ / 1000 / 1000;
int64_t timeout_microsecond = timeout_ % (1000 * 1000);
timeval tm = {timeout_second, static_cast<int>(timeout_microsecond)};
evtimer_add(timer_.get(), &tm);
}

return {Status::BlockingCmd};
}

void BlockedPopCommander::OnWrite(bufferevent *bev) {
auto s = executeUnblocked();
if (s.IsNotFound()) {
// The connection may be waked up but can't pop from list. For example,
// connection A is blocking on list and connection B push a new element
// then wake up the connection A, but this element may be token by other connection C.
// So we need to wait for the wake event again by disabling the WRITE event.
bufferevent_disable(bev, EV_WRITE);
return;
}

if (timer_) {
timer_.reset();
}

unblockAllKeys();
conn_->SetCB(bev);
bufferevent_enable(bev, EV_READ);
// We need to manually trigger the read event since we will stop processing commands
// in connection after the blocking command, so there may have some commands to be processed.
// Related issue: https://github.com/apache/kvrocks/issues/831
bufferevent_trigger(bev, EV_READ, BEV_TRIG_IGNORE_WATERMARKS);
}

void BlockedPopCommander::OnEvent(bufferevent *bev, int16_t events) {
if (events & (BEV_EVENT_EOF | BEV_EVENT_ERROR)) {
if (timer_ != nullptr) {
timer_.reset();
}
unblockAllKeys();
}
conn_->OnEvent(bev, events);
}

void BlockedPopCommander::TimerCB(int, int16_t events) {
conn_->Reply(emptyOutput());
timer_.reset();
unblockAllKeys();
auto bev = conn_->GetBufferEvent();
conn_->SetCB(bev);
bufferevent_enable(bev, EV_READ);
}

size_t GetCommandNum() { return command_details::redis_command_table.size(); }

const CommandMap *GetOriginalCommands() { return &command_details::original_commands; }
Expand Down
32 changes: 0 additions & 32 deletions src/commands/commander.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
#include <vector>

#include "cluster/cluster_defs.h"
#include "event_util.h"
#include "parse_util.h"
#include "server/redis_reply.h"
#include "status.h"
Expand Down Expand Up @@ -84,37 +83,6 @@ class Commander {
const CommandAttributes *attributes_ = nullptr;
};

class BlockedPopCommander : public Commander,
private EvbufCallbackBase<BlockedPopCommander, false>,
private EventCallbackBase<BlockedPopCommander> {
public:
Status Execute(Server *svr, Connection *conn, std::string *output) final;

void OnWrite(bufferevent *bev);

void OnEvent(bufferevent *bev, int16_t events);

void TimerCB(int, int16_t events);

protected:
virtual rocksdb::Status executeUnblocked() = 0;

virtual void blockAllKeys() = 0;

virtual void unblockAllKeys() = 0;

virtual std::string emptyOutput() = 0;

void setTimeout(int64_t timeout) { timeout_ = timeout; }

Server *svr_ = nullptr;
Connection *conn_ = nullptr;

private:
int64_t timeout_ = 0; // microseconds
UniqueEvent timer_;
};

class CommanderWithParseMove : Commander {
public:
Status Parse() override { return ParseMove(std::move(args_)); }
Expand Down

0 comments on commit 0b4df62

Please sign in to comment.