Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix many typos in cluster.cc #919

Merged
merged 3 commits into from
Sep 26, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 44 additions & 38 deletions src/cluster.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,16 @@
#include "redis_cmd.h"
#include "replication.h"

const char *errInvalidNodeID = "Invalid cluster node id";
const char *errInvalidSlotID = "Invalid slot id";
const char *errSlotOutOfRange = "Slot is out of range";
const char *errInvalidClusterVersion = "Invalid cluster version";
const char *errSlotOverlapped = "Slot distribution is overlapped";
const char *errNoMasterNode = "The node isn't a master";
const char *errClusterNoInitialized = "CLUSTERDOWN The cluster is not initialized";
const char *errInvalidClusterNodeInfo = "Invalid cluster nodes info";
const char *errInvalidImportState = "Invalid import state";

ClusterNode::ClusterNode(std::string id, std::string host, int port,
int role, std::string master_id, std::bitset<kClusterSlots> slots):
id_(id), host_(host), port_(port), role_(role),
Expand All @@ -42,7 +52,7 @@ Cluster::Cluster(Server *svr, std::vector<std::string> binds, int port) :
}

// We access cluster without lock, actually we guarantee data-safe by work threads
// ReadWriteLockGuard, CLUSTER command doesn't have 'execlusive' attribute, i.e.
// ReadWriteLockGuard, CLUSTER command doesn't have 'exclusive' attribute, i.e.
// CLUSTER command can be executed concurrently, but some subcommand may change
// cluster data, so these commands should be executed exclusively, and ReadWriteLock
// also can guarantee accessing data is safe.
Expand All @@ -61,7 +71,7 @@ bool Cluster::SubCommandIsExecExclusive(const std::string &subcommand) {

Status Cluster::SetNodeId(std::string node_id) {
if (node_id.size() != kClusterNodeIdLen) {
return Status(Status::ClusterInvalidInfo, "Invalid node id");
return Status(Status::ClusterInvalidInfo, errInvalidNodeID);
}

myid_ = node_id;
Expand Down Expand Up @@ -92,13 +102,13 @@ Status Cluster::SetNodeId(std::string node_id) {
Status Cluster::SetSlot(int slot, std::string node_id, int64_t new_version) {
// Parameters check
if (new_version <= 0 || new_version != version_ + 1) {
return Status(Status::NotOK, "Invalid cluster version");
return Status(Status::NotOK, errInvalidClusterVersion);
}
if (!IsValidSlot(slot)) {
return Status(Status::NotOK, "Invalid slot id");
return Status(Status::NotOK, errInvalidSlotID);
}
if (node_id.size() != kClusterNodeIdLen) {
return Status(Status::NotOK, "Invalid node id");
return Status(Status::NotOK, errInvalidNodeID);
}

// Get the node which we want to assign a slot into it
Expand All @@ -107,7 +117,7 @@ Status Cluster::SetSlot(int slot, std::string node_id, int64_t new_version) {
return Status(Status::NotOK, "No this node in the cluster");
}
if (to_assign_node->role_ != kClusterMaster) {
return Status(Status::NotOK, "The node is not the master");
return Status(Status::NotOK, errNoMasterNode);
}

// Update version
Expand Down Expand Up @@ -143,12 +153,12 @@ Status Cluster::SetSlot(int slot, std::string node_id, int64_t new_version) {
// cluster setnodes $all_nodes_info $version $force
// one line of $all_nodes: $node_id $host $port $role $master_node_id $slot_range
Status Cluster::SetClusterNodes(const std::string &nodes_str, int64_t version, bool force) {
if (version < 0) return Status(Status::NotOK, "Invalid version");
if (version < 0) return Status(Status::NotOK, errInvalidClusterVersion);

if (force == false) {
// Low version wants to reset current version
if (version_ > version) {
return Status(Status::NotOK, "Invalid version of cluster");
return Status(Status::NotOK, errInvalidClusterVersion);
}
// The same version, it is not needed to update
if (version_ == version) return Status::OK();
Expand Down Expand Up @@ -245,7 +255,7 @@ bool Cluster::IsNotMaster() {

Status Cluster::SetSlotMigrated(int slot, const std::string &ip_port) {
if (!IsValidSlot(slot)) {
return Status(Status::NotOK, "Slot is out of range");
return Status(Status::NotOK, errSlotOutOfRange);
}
// It is called by slot-migrating thread which is an asynchronous thread.
// Therefore, it should be locked when a record is added to 'migrated_slots_'
Expand All @@ -257,7 +267,7 @@ Status Cluster::SetSlotMigrated(int slot, const std::string &ip_port) {

Status Cluster::SetSlotImported(int slot) {
if (!IsValidSlot(slot)) {
return Status(Status::NotOK, "Slot is out of range");
return Status(Status::NotOK, errSlotOutOfRange);
}
// It is called by command 'cluster import'. When executing the command, the
// exclusive lock has been locked. Therefore, it can't be locked again.
Expand All @@ -270,7 +280,7 @@ Status Cluster::MigrateSlot(int slot, const std::string &dst_node_id) {
return Status(Status::NotOK, "Can't find the destination node id");
}
if (!IsValidSlot(slot)) {
return Status(Status::NotOK, "Slot is out of range");
return Status(Status::NotOK, errSlotOutOfRange);
}
if (slots_nodes_[slot] != myself_) {
return Status(Status::NotOK, "Can't migrate slot which doesn't belong to me");
Expand Down Expand Up @@ -299,7 +309,7 @@ Status Cluster::ImportSlot(Redis::Connection *conn, int slot, int state) {
return Status(Status::NotOK, "Slave can't import slot");
}
if (!IsValidSlot(slot)) {
return Status(Status::NotOK, "Slot is out of range");
return Status(Status::NotOK, errSlotOutOfRange);
}

switch (state) {
Expand All @@ -312,14 +322,14 @@ Status Cluster::ImportSlot(Redis::Connection *conn, int slot, int state) {
myself_->importing_slot_ = slot;
// Set link error callback
conn->close_cb_ = std::bind(&SlotImport::StopForLinkError, svr_->slot_import_, conn->GetFD());
// Stop forbiding writing slot to accept write commands
// Stop forbidding writing slot to accept write commands
if (slot == svr_->slot_migrate_->GetForbiddenSlot()) svr_->slot_migrate_->ReleaseForbiddenSlot();
LOG(INFO) << "[import] Start importing slot " << slot;
break;
case kImportSuccess:
if (!svr_->slot_import_->Success(slot)) {
LOG(ERROR) << "[import] Failed to set slot importing success, maybe slot is wrong"
<< ", recieved slot: " << slot
<< ", received slot: " << slot
<< ", current slot: " << svr_->slot_import_->GetSlot();
return Status(Status::NotOK, "Failed to set slot " + std::to_string(slot) + " importing success");
}
Expand All @@ -328,23 +338,21 @@ Status Cluster::ImportSlot(Redis::Connection *conn, int slot, int state) {
case kImportFailed:
if (!svr_->slot_import_->Fail(slot)) {
LOG(ERROR) << "[import] Failed to set slot importing error, maybe slot is wrong"
<< ", recieved slot: " << slot
<< ", received slot: " << slot
<< ", current slot: " << svr_->slot_import_->GetSlot();
return Status(Status::NotOK, "Failed to set slot " + std::to_string(slot) + " importing error");
}
LOG(INFO) << "[import] Failed to import slot " << slot;
break;
default:
return Status(Status::NotOK, "Invalid import state");
break;
return Status(Status::NotOK, errInvalidImportState);
}
return Status::OK();
}

Status Cluster::GetClusterInfo(std::string *cluster_infos) {
if (version_ < 0) {
return Status(Status::ClusterDown,
"CLUSTERDOWN The cluster is not initialized");
return Status(Status::ClusterDown, errClusterNoInitialized);
}
cluster_infos->clear();

Expand Down Expand Up @@ -390,8 +398,7 @@ Status Cluster::GetClusterInfo(std::string *cluster_infos) {
// ... continued until done
Status Cluster::GetSlotsInfo(std::vector<SlotInfo> *slots_infos) {
if (version_ < 0) {
return Status(Status::ClusterDown,
"CLUSTERDOWN The cluster is not initialized");
return Status(Status::ClusterDown, errClusterNoInitialized);
}
slots_infos->clear();

Expand All @@ -418,7 +425,7 @@ Status Cluster::GetSlotsInfo(std::vector<SlotInfo> *slots_infos) {

SlotInfo Cluster::GenSlotNodeInfo(int start, int end, std::shared_ptr<ClusterNode> n) {
std::vector<SlotInfo::NodeInfo> vn;
vn.push_back({n->host_, n->port_, n->id_}); // Itslef
vn.push_back({n->host_, n->port_, n->id_}); // itself

for (const auto &id : n->replicas) { // replicas
if (nodes_.find(id) == nodes_.end()) continue;
Expand All @@ -427,12 +434,11 @@ SlotInfo Cluster::GenSlotNodeInfo(int start, int end, std::shared_ptr<ClusterNod
return {start, end, vn};
}

// $node $host:$port@$cport $role $master_id/$- $ping_sent $ping_recieved
// $node $host:$port@$cport $role $master_id/$- $ping_sent $ping_received
// $version $connected $slot_range
Status Cluster::GetClusterNodes(std::string *nodes_str) {
if (version_ < 0) {
return Status(Status::ClusterDown,
"CLUSTERDOWN The cluster is not initialized");
return Status(Status::ClusterDown, errClusterNoInitialized);
}

*nodes_str = GenNodesDescription();
Expand Down Expand Up @@ -503,20 +509,20 @@ Status Cluster::ParseClusterNodes(const std::string &nodes_str, ClusterNodes *no
std::unordered_map<int, std::string> *slots_nodes) {
std::vector<std::string> nodes_info = Util::Split(nodes_str, "\n");
if (nodes_info.size() == 0) {
return Status(Status::ClusterInvalidInfo, "Invalid cluster nodes info");
return Status(Status::ClusterInvalidInfo, errInvalidClusterNodeInfo);
}
nodes->clear();

// Parse all nodes
for (const auto &node_str : nodes_info) {
std::vector<std::string> fields = Util::Split(node_str, " ");
if (fields.size() < 5) {
return Status(Status::ClusterInvalidInfo, "Invalid cluster nodes info");
return Status(Status::ClusterInvalidInfo, errInvalidClusterNodeInfo);
}

// 1) node id
if (fields[0].size() != kClusterNodeIdLen) {
return Status(Status::ClusterInvalidInfo, "Invalid cluster node id");
return Status(Status::ClusterInvalidInfo, errInvalidNodeID);
}
std::string id = fields[0];

Expand All @@ -526,7 +532,7 @@ Status Cluster::ParseClusterNodes(const std::string &nodes_str, ClusterNodes *no
// 3) port
int port = std::atoi(fields[2].c_str());
if (port <= 0 || port >= (65535-kClusterPortIncr)) {
return Status(Status::ClusterInvalidInfo, "Invalid cluste node port");
return Status(Status::ClusterInvalidInfo, "Invalid cluster node port");
}

// 4) role
Expand All @@ -537,20 +543,20 @@ Status Cluster::ParseClusterNodes(const std::string &nodes_str, ClusterNodes *no
strcasecmp(fields[3].c_str(), "replica") == 0) {
role = kClusterSlave;
} else {
return Status(Status::ClusterInvalidInfo, "Invalid cluste node role");
return Status(Status::ClusterInvalidInfo, "Invalid cluster node role");
}

// 5) master id
std::string master_id = fields[4];
if ((role == kClusterMaster && master_id != "-") ||
(role == kClusterSlave && master_id.size() != kClusterNodeIdLen)) {
return Status(Status::ClusterInvalidInfo, "Invalid cluste node master id");
return Status(Status::ClusterInvalidInfo, errInvalidNodeID);
}

std::bitset<kClusterSlots> slots;
if (role == kClusterSlave) {
if (fields.size() != 5) {
return Status(Status::ClusterInvalidInfo, "Invalid cluster nodes info");
return Status(Status::ClusterInvalidInfo, errInvalidClusterNodeInfo);
} else {
// Create slave node
(*nodes)[id] = Util::MakeShared<ClusterNode>(
Expand All @@ -566,12 +572,12 @@ Status Cluster::ParseClusterNodes(const std::string &nodes_str, ClusterNodes *no
if (ranges.size() == 1) {
start = std::atoi(ranges[0].c_str());
if (IsValidSlot(start) == false) {
return Status(Status::ClusterInvalidInfo, "Invalid cluste slot range");
return Status(Status::ClusterInvalidInfo, errSlotOutOfRange);
}
slots.set(start, 1);
if (role == kClusterMaster) {
if (slots_nodes->find(start) != slots_nodes->end()) {
return Status(Status::ClusterInvalidInfo, "Slot distribution is overlapped");
return Status(Status::ClusterInvalidInfo, errSlotOverlapped);
} else {
(*slots_nodes)[start] = id;
}
Expand All @@ -580,20 +586,20 @@ Status Cluster::ParseClusterNodes(const std::string &nodes_str, ClusterNodes *no
start = std::atoi(ranges[0].c_str());
stop = std::atoi(ranges[1].c_str());
if (start >= stop || start < 0 || stop >= kClusterSlots) {
return Status(Status::ClusterInvalidInfo, "Invalid cluste slot range");
return Status(Status::ClusterInvalidInfo, errSlotOutOfRange);
}
for (int j = start; j <= stop; j++) {
slots.set(j, 1);
if (role == kClusterMaster) {
if (slots_nodes->find(j) != slots_nodes->end()) {
return Status(Status::ClusterInvalidInfo, "Slot distribution is overlapped");
return Status(Status::ClusterInvalidInfo, errSlotOverlapped);
} else {
(*slots_nodes)[j] = id;
}
}
}
} else {
return Status(Status::ClusterInvalidInfo, "Invalid cluste slot range");
return Status(Status::ClusterInvalidInfo, errSlotOutOfRange);
}
}

Expand Down Expand Up @@ -627,7 +633,7 @@ Status Cluster::CanExecByMySelf(const Redis::CommandAttributes *attributes,
if (slot == -1) slot = cur_slot;
if (slot != cur_slot) {
return Status(Status::RedisExecErr,
"CROSSSLOT Keys in request don't hash to the same slot");
"CROSSSLOT Attempted to access keys that don't hash to the same slot");
}
}
if (slot == -1) return Status::OK();
Expand Down
10 changes: 5 additions & 5 deletions tests/cppunit/cluster_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -49,35 +49,35 @@ TEST(Cluster, CluseterSetNodes) {
"master 07c37dfeb235213a872192d90877d0cd55635b91 5461-10922";
s = cluster.SetClusterNodes(invalid_port, 1, false);
ASSERT_FALSE(s.IsOK());
ASSERT_TRUE(s.Msg() == "Invalid cluste node port");
ASSERT_TRUE(s.Msg() == "Invalid cluster node port");

const std::string slave_has_no_master =
"07c37dfeb235213a872192d90877d0cd55635b91 127.0.0.1 30004 "
"slave -";
s = cluster.SetClusterNodes(slave_has_no_master, 1, false);
ASSERT_FALSE(s.IsOK());
ASSERT_TRUE(s.Msg() == "Invalid cluste node master id");
ASSERT_TRUE(s.Msg() == "Invalid cluster node id");

const std::string master_has_master =
"67ed2db8d677e59ec4a4cefb06858cf2a1a89fa1 127.0.0.1 30002 "
"master 07c37dfeb235213a872192d90877d0cd55635b91 5461-10922";
s = cluster.SetClusterNodes(master_has_master, 1, false);
ASSERT_FALSE(s.IsOK());
ASSERT_TRUE(s.Msg() == "Invalid cluste node master id");
ASSERT_TRUE(s.Msg() == "Invalid cluster node id");

const std::string invalid_slot_range =
"67ed2db8d677e59ec4a4cefb06858cf2a1a89fa1 127.0.0.1 30002 "
"master - 5461-0";
s = cluster.SetClusterNodes(invalid_slot_range, 1, false);
ASSERT_FALSE(s.IsOK());
ASSERT_TRUE(s.Msg() == "Invalid cluste slot range");
ASSERT_TRUE(s.Msg() == "Slot is out of range");

const std::string invalid_slot_id =
"67ed2db8d677e59ec4a4cefb06858cf2a1a89fa1 127.0.0.1 30002 "
"master - 54610";
s = cluster.SetClusterNodes(invalid_slot_id, 1, false);
ASSERT_FALSE(s.IsOK());
ASSERT_TRUE(s.Msg() == "Invalid cluste slot range");
ASSERT_TRUE(s.Msg() == "Slot is out of range");

const std::string overlapped_slot_id =
"67ed2db8d677e59ec4a4cefb06858cf2a1a89fa1 127.0.0.1 30002 "
Expand Down
2 changes: 1 addition & 1 deletion tests/cppunit/rwlock_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ TEST(ReadWriteLock, WriteLockGurad_First) {
});
} else {
ths[i] = std::thread([&rwlock, &val]() {
usleep(1000); // To avoid it is the first to run, just sleep 100ms
usleep(100000); // To avoid it is the first to run, just sleep 100ms
git-hulk marked this conversation as resolved.
Show resolved Hide resolved
auto ptr = std::unique_ptr<RWLock::ReadLock>(new RWLock::ReadLock(rwlock));
ASSERT_EQ(val, 3);
});
Expand Down
2 changes: 1 addition & 1 deletion tests/tcl/tests/integration/cluster.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ start_server {tags {"cluster"} overrides {cluster-enabled yes}} {
assert_match "*CLUSTER*" $err

catch {[r clusterx setnodes a -1]} err
assert_match "*Invalid version*" $err
assert_match "*Invalid cluster version*" $err

catch {[r clusterx setslot 16384 07c37dfeb235213a872192d90877d0cd55635b91 1]} err
assert_match "*CLUSTER*" $err
Expand Down