Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support client kill type subcommands #352

Merged
merged 2 commits into from
Aug 11, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 16 additions & 3 deletions src/redis_cmd.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3365,7 +3365,7 @@ class CommandSlaveOf : public Commander {
LOG(WARNING) << "MASTER MODE enabled (user request from '" << conn->GetAddr() << "')";
}
} else {
s = svr->AddMaster(host_, port_);
s = svr->AddMaster(host_, port_, false);
if (s.IsOK()) {
*output = Redis::SimpleString("OK");
LOG(WARNING) << "SLAVE OF " << host_ << ":" << port_
Expand Down Expand Up @@ -3591,6 +3591,18 @@ class CommandClient : public Commander {
} else {
return Status(Status::RedisParseErr, errInvalidSyntax);
}
} else if (args[i] == "type" && moreargs) {
if (args[i+1] == "normal") {
kill_type_ |= kTypeNormal;
} else if (args[i+1] == "pubsub") {
kill_type_ |= kTypePubsub;
} else if (args[i+1] == "master") {
kill_type_ |= kTypeMaster;
} else if (args[i+1] == "replica" || args[i+1] == "slave") {
kill_type_ |= kTypeSlave;
} else {
return Status(Status::RedisParseErr, errInvalidSyntax);
}
} else {
return Status(Status::RedisParseErr, errInvalidSyntax);
}
Expand Down Expand Up @@ -3619,7 +3631,7 @@ class CommandClient : public Commander {
return Status::OK();
} else if (subcommand_ == "kill") {
int64_t killed = 0;
srv->KillClient(&killed, addr_, id_, skipme_, conn);
srv->KillClient(&killed, addr_, id_, kill_type_, skipme_, conn);
if (new_format_) {
*output = Redis::Integer(killed);
} else {
Expand All @@ -3640,6 +3652,7 @@ class CommandClient : public Commander {
std::string conn_name_;
std::string subcommand_;
bool skipme_ = false;
int64_t kill_type_ = 0;
uint64_t id_ = 0;
bool new_format_ = true;
};
Expand Down Expand Up @@ -4442,7 +4455,7 @@ CommandAttributes redisCommandTable[] = {
ADD_CMD("compact", 1, "read-only", 0, 0, 0, CommandCompact),
ADD_CMD("bgsave", 1, "read-only", 0, 0, 0, CommandBGSave),
ADD_CMD("flushbackup", 1, "read-only", 0, 0, 0, CommandFlushBackup),
ADD_CMD("slaveof", 3, "read-only", 0, 0, 0, CommandSlaveOf),
ADD_CMD("slaveof", 3, "read-only exclusive", 0, 0, 0, CommandSlaveOf),
ShooterIT marked this conversation as resolved.
Show resolved Hide resolved
ADD_CMD("stats", 1, "read-only", 0, 0, 0, CommandStats),

ADD_CMD("replconf", -3, "read-only replication", 0, 0, 0, CommandReplConf),
Expand Down
18 changes: 16 additions & 2 deletions src/redis_connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -132,13 +132,26 @@ uint64_t Connection::GetIdleTime() {
return static_cast<uint64_t>(now-last_interaction_);
}

// Currently, master connection is not handled in connection
// but in replication thread.
//
// The function will return one of the following:
// kTypeSlave -> Slave
// kTypeNormal -> Normal client
// kTypePubsub -> Client subscribed to Pub/Sub channels
uint64_t Connection::GetClientType() {
if (IsFlagEnabled(kSlave)) return kTypeSlave;
if (!subscribe_channels_.empty() || !subcribe_patterns_.empty()) return kTypePubsub;
return kTypeNormal;
}

std::string Connection::GetFlags() {
std::string flags;
if (owner_->IsRepl()) flags.append("R");
if (IsFlagEnabled(kSlave)) flags.append("S");
if (IsFlagEnabled(kCloseAfterReply)) flags.append("c");
if (IsFlagEnabled(kMonitor)) flags.append("M");
if (!subscribe_channels_.empty()) flags.append("P");
if (!subscribe_channels_.empty() || !subcribe_patterns_.empty()) flags.append("P");
if (flags.empty()) flags = "N";
return flags;
}
Expand Down Expand Up @@ -284,7 +297,8 @@ void Connection::ExecuteCommands(const std::vector<Redis::CommandTokens> &to_pro
password = IsRepl() ? config->masterauth : config->requirepass;

for (auto &cmd_tokens : to_process_cmds) {
if (IsFlagEnabled(Redis::Connection::kCloseAfterReply)) break;
if (IsFlagEnabled(Redis::Connection::kCloseAfterReply) &&
!IsFlagEnabled(Connection::kMultiExec)) break;
if (GetNamespace().empty()) {
if (!password.empty() && Util::ToLower(cmd_tokens.front()) != "auth") {
Reply(Redis::Error("NOAUTH Authentication required."));
Expand Down
1 change: 1 addition & 0 deletions src/redis_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ class Connection {
int GetPort() { return port_; }
void SetListeningPort(int port) { listening_port_ = port; }
int GetListeningPort() { return listening_port_; }
uint64_t GetClientType();

bool IsAdmin() { return is_admin_; }
void BecomeAdmin() { is_admin_ = true; }
Expand Down
34 changes: 27 additions & 7 deletions src/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ Server::~Server() {
// - feed-replica-file: send SST files when slaves ask for full sync
Status Server::Start() {
if (!config_->master_host.empty()) {
Status s = AddMaster(config_->master_host, static_cast<uint32_t>(config_->master_port));
Status s = AddMaster(config_->master_host, static_cast<uint32_t>(config_->master_port), false);
if (!s.IsOK()) return s;
}
for (const auto worker : worker_threads_) {
Expand Down Expand Up @@ -139,13 +139,18 @@ void Server::Join() {
if (compaction_checker_thread_.joinable()) compaction_checker_thread_.join();
}

Status Server::AddMaster(std::string host, uint32_t port) {
Status Server::AddMaster(std::string host, uint32_t port, bool force_reconnect) {
slaveof_mu_.lock();
if (!master_host_.empty() && master_host_ == host && master_port_ == port) {

// Don't check host and port if 'force_reconnect' argument is set to true
if (!force_reconnect &&
!master_host_.empty() && master_host_ == host &&
master_port_ == port) {
slaveof_mu_.unlock();
return Status::OK();
}

// Master is changed
if (!master_host_.empty()) {
if (replication_thread_) replication_thread_->Stop();
replication_thread_ = nullptr;
Expand Down Expand Up @@ -1144,22 +1149,37 @@ std::string Server::GetClientsStr() {
return clients;
}

void Server::KillClient(int64_t *killed, std::string addr, uint64_t id, bool skipme, Redis::Connection *conn) {
void Server::KillClient(int64_t *killed, std::string addr, uint64_t id,
uint64_t type, bool skipme, Redis::Connection *conn) {
*killed = 0;

// Normal clients and pubsub clients
for (const auto &t : worker_threads_) {
int64_t killed_in_worker = 0;
t->GetWorker()->KillClient(conn, id, addr, skipme, &killed_in_worker);
t->GetWorker()->KillClient(conn, id, addr, type, skipme, &killed_in_worker);
*killed += killed_in_worker;
}

// Slave clients
slave_threads_mu_.lock();
for (const auto &st : slave_threads_) {
if ((!addr.empty() && st->GetConn()->GetAddr() == addr)
|| (id != 0 && st->GetConn()->GetID() == id)) {
if ((type & kTypeSlave) ||
(!addr.empty() && st->GetConn()->GetAddr() == addr) ||
(id != 0 && st->GetConn()->GetID() == id)) {
st->Stop();
(*killed)++;
}
}
slave_threads_mu_.unlock();

// Master client
if (IsSlave() &&
(type & kTypeMaster ||
(!addr.empty() && addr == master_host_+":"+std::to_string(master_port_)))) {
// Stop replication thread and start a new one to replicate
AddMaster(master_host_, master_port_, true);
(*killed)++;
}
}

void Server::SetReplicationRateLimit(uint64_t max_replication_mb) {
Expand Down
12 changes: 10 additions & 2 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,13 @@ enum SlowLog {
kSlowLogMaxString = 128,
};

enum ClientType {
kTypeNormal = (1ULL<<0), // normal client
kTypePubsub = (1ULL<<1), // pubsub client
kTypeMaster = (1ULL<<2), // master client
kTypeSlave = (1ULL<<3), // slave client
};

class Server {
public:
explicit Server(Engine::Storage *storage, Config *config);
Expand All @@ -55,7 +62,7 @@ class Server {
Status LookupAndCreateCommand(const std::string &cmd_name, std::unique_ptr<Redis::Commander> *cmd);


Status AddMaster(std::string host, uint32_t port);
Status AddMaster(std::string host, uint32_t port, bool force_reconnect);
Status RemoveMaster();
Status AddSlave(Redis::Connection *conn, rocksdb::SequenceNumber next_repl_seq);
void DisconnectSlaves();
Expand Down Expand Up @@ -110,7 +117,8 @@ class Server {
int DecrMonitorClientNum();
std::string GetClientsStr();
std::atomic<uint64_t> *GetClientID();
void KillClient(int64_t *killed, std::string addr, uint64_t id, bool skipme, Redis::Connection *conn);
void KillClient(int64_t *killed, std::string addr, uint64_t id, uint64_t type,
bool skipme, Redis::Connection *conn);
void SetReplicationRateLimit(uint64_t max_replication_mb);

LogCollector<PerfEntry> *GetPerfLog() { return &perf_log_; }
Expand Down
7 changes: 5 additions & 2 deletions src/worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -309,12 +309,15 @@ std::string Worker::GetClientsStr() {
return output;
}

void Worker::KillClient(Redis::Connection *self, uint64_t id, std::string addr, bool skipme, int64_t *killed) {
void Worker::KillClient(Redis::Connection *self, uint64_t id, std::string addr,
uint64_t type, bool skipme, int64_t *killed) {
conns_mu_.lock();
for (const auto &iter : conns_) {
Redis::Connection* conn = iter.second;
if (skipme && self == conn) continue;
if ((!addr.empty() && conn->GetAddr() == addr) || (id != 0 && conn->GetID() == id)) {
if ((type & conn->GetClientType()) ||
(!addr.empty() && conn->GetAddr() == addr) ||
(id != 0 && conn->GetID() == id)) {
conn->EnableFlag(Redis::Connection::kCloseAfterReply);
// enable write event to notify worker wake up ASAP, and remove the connection
if (!conn->IsFlagEnabled(Redis::Connection::kSlave)) { // don't enable any event in slave connection
Expand Down
3 changes: 2 additions & 1 deletion src/worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ class Worker {
void FeedMonitorConns(Redis::Connection *conn, const std::vector<std::string> &tokens);

std::string GetClientsStr();
void KillClient(Redis::Connection *self, uint64_t id, std::string addr, bool skipme, int64_t *killed);
void KillClient(Redis::Connection *self, uint64_t id, std::string addr,
uint64_t type, bool skipme, int64_t *killed);
void KickoutIdleClients(int timeout);

Server *svr_;
Expand Down
93 changes: 93 additions & 0 deletions tests/tcl/tests/unit/introspection.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,99 @@ start_server {tags {"introspection"}} {
}
}

test {Kill normal client} {
set rd [redis_deferring_client]
$rd client setname normal
assert_equal [$rd read] "OK"
assert_match {*normal*} [r client list]

assert_equal {1} [r client kill skipme yes type normal]
assert_equal {1} [r client kill skipme no type normal]
reconnect
# Now the client should no longer be listed
wait_for_condition 50 100 {
[string match {*normal*} [r client list]] == 0
} else {
fail "Killed client still listed in CLIENT LIST after killing."
}
}

test {Kill pubsub client} {
# subscribe clients
set rd [redis_deferring_client]
$rd client setname pubsub
assert_equal [$rd read] "OK"
$rd subscribe foo
assert_equal [$rd read] {subscribe foo 1}
assert_match {*pubsub*} [r client list]

# psubscribe clients
set rd1 [redis_deferring_client]
$rd1 client setname pubsub_patterns
assert_equal [$rd1 read] "OK"
$rd1 psubscribe bar.*
assert_equal [$rd1 read] {psubscribe bar.* 1}

# normal clients
set rd2 [redis_deferring_client]
$rd2 client setname normal
assert_equal [$rd2 read] "OK"

assert_equal {2} [r client kill type pubsub]
# Now the pubsub client should no longer be listed
# but normal client should not be dropped
wait_for_condition 50 100 {
[string match {*pubsub*} [r client list]] == 0 &&
[string match {*normal*} [r client list]] == 1
} else {
fail "Killed client still listed in CLIENT LIST after killing."
}
}

start_server {} {
r slaveof [srv -1 host] [srv -1 port]
wait_for_condition 500 100 {
[string match {*connected*} [r role]]
} else {
fail "Slaves can't sync with master"
}

test {Kill slave client} {
set partial_ok [s -1 sync_partial_ok]
# Kill slave connection
assert_equal {1} [r -1 client kill type slave]
# Incr sync_partial_ok since slave reconnects
wait_for_condition 50 100 {
[expr $partial_ok+1] eq [s -1 sync_partial_ok]
} else {
fail "Slave should reconnect after disconnecting "
}
}

test {Kill master client} {
set partial_ok [s -1 sync_partial_ok]
# Kill master connection by type
assert_equal {1} [r client kill type master]
# Incr sync_partial_ok since slave reconnects
wait_for_condition 50 100 {
[expr $partial_ok+1] eq [s -1 sync_partial_ok]
} else {
fail "Slave should reconnect after disconnecting "
}

set partial_ok [s -1 sync_partial_ok]
# Kill master connection by addr
set masteraddr [srv -1 host]:[srv -1 port]
assert_equal {OK} [r client kill $masteraddr]
# Incr sync_partial_ok since slave reconnects
wait_for_condition 50 100 {
[expr $partial_ok+1] eq [s -1 sync_partial_ok]
} else {
fail "Slave should reconnect after disconnecting "
}
}
}

test {DEBUG will freeze server} {
set rd [redis_deferring_client]
$rd DEBUG sleep 2.2
Expand Down
22 changes: 22 additions & 0 deletions tests/tcl/tests/unit/multi.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -78,4 +78,26 @@ start_server {tags {"multi"}} {
assert_match {EXECABORT*} $e
r ping
} {PONG}

test {MULTI-EXEC used in redis-sentinel for failover} {
start_server {} {
r multi
r slaveof [srv -1 host] [srv -1 port]
r config rewrite
r client kill type normal
r client kill type pubsub
r exec
reconnect
assert_equal "slave" [s role]
ShooterIT marked this conversation as resolved.
Show resolved Hide resolved

r multi
r slaveof no one
r config rewrite
r client kill type normal
r client kill type pubsub
r exec
reconnect
assert_equal "master" [s role]
}
}
}