From 92963afa665ca3985b6c20e4b947a8a7670e090d Mon Sep 17 00:00:00 2001 From: Yaroslav Stepanchuk Date: Tue, 24 Jan 2023 14:48:40 +0200 Subject: [PATCH] XREAD returns dedicated status if it blocks In order to be consistent with such blocking commands like BLPOP and BRPOP --- src/commands/redis_cmd.cc | 2 +- src/server/redis_connection.cc | 22 ++++++++++++---------- 2 files changed, 13 insertions(+), 11 deletions(-) diff --git a/src/commands/redis_cmd.cc b/src/commands/redis_cmd.cc index 28ecaa45dd0..661073ac95e 100644 --- a/src/commands/redis_cmd.cc +++ b/src/commands/redis_cmd.cc @@ -6079,7 +6079,7 @@ class CommandXRead : public Commander { evtimer_add(timer_, &tm); } - return Status::OK(); + return {Status::BlockingCmd}; } static void WriteCB(bufferevent *bev, void *ctx) { diff --git a/src/server/redis_connection.cc b/src/server/redis_connection.cc index 8f41750ee17..4a7a420e16a 100644 --- a/src/server/redis_connection.cc +++ b/src/server/redis_connection.cc @@ -308,6 +308,7 @@ void Connection::ExecuteCommands(std::deque *to_process_cmds) { Reply(Redis::Error("NOAUTH Authentication required.")); continue; } + if (password.empty()) { BecomeAdmin(); SetNamespace(kDefaultNamespace); @@ -319,11 +320,10 @@ void Connection::ExecuteCommands(std::deque *to_process_cmds) { std::unique_ptr concurrency; // Allow concurrency std::unique_ptr exclusivity; // Need exclusivity - // If the command need to process exclusively, we need to get 'ExclusivityGuard' + // If the command needs to process exclusively, we need to get 'ExclusivityGuard' // that can guarantee other threads can't come into critical zone, such as DEBUG, // CLUSTER subcommand, CONFIG SET, MULTI, LUA (in the immediate future). - // Otherwise, we just use 'ConcurrencyGuard' to allow all workers to execute - // commands at the same time. + // Otherwise, we just use 'ConcurrencyGuard' to allow all workers to execute commands at the same time. if (IsFlagEnabled(Connection::kMultiExec) && attributes->name != "exec") { // No lock guard, because 'exec' command has acquired 'WorkExclusivityGuard' } else if (attributes->is_exclusive() || @@ -332,26 +332,24 @@ void Connection::ExecuteCommands(std::deque *to_process_cmds) { cmd_tokens.size() >= 2 && Cluster::SubCommandIsExecExclusive(cmd_tokens[1]))) { exclusivity = svr_->WorkExclusivityGuard(); - // When executing lua script commands that have "exclusive" attribute, - // we need to know current connection, but we should set current - // connection after acquiring the WorkExclusivityGuard to make it - // thread-safe + // When executing lua script commands that have "exclusive" attribute, we need to know current connection, + // but we should set current connection after acquiring the WorkExclusivityGuard to make it thread-safe svr_->SetCurrentConnection(this); } else { concurrency = svr_->WorkConcurrencyGuard(); } if (cmd_name == "eval_ro" || cmd_name == "evalsha_ro") { - // if executing read only lua script commands, set current - // connection. + // if executing read only lua script commands, set current connection. svr_->SetCurrentConnection(this); } - if (svr_->IsLoading() && attributes->is_ok_loading() == false) { + if (svr_->IsLoading() && !attributes->is_ok_loading()) { Reply(Redis::Error("LOADING kvrocks is restoring the db from backup")); if (IsFlagEnabled(Connection::kMultiExec)) multi_error_ = true; continue; } + int arity = attributes->arity; int tokens = static_cast(cmd_tokens.size()); if ((arity > 0 && tokens != arity) || (arity < 0 && tokens < -arity)) { @@ -359,6 +357,7 @@ void Connection::ExecuteCommands(std::deque *to_process_cmds) { Reply(Redis::Error("ERR wrong number of arguments")); continue; } + current_cmd_->SetArgs(cmd_tokens); s = current_cmd_->Parse(); if (!s.IsOK()) { @@ -394,6 +393,7 @@ void Connection::ExecuteCommands(std::deque *to_process_cmds) { Reply(Redis::Error("READONLY You can't write against a read only slave.")); continue; } + if (!config->slave_serve_stale_data && svr_->IsSlave() && cmd_name != "info" && cmd_name != "slaveof" && svr_->GetReplicationState() != kReplConnected) { Reply( @@ -419,11 +419,13 @@ void Connection::ExecuteCommands(std::deque *to_process_cmds) { if (s.Is()) { break; } + // Reply for MULTI if (!s.IsOK()) { Reply(Redis::Error("ERR " + s.Msg())); continue; } + if (!reply.empty()) Reply(reply); reply.clear(); }