Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add CLIENT PAUSE and CLIENT REPLY subcommands #2495

Open
wants to merge 14 commits into
base: unstable
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 64 additions & 4 deletions src/commands/cmd_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -404,9 +404,11 @@ class CommandSlowlog : public Commander {
class CommandClient : public Commander {
public:
Status Parse(const std::vector<std::string> &args) override {
subcommand_ = util::ToLower(args[1]);
// subcommand: getname id kill list info setname
if ((subcommand_ == "id" || subcommand_ == "getname" || subcommand_ == "list" || subcommand_ == "info") &&
CommandParser parser(args, 1);
subcommand_ = util::ToLower(GET_OR_RET(parser.TakeStr()));
// subcommand: getname id kill list info setname unpause
if ((subcommand_ == "id" || subcommand_ == "getname" || subcommand_ == "list" || subcommand_ == "info" ||
subcommand_ == "unpause") &&
args.size() == 2) {
return Status::OK();
}
Expand Down Expand Up @@ -477,7 +479,48 @@ class CommandClient : public Commander {
}
return Status::OK();
}
return {Status::RedisInvalidCmd, "Syntax error, try CLIENT LIST|INFO|KILL ip:port|GETNAME|SETNAME"};

// command format: client pause <timeout> [write|all]
if ((subcommand_ == "pause")) {
if (args.size() != 3 && args.size() != 4) {
return {Status::RedisParseErr, errInvalidSyntax};
}

pause_timeout_ms_ = GET_OR_RET(parser.TakeInt());

if (parser.EatEqICase("all")) {
pause_type_ = kPauseAll;
} else if (parser.EatEqICase("write")) {
pause_type_ = kPauseWrite;
} else if (!parser.Good()) { // Default mode is to pause all commands
pause_type_ = kPauseAll;
} else {
return {Status::RedisParseErr, errInvalidSyntax};
}

return Status::OK();
}

if (subcommand_ == "reply") {
if (args.size() != 2 && args.size() != 3) {
return {Status::RedisParseErr, errInvalidSyntax};
}

if (parser.EatEqICase("on")) {
reply_type_ = 0;
} else if (parser.EatEqICase("off")) {
reply_type_ = Connection::Flag::kReplyModeOff;
} else if (parser.EatEqICase("skip")) {
reply_type_ = Connection::Flag::kReplyModeSkipNext;
} else {
return {Status::RedisParseErr, errInvalidSyntax};
}

return Status::OK();
}

return {Status::RedisInvalidCmd,
"Syntax error, try CLIENT LIST|INFO|KILL|PAUSE|UNPAUSE|REPLY ip:port|GETNAME|SETNAME|timeout"};
}

Status Execute(Server *srv, Connection *conn, std::string *output) override {
Expand Down Expand Up @@ -510,6 +553,20 @@ class CommandClient : public Commander {
*output = redis::SimpleString("OK");
}
return Status::OK();
} else if (subcommand_ == "pause") {
srv->PauseCommands(pause_type_, pause_timeout_ms_);
return Status::OK();
} else if (subcommand_ == "unpause") {
srv->UnpauseCommands();
return Status::OK();
} else if (subcommand_ == "reply") {
conn->DisableFlag(redis::Connection::Flag::kReplyModeOff);
conn->DisableFlag(redis::Connection::Flag::kReplyModeSkip);
conn->DisableFlag(redis::Connection::Flag::kReplyModeSkipNext);
if (reply_type_ != 0) {
conn->EnableFlag((redis::Connection::Flag)reply_type_);
}
return Status::OK();
}

return {Status::RedisInvalidCmd, "Syntax error, try CLIENT LIST|INFO|KILL ip:port|GETNAME|SETNAME"};
Expand All @@ -521,6 +578,9 @@ class CommandClient : public Commander {
std::string subcommand_;
bool skipme_ = false;
int64_t kill_type_ = 0;
int64_t pause_type_ = 0;
int64_t pause_timeout_ms_ = 0;
int64_t reply_type_ = 0;
uint64_t id_ = 0;
bool new_format_ = true;
};
Expand Down
32 changes: 32 additions & 0 deletions src/server/redis_connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,17 @@ void Connection::OnEvent(bufferevent *bev, int16_t events) {
}

void Connection::Reply(const std::string &msg) {
// Do not send replies for both SKIP and OFF modes
if (IsFlagEnabled(Flag::kReplyModeOff) || IsFlagEnabled(Flag::kReplyModeSkip)) {
return;
}

// Skip starting from the next reply for SKIP mode
if (IsFlagEnabled(Flag::kReplyModeSkipNext)) {
DisableFlag(Flag::kReplyModeSkipNext);
EnableFlag(Flag::kReplyModeSkip);
}

owner_->srv->stats.IncrOutboundBytes(msg.size());
redis::Reply(bufferevent_get_output(bev_), msg);
}
Expand Down Expand Up @@ -368,6 +379,17 @@ static bool IsCmdForIndexing(const CommandAttributes *attr) {
}

void Connection::ExecuteCommands(std::deque<CommandTokens> *to_process_cmds) {
// Do not execute commands if we are in pause mode
if (srv_->GetCommandPauseType() == kPauseAll ||
(srv_->GetCommandPauseType() == kPauseWrite && this->IsFlagEnabled(kPaused))) {
return;
} else {
// Unpause the client when the pause mode ends
if (this->IsFlagEnabled(kPaused)) {
this->DisableFlag(kPaused);
}
}

const Config *config = srv_->GetConfig();
std::string reply;
std::string password = config->requirepass;
Expand Down Expand Up @@ -396,6 +418,16 @@ void Connection::ExecuteCommands(std::deque<CommandTokens> *to_process_cmds) {
auto cmd_name = attributes->name;
auto cmd_flags = attributes->GenerateFlags(cmd_tokens);

// Start pausing the client when we first receive the write command
if (!(this->GetClientType() == kTypeSlave) && srv_->GetCommandPauseType() == kPauseWrite &&
(cmd_flags & kCmdWrite)) {
this->EnableFlag(kPaused); // Pause the client
to_process_cmds->emplace_back(cmd_tokens);

Reply(redis::SimpleString("OK")); // Should return OK ASAP
break; // Do not process remanining commands for now
}

if (GetNamespace().empty()) {
if (!password.empty()) {
if (cmd_name != "auth" && cmd_name != "hello") {
Expand Down
4 changes: 4 additions & 0 deletions src/server/redis_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ class Connection : public EvbufCallbackBase<Connection> {
kMultiExec = 1 << 8,
kReadOnly = 1 << 9,
kAsking = 1 << 10,
kReplyModeOff = 1 << 11,
kReplyModeSkip = 1 << 12,
kReplyModeSkipNext = 1 << 13,
kPaused = 1 << 14
};

explicit Connection(bufferevent *bev, Worker *owner);
Expand Down
16 changes: 16 additions & 0 deletions src/server/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1313,6 +1313,15 @@ void Server::GetInfo(const std::string &ns, const std::string &section, std::str
*info = string_stream.str();
}

int64_t Server::GetCommandPauseType() {
// Stop pausing command if the timeout has reached
if (pause_end_timestamp_ms_.load(std::memory_order_relaxed) <= util::GetTimeStampMS()) {
UnpauseCommands();
}

return (int64_t)pause_type_.load(std::memory_order_relaxed);
}

std::string Server::GetRocksDBStatsJson() const {
jsoncons::json stats_json;

Expand Down Expand Up @@ -1798,6 +1807,13 @@ Status Server::ExecPropagatedCommand(const std::vector<std::string> &tokens) {
return Status::OK();
}

void Server::PauseCommands(uint64_t type, uint64_t timeout_ms) {
pause_type_.store((int64_t)type, std::memory_order_relaxed);
pause_end_timestamp_ms_.store((int64_t)(util::GetTimeStampMS() + timeout_ms), std::memory_order_relaxed);
}

void Server::UnpauseCommands() { pause_type_.store((int64_t)kPauseNone, std::memory_order_relaxed); }

// AdjustOpenFilesLimit only try best to raise the max open files according to
// the max clients and RocksDB open file configuration. It also reserves a number
// of file descriptors(128) for extra operations of persistence, listening sockets,
Expand Down
11 changes: 11 additions & 0 deletions src/server/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,12 @@ enum ClientType {
kTypeSlave = (1ULL << 3), // slave client
};

enum ClientCommandPauseType {
kPauseNone = (1ULL << 0), // pause no commands
kPauseWrite = (1ULL << 1), // pause write commands
kPauseAll = (1ULL << 2) // pause all commands
};

enum ServerLogType { kServerLogNone, kReplIdLog };

enum class AuthResult {
Expand Down Expand Up @@ -242,6 +248,7 @@ class Server {
void GetCommandsStatsInfo(std::string *info);
void GetClusterInfo(std::string *info);
void GetInfo(const std::string &ns, const std::string &section, std::string *info);
int64_t GetCommandPauseType();
std::string GetRocksDBStatsJson() const;
ReplState GetReplicationState();

Expand Down Expand Up @@ -284,6 +291,8 @@ class Server {
Status Propagate(const std::string &channel, const std::vector<std::string> &tokens) const;
Status ExecPropagatedCommand(const std::vector<std::string> &tokens);
Status ExecPropagateScriptCommand(const std::vector<std::string> &tokens);
void PauseCommands(uint64_t type, uint64_t timeout_ms);
void UnpauseCommands();

void SetCurrentConnection(redis::Connection *conn) { curr_connection_ = conn; }
redis::Connection *GetCurrentConnection() { return curr_connection_; }
Expand Down Expand Up @@ -340,6 +349,8 @@ class Server {
Config *config_ = nullptr;
std::string last_random_key_cursor_;
std::mutex last_random_key_cursor_mu_;
std::atomic<int64_t> pause_type_{kPauseNone};
std::atomic<int64_t> pause_end_timestamp_ms_{0};

std::atomic<lua_State *> lua_;

Expand Down
Loading