Skip to content

Commit

Permalink
Support CLUSTERX SETSLOT command (apache#463)
Browse files Browse the repository at this point in the history
CLUSTERX SETSLOT $slot_id NODE $node_id $new_version

- Description
assign the slot to the node if new version is current version+1.

- Parameters
$slot_id: the slot which we want to assign
$node_id: the node which we want to assign the slot into
NODE: keep the same with redis CLUSTER SETSLOT command, maybe we want to expand
this command in the future
$new_version: the new version MUST be +1 of current version so that kvrocks could
execute this command, this is to guarantee this change is based on a special
cluster topology

- Purpose
users can use this command to set slot distribution
slot migration needs this command to update slot distribution
New version constraint
The reason why the new version MUST be +1 of current version is that,
the command changes topology based on specific topology (also means specific
version), we must guarantee current topology is exactly expected, otherwise
this update may make topology corrupt, so base topology version is very important.
This is different with CLUSTERX SETNODES commands because it uses new version
topology to cover current version, it allows kvrocks nodes lost some topology
updates since of network failure, it is state instead of operation.
  • Loading branch information
ShooterIT committed Jan 27, 2022
1 parent 57aba16 commit 9747c3a
Show file tree
Hide file tree
Showing 5 changed files with 152 additions and 13 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,5 @@ kvrocks
version.h
Makefile.dep
make_config.mk
.vscode
.vscode
kvrocks2redis
61 changes: 57 additions & 4 deletions src/cluster.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,14 @@ bool Cluster::SubCommandIsExecExclusive(const std::string &subcommand) {
return true;
} else if (strcasecmp("setnodeid", subcommand.c_str()) == 0) {
return true;
} else if (strcasecmp("setslot", subcommand.c_str()) == 0) {
return true;
}
return false;
}

Status Cluster::SetNodeId(std::string node_id) {
if (node_id.size() != kClusetNodeIdLen) {
if (node_id.size() != kClusterNodeIdLen) {
return Status(Status::ClusterInvalidInfo, "Invalid node id");
}

Expand All @@ -53,6 +55,55 @@ Status Cluster::SetNodeId(std::string node_id) {
return Status::OK();
}

// Set the slot to the node if new version is current version +1. It is useful
// when we scale cluster avoid too many big messages, since we only update one
// slot distribution and there are 16384 slot in our design.
//
// The reason why the new version MUST be +1 of current version is that,
// the command changes topology based on specific topology (also means specific
// version), we must guarantee current topology is exactly expected, otherwise,
// this update may make topology corrupt, so base topology version is very important.
// This is different with CLUSTERX SETNODES commands because it uses new version
// topology to cover current version, it allows kvrocks nodes lost some topology
// updates since of network failure, it is state instead of operation.
Status Cluster::SetSlot(int slot, std::string node_id, int64_t new_version) {
// Parameters check
if (new_version <= 0 || new_version != version_ + 1) {
return Status(Status::NotOK, "Invalid cluster version");
}
if (!IsValidSlot(slot)) {
return Status(Status::NotOK, "Invalid slot id");
}
if (node_id.size() != kClusterNodeIdLen) {
return Status(Status::NotOK, "Invalid node id");
}

// Get the node which we want to assign a slot into it
std::shared_ptr<ClusterNode> to_assign_node = nodes_[node_id];
if (to_assign_node == nullptr) {
return Status(Status::NotOK, "No this node in the cluster");
}
if (to_assign_node->role_ != kClusterMaster) {
return Status(Status::NotOK, "The node is not the master");
}

// Update version
version_ = new_version;

// Update topology
// 1. Remove the slot from old node if existing
// 2. Add the slot into to-assign node
// 3. Update the map of slots to nodes.
std::shared_ptr<ClusterNode> old_node = slots_nodes_[slot];
if (old_node != nullptr) {
old_node->slots_[slot] = 0;
}
to_assign_node->slots_[slot] = 1;
slots_nodes_[slot] = to_assign_node;

return Status::OK();
}

// cluster setnodes $all_nodes_info $version $force
// one line of $all_nodes: $node_id $host $port $role $master_node_id $slot_range
Status Cluster::SetClusterNodes(const std::string &nodes_str, int64_t version, bool force) {
Expand Down Expand Up @@ -276,7 +327,9 @@ std::string Cluster::GenNodesDescription() {

// Slots
if (n->slots_info_.size() > 0) n->slots_info_.pop_back(); // Trim space
if (n->role_ == kClusterMaster) node_str.append(" " + n->slots_info_);
if (n->role_ == kClusterMaster && n->slots_info_.size() > 0) {
node_str.append(" " + n->slots_info_);
}
n->slots_info_.clear(); // Reset

nodes_desc.append(node_str + "\n");
Expand All @@ -302,7 +355,7 @@ Status Cluster::ParseClusterNodes(const std::string &nodes_str, ClusterNodes *no
}

// 1) node id
if (fields[0].size() != kClusetNodeIdLen) {
if (fields[0].size() != kClusterNodeIdLen) {
return Status(Status::ClusterInvalidInfo, "Invalid cluster node id");
}
std::string id = fields[0];
Expand Down Expand Up @@ -330,7 +383,7 @@ Status Cluster::ParseClusterNodes(const std::string &nodes_str, ClusterNodes *no
// 5) master id
std::string master_id = fields[4];
if ((role == kClusterMaster && master_id != "-") ||
(role == kClusterSlave && master_id.size() != kClusetNodeIdLen)) {
(role == kClusterSlave && master_id.size() != kClusterNodeIdLen)) {
return Status(Status::ClusterInvalidInfo, "Invalid cluste node master id");
}

Expand Down
13 changes: 7 additions & 6 deletions src/cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@
#include "redis_slot.h"

enum {
kClusterMaster = 1,
kClusterSlave = 2,
kClusetNodeIdLen = 40,
kClusterPortIncr = 10000,
kClusterSlots = HASH_SLOTS_SIZE,
kClusterMaster = 1,
kClusterSlave = 2,
kClusterNodeIdLen = 40,
kClusterPortIncr = 10000,
kClusterSlots = HASH_SLOTS_SIZE,
};

class ClusterNode {
Expand Down Expand Up @@ -55,10 +55,11 @@ class Cluster {
Status SetClusterNodes(const std::string &nodes_str, int64_t version, bool force);
Status GetClusterNodes(std::string *nodes_str);
Status SetNodeId(std::string node_id);
Status SetSlot(int slot, std::string node_id, int64_t version);
Status GetSlotsInfo(std::vector<SlotInfo> *slot_infos);
Status GetClusterInfo(std::string *cluster_infos);
uint64_t GetVersion() { return version_; }
bool IsValidSlot(int slot) { return slot >= 0 && slot < kClusterSlots; }
static bool IsValidSlot(int slot) { return slot >= 0 && slot < kClusterSlots; }
Status CanExecByMySelf(const Redis::CommandAttributes *attributes,
const std::vector<std::string> &cmd_tokens);
void SetMasterSlaveRepl();
Expand Down
31 changes: 29 additions & 2 deletions src/redis_cmd.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4316,7 +4316,9 @@ class CommandClusterX : public Commander {

if (args.size() == 2 && (subcommand_ == "version")) return Status::OK();
if (subcommand_ == "setnodeid" && args_.size() == 3 &&
args_[2].size() == kClusetNodeIdLen) return Status::OK();
args_[2].size() == kClusterNodeIdLen) return Status::OK();

// CLUSTERX SETNODES $ALL_NODES_INFO $VERSION FORCE
if (subcommand_ == "setnodes" && args_.size() >= 4) {
nodes_str_ = args_[2];
set_version_ = atoll(args_[3].c_str());
Expand All @@ -4328,8 +4330,25 @@ class CommandClusterX : public Commander {
}
return Status(Status::RedisParseErr, "Invalid setnodes options");
}

// CLUSTERX SETSLOT $SLOT_ID NODE $NODE_ID $VERSION
if (subcommand_ == "setslot" && args_.size() == 6) {
slot_id_ = atoi(args_[2].c_str());
if (!Cluster::IsValidSlot(slot_id_)) {
return Status(Status::RedisParseErr, "Invalid slot id");
}
if (strcasecmp(args_[3].c_str(), "node") != 0) {
return Status(Status::RedisParseErr, "Invalid setslot options");
}
if (args_[4].size() != kClusterNodeIdLen) {
return Status(Status::RedisParseErr, "Invalid node id");
}
set_version_ = atoll(args_[5].c_str());
if (set_version_ < 0) return Status(Status::RedisParseErr, "Invalid version");
return Status::OK();
}
return Status(Status::RedisParseErr,
"CLUSTERX command, CLUSTERX VERSION|SETNODEID|SETNODES");
"CLUSTERX command, CLUSTERX VERSION|SETNODEID|SETNODES|SETSLOT");
}

Status Execute(Server *svr, Connection *conn, std::string *output) override {
Expand Down Expand Up @@ -4357,6 +4376,13 @@ class CommandClusterX : public Commander {
} else {
*output = Redis::Error(s.Msg());
}
} else if (subcommand_ == "setslot") {
Status s = svr->cluster_->SetSlot(slot_id_, args_[4], set_version_);
if (s.IsOK()) {
*output = Redis::SimpleString("OK");
} else {
*output = Redis::Error(s.Msg());
}
} else if (subcommand_ == "version") {
int64_t v = svr->cluster_->GetVersion();
*output = Redis::BulkString(std::to_string(v));
Expand All @@ -4370,6 +4396,7 @@ class CommandClusterX : public Commander {
std::string subcommand_;
std::string nodes_str_;
uint64_t set_version_ = 0;
int slot_id_ = -1;
bool force_ = false;
};

Expand Down
57 changes: 57 additions & 0 deletions tests/tcl/tests/integration/cluster.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,63 @@ start_server {tags {"cluster"} overrides {cluster-enabled yes}} {

catch {[r clusterx setnodes a -1]} err
assert_match "*Invalid version*" $err

catch {[r clusterx setslot 16384 07c37dfeb235213a872192d90877d0cd55635b91 1]} err
assert_match "*CLUSTER*" $err

catch {[r clusterx setslot 16383 a 1]} err
assert_match "*CLUSTER*" $err
}
}

start_server {tags {"cluster"} overrides {cluster-enabled yes}} {
set nodeid1 "07c37dfeb235213a872192d90877d0cd55635b91"
r clusterx SETNODEID $nodeid1
set port1 [srv port]

start_server {tags {"cluster"} overrides {cluster-enabled yes}} {
set nodeid2 "07c37dfeb235213a872192d90877d0cd55635b92"
r clusterx SETNODEID $nodeid2
set port2 [srv port]

test {cluster slotset command test} {
set nodes_str "$nodeid1 127.0.0.1 $port1 master - 0-16383"
set nodes_str "$nodes_str\n$nodeid2 127.0.0.1 $port2 master -"

r clusterx setnodes $nodes_str 2
r -1 clusterx setnodes $nodes_str 2

set slot_0_key "06S"
assert_equal {OK} [r -1 set $slot_0_key 0]
catch {[r set $slot_0_key 0]} err
assert_match "*MOVED 0*$port1*" $err

r clusterx setslot 0 node $nodeid2 3
r -1 clusterx setslot 0 node $nodeid2 3
assert_equal {3} [r clusterx version]
assert_equal {3} [r -1 clusterx version]
assert_equal [r cluster slots] [r -1 cluster slots]
assert_equal [r cluster slots] "{0 0 {127.0.0.1 $port2 $nodeid2}} {1 16383 {127.0.0.1 $port1 $nodeid1}}"

assert_equal {OK} [r set $slot_0_key 0]
catch {[r -1 set $slot_0_key 0]} err
assert_match "*MOVED 0*$port2*" $err

r clusterx setslot 1 node $nodeid2 4
r -1 clusterx setslot 1 node $nodeid2 4
assert_equal [r cluster slots] [r -1 cluster slots]
assert_equal [r cluster slots] "{0 1 {127.0.0.1 $port2 $nodeid2}} {2 16383 {127.0.0.1 $port1 $nodeid1}}"

# wrong version can't update slot distribution
catch {[r clusterx setslot 2 node $nodeid2 6]} err
assert_match "*version*" $err

catch {[r clusterx setslot 2 node $nodeid2 4]} err
assert_match "*version*" $err

assert_equal {4} [r clusterx version]
assert_equal {4} [r -1 clusterx version]
}
}
}

Expand Down

0 comments on commit 9747c3a

Please sign in to comment.