diff --git a/src/redis_cmd.cc b/src/redis_cmd.cc index 8c164a39c7a..2a63cdb7db6 100644 --- a/src/redis_cmd.cc +++ b/src/redis_cmd.cc @@ -3365,7 +3365,7 @@ class CommandSlaveOf : public Commander { LOG(WARNING) << "MASTER MODE enabled (user request from '" << conn->GetAddr() << "')"; } } else { - s = svr->AddMaster(host_, port_); + s = svr->AddMaster(host_, port_, false); if (s.IsOK()) { *output = Redis::SimpleString("OK"); LOG(WARNING) << "SLAVE OF " << host_ << ":" << port_ @@ -3591,6 +3591,18 @@ class CommandClient : public Commander { } else { return Status(Status::RedisParseErr, errInvalidSyntax); } + } else if (args[i] == "type" && moreargs) { + if (args[i+1] == "normal") { + kill_type_ |= kTypeNormal; + } else if (args[i+1] == "pubsub") { + kill_type_ |= kTypePubsub; + } else if (args[i+1] == "master") { + kill_type_ |= kTypeMaster; + } else if (args[i+1] == "replica" || args[i+1] == "slave") { + kill_type_ |= kTypeSlave; + } else { + return Status(Status::RedisParseErr, errInvalidSyntax); + } } else { return Status(Status::RedisParseErr, errInvalidSyntax); } @@ -3619,7 +3631,7 @@ class CommandClient : public Commander { return Status::OK(); } else if (subcommand_ == "kill") { int64_t killed = 0; - srv->KillClient(&killed, addr_, id_, skipme_, conn); + srv->KillClient(&killed, addr_, id_, kill_type_, skipme_, conn); if (new_format_) { *output = Redis::Integer(killed); } else { @@ -3640,6 +3652,7 @@ class CommandClient : public Commander { std::string conn_name_; std::string subcommand_; bool skipme_ = false; + int64_t kill_type_ = 0; uint64_t id_ = 0; bool new_format_ = true; }; @@ -4442,7 +4455,7 @@ CommandAttributes redisCommandTable[] = { ADD_CMD("compact", 1, "read-only", 0, 0, 0, CommandCompact), ADD_CMD("bgsave", 1, "read-only", 0, 0, 0, CommandBGSave), ADD_CMD("flushbackup", 1, "read-only", 0, 0, 0, CommandFlushBackup), - ADD_CMD("slaveof", 3, "read-only", 0, 0, 0, CommandSlaveOf), + ADD_CMD("slaveof", 3, "read-only exclusive", 0, 0, 0, CommandSlaveOf), ADD_CMD("stats", 1, "read-only", 0, 0, 0, CommandStats), ADD_CMD("replconf", -3, "read-only replication", 0, 0, 0, CommandReplConf), diff --git a/src/redis_connection.cc b/src/redis_connection.cc index 7d95d0ff0dd..42c3cdb937a 100644 --- a/src/redis_connection.cc +++ b/src/redis_connection.cc @@ -132,13 +132,26 @@ uint64_t Connection::GetIdleTime() { return static_cast(now-last_interaction_); } +// Currently, master connection is not handled in connection +// but in replication thread. +// +// The function will return one of the following: +// kTypeSlave -> Slave +// kTypeNormal -> Normal client +// kTypePubsub -> Client subscribed to Pub/Sub channels +uint64_t Connection::GetClientType() { + if (IsFlagEnabled(kSlave)) return kTypeSlave; + if (!subscribe_channels_.empty() || !subcribe_patterns_.empty()) return kTypePubsub; + return kTypeNormal; +} + std::string Connection::GetFlags() { std::string flags; if (owner_->IsRepl()) flags.append("R"); if (IsFlagEnabled(kSlave)) flags.append("S"); if (IsFlagEnabled(kCloseAfterReply)) flags.append("c"); if (IsFlagEnabled(kMonitor)) flags.append("M"); - if (!subscribe_channels_.empty()) flags.append("P"); + if (!subscribe_channels_.empty() || !subcribe_patterns_.empty()) flags.append("P"); if (flags.empty()) flags = "N"; return flags; } @@ -284,7 +297,8 @@ void Connection::ExecuteCommands(const std::vector &to_pro password = IsRepl() ? config->masterauth : config->requirepass; for (auto &cmd_tokens : to_process_cmds) { - if (IsFlagEnabled(Redis::Connection::kCloseAfterReply)) break; + if (IsFlagEnabled(Redis::Connection::kCloseAfterReply) && + !IsFlagEnabled(Connection::kMultiExec)) break; if (GetNamespace().empty()) { if (!password.empty() && Util::ToLower(cmd_tokens.front()) != "auth") { Reply(Redis::Error("NOAUTH Authentication required.")); diff --git a/src/redis_connection.h b/src/redis_connection.h index ab6a09b8964..7cb2243eabd 100644 --- a/src/redis_connection.h +++ b/src/redis_connection.h @@ -65,6 +65,7 @@ class Connection { int GetPort() { return port_; } void SetListeningPort(int port) { listening_port_ = port; } int GetListeningPort() { return listening_port_; } + uint64_t GetClientType(); bool IsAdmin() { return is_admin_; } void BecomeAdmin() { is_admin_ = true; } diff --git a/src/server.cc b/src/server.cc index 8669f72fc58..4a99f2fc9cb 100644 --- a/src/server.cc +++ b/src/server.cc @@ -67,7 +67,7 @@ Server::~Server() { // - feed-replica-file: send SST files when slaves ask for full sync Status Server::Start() { if (!config_->master_host.empty()) { - Status s = AddMaster(config_->master_host, static_cast(config_->master_port)); + Status s = AddMaster(config_->master_host, static_cast(config_->master_port), false); if (!s.IsOK()) return s; } for (const auto worker : worker_threads_) { @@ -139,13 +139,18 @@ void Server::Join() { if (compaction_checker_thread_.joinable()) compaction_checker_thread_.join(); } -Status Server::AddMaster(std::string host, uint32_t port) { +Status Server::AddMaster(std::string host, uint32_t port, bool force_reconnect) { slaveof_mu_.lock(); - if (!master_host_.empty() && master_host_ == host && master_port_ == port) { + + // Don't check host and port if 'force_reconnect' argument is set to true + if (!force_reconnect && + !master_host_.empty() && master_host_ == host && + master_port_ == port) { slaveof_mu_.unlock(); return Status::OK(); } + // Master is changed if (!master_host_.empty()) { if (replication_thread_) replication_thread_->Stop(); replication_thread_ = nullptr; @@ -1144,22 +1149,37 @@ std::string Server::GetClientsStr() { return clients; } -void Server::KillClient(int64_t *killed, std::string addr, uint64_t id, bool skipme, Redis::Connection *conn) { +void Server::KillClient(int64_t *killed, std::string addr, uint64_t id, + uint64_t type, bool skipme, Redis::Connection *conn) { *killed = 0; + + // Normal clients and pubsub clients for (const auto &t : worker_threads_) { int64_t killed_in_worker = 0; - t->GetWorker()->KillClient(conn, id, addr, skipme, &killed_in_worker); + t->GetWorker()->KillClient(conn, id, addr, type, skipme, &killed_in_worker); *killed += killed_in_worker; } + + // Slave clients slave_threads_mu_.lock(); for (const auto &st : slave_threads_) { - if ((!addr.empty() && st->GetConn()->GetAddr() == addr) - || (id != 0 && st->GetConn()->GetID() == id)) { + if ((type & kTypeSlave) || + (!addr.empty() && st->GetConn()->GetAddr() == addr) || + (id != 0 && st->GetConn()->GetID() == id)) { st->Stop(); (*killed)++; } } slave_threads_mu_.unlock(); + + // Master client + if (IsSlave() && + (type & kTypeMaster || + (!addr.empty() && addr == master_host_+":"+std::to_string(master_port_)))) { + // Stop replication thread and start a new one to replicate + AddMaster(master_host_, master_port_, true); + (*killed)++; + } } void Server::SetReplicationRateLimit(uint64_t max_replication_mb) { diff --git a/src/server.h b/src/server.h index 2addaa00563..8a93d404731 100644 --- a/src/server.h +++ b/src/server.h @@ -41,6 +41,13 @@ enum SlowLog { kSlowLogMaxString = 128, }; +enum ClientType { + kTypeNormal = (1ULL<<0), // normal client + kTypePubsub = (1ULL<<1), // pubsub client + kTypeMaster = (1ULL<<2), // master client + kTypeSlave = (1ULL<<3), // slave client +}; + class Server { public: explicit Server(Engine::Storage *storage, Config *config); @@ -55,7 +62,7 @@ class Server { Status LookupAndCreateCommand(const std::string &cmd_name, std::unique_ptr *cmd); - Status AddMaster(std::string host, uint32_t port); + Status AddMaster(std::string host, uint32_t port, bool force_reconnect); Status RemoveMaster(); Status AddSlave(Redis::Connection *conn, rocksdb::SequenceNumber next_repl_seq); void DisconnectSlaves(); @@ -110,7 +117,8 @@ class Server { int DecrMonitorClientNum(); std::string GetClientsStr(); std::atomic *GetClientID(); - void KillClient(int64_t *killed, std::string addr, uint64_t id, bool skipme, Redis::Connection *conn); + void KillClient(int64_t *killed, std::string addr, uint64_t id, uint64_t type, + bool skipme, Redis::Connection *conn); void SetReplicationRateLimit(uint64_t max_replication_mb); LogCollector *GetPerfLog() { return &perf_log_; } diff --git a/src/worker.cc b/src/worker.cc index e1813f83d79..b0ed7ac0593 100644 --- a/src/worker.cc +++ b/src/worker.cc @@ -309,12 +309,15 @@ std::string Worker::GetClientsStr() { return output; } -void Worker::KillClient(Redis::Connection *self, uint64_t id, std::string addr, bool skipme, int64_t *killed) { +void Worker::KillClient(Redis::Connection *self, uint64_t id, std::string addr, + uint64_t type, bool skipme, int64_t *killed) { conns_mu_.lock(); for (const auto &iter : conns_) { Redis::Connection* conn = iter.second; if (skipme && self == conn) continue; - if ((!addr.empty() && conn->GetAddr() == addr) || (id != 0 && conn->GetID() == id)) { + if ((type & conn->GetClientType()) || + (!addr.empty() && conn->GetAddr() == addr) || + (id != 0 && conn->GetID() == id)) { conn->EnableFlag(Redis::Connection::kCloseAfterReply); // enable write event to notify worker wake up ASAP, and remove the connection if (!conn->IsFlagEnabled(Redis::Connection::kSlave)) { // don't enable any event in slave connection diff --git a/src/worker.h b/src/worker.h index 243e2427f4e..c7412928aba 100644 --- a/src/worker.h +++ b/src/worker.h @@ -38,7 +38,8 @@ class Worker { void FeedMonitorConns(Redis::Connection *conn, const std::vector &tokens); std::string GetClientsStr(); - void KillClient(Redis::Connection *self, uint64_t id, std::string addr, bool skipme, int64_t *killed); + void KillClient(Redis::Connection *self, uint64_t id, std::string addr, + uint64_t type, bool skipme, int64_t *killed); void KickoutIdleClients(int timeout); Server *svr_; diff --git a/tests/tcl/tests/unit/introspection.tcl b/tests/tcl/tests/unit/introspection.tcl index 145cbc49ab9..433a00aef16 100644 --- a/tests/tcl/tests/unit/introspection.tcl +++ b/tests/tcl/tests/unit/introspection.tcl @@ -49,6 +49,99 @@ start_server {tags {"introspection"}} { } } + test {Kill normal client} { + set rd [redis_deferring_client] + $rd client setname normal + assert_equal [$rd read] "OK" + assert_match {*normal*} [r client list] + + assert_equal {1} [r client kill skipme yes type normal] + assert_equal {1} [r client kill skipme no type normal] + reconnect + # Now the client should no longer be listed + wait_for_condition 50 100 { + [string match {*normal*} [r client list]] == 0 + } else { + fail "Killed client still listed in CLIENT LIST after killing." + } + } + + test {Kill pubsub client} { + # subscribe clients + set rd [redis_deferring_client] + $rd client setname pubsub + assert_equal [$rd read] "OK" + $rd subscribe foo + assert_equal [$rd read] {subscribe foo 1} + assert_match {*pubsub*} [r client list] + + # psubscribe clients + set rd1 [redis_deferring_client] + $rd1 client setname pubsub_patterns + assert_equal [$rd1 read] "OK" + $rd1 psubscribe bar.* + assert_equal [$rd1 read] {psubscribe bar.* 1} + + # normal clients + set rd2 [redis_deferring_client] + $rd2 client setname normal + assert_equal [$rd2 read] "OK" + + assert_equal {2} [r client kill type pubsub] + # Now the pubsub client should no longer be listed + # but normal client should not be dropped + wait_for_condition 50 100 { + [string match {*pubsub*} [r client list]] == 0 && + [string match {*normal*} [r client list]] == 1 + } else { + fail "Killed client still listed in CLIENT LIST after killing." + } + } + + start_server {} { + r slaveof [srv -1 host] [srv -1 port] + wait_for_condition 500 100 { + [string match {*connected*} [r role]] + } else { + fail "Slaves can't sync with master" + } + + test {Kill slave client} { + set partial_ok [s -1 sync_partial_ok] + # Kill slave connection + assert_equal {1} [r -1 client kill type slave] + # Incr sync_partial_ok since slave reconnects + wait_for_condition 50 100 { + [expr $partial_ok+1] eq [s -1 sync_partial_ok] + } else { + fail "Slave should reconnect after disconnecting " + } + } + + test {Kill master client} { + set partial_ok [s -1 sync_partial_ok] + # Kill master connection by type + assert_equal {1} [r client kill type master] + # Incr sync_partial_ok since slave reconnects + wait_for_condition 50 100 { + [expr $partial_ok+1] eq [s -1 sync_partial_ok] + } else { + fail "Slave should reconnect after disconnecting " + } + + set partial_ok [s -1 sync_partial_ok] + # Kill master connection by addr + set masteraddr [srv -1 host]:[srv -1 port] + assert_equal {OK} [r client kill $masteraddr] + # Incr sync_partial_ok since slave reconnects + wait_for_condition 50 100 { + [expr $partial_ok+1] eq [s -1 sync_partial_ok] + } else { + fail "Slave should reconnect after disconnecting " + } + } + } + test {DEBUG will freeze server} { set rd [redis_deferring_client] $rd DEBUG sleep 2.2 diff --git a/tests/tcl/tests/unit/multi.tcl b/tests/tcl/tests/unit/multi.tcl index 0f4909f8279..741d3dec088 100644 --- a/tests/tcl/tests/unit/multi.tcl +++ b/tests/tcl/tests/unit/multi.tcl @@ -78,4 +78,26 @@ start_server {tags {"multi"}} { assert_match {EXECABORT*} $e r ping } {PONG} + + test {MULTI-EXEC used in redis-sentinel for failover} { + start_server {} { + r multi + r slaveof [srv -1 host] [srv -1 port] + r config rewrite + r client kill type normal + r client kill type pubsub + r exec + reconnect + assert_equal "slave" [s role] + + r multi + r slaveof no one + r config rewrite + r client kill type normal + r client kill type pubsub + r exec + reconnect + assert_equal "master" [s role] + } + } }