Skip to content

Commit

Permalink
Fix many typos in cluster.cc (#919)
Browse files Browse the repository at this point in the history
  • Loading branch information
git-hulk authored Sep 26, 2022
1 parent 6d8e8f5 commit 9b6cc11
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 45 deletions.
82 changes: 44 additions & 38 deletions src/cluster.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,16 @@
#include "redis_cmd.h"
#include "replication.h"

const char *errInvalidNodeID = "Invalid cluster node id";
const char *errInvalidSlotID = "Invalid slot id";
const char *errSlotOutOfRange = "Slot is out of range";
const char *errInvalidClusterVersion = "Invalid cluster version";
const char *errSlotOverlapped = "Slot distribution is overlapped";
const char *errNoMasterNode = "The node isn't a master";
const char *errClusterNoInitialized = "CLUSTERDOWN The cluster is not initialized";
const char *errInvalidClusterNodeInfo = "Invalid cluster nodes info";
const char *errInvalidImportState = "Invalid import state";

ClusterNode::ClusterNode(std::string id, std::string host, int port,
int role, std::string master_id, std::bitset<kClusterSlots> slots):
id_(id), host_(host), port_(port), role_(role),
Expand All @@ -42,7 +52,7 @@ Cluster::Cluster(Server *svr, std::vector<std::string> binds, int port) :
}

// We access cluster without lock, actually we guarantee data-safe by work threads
// ReadWriteLockGuard, CLUSTER command doesn't have 'execlusive' attribute, i.e.
// ReadWriteLockGuard, CLUSTER command doesn't have 'exclusive' attribute, i.e.
// CLUSTER command can be executed concurrently, but some subcommand may change
// cluster data, so these commands should be executed exclusively, and ReadWriteLock
// also can guarantee accessing data is safe.
Expand All @@ -61,7 +71,7 @@ bool Cluster::SubCommandIsExecExclusive(const std::string &subcommand) {

Status Cluster::SetNodeId(std::string node_id) {
if (node_id.size() != kClusterNodeIdLen) {
return Status(Status::ClusterInvalidInfo, "Invalid node id");
return Status(Status::ClusterInvalidInfo, errInvalidNodeID);
}

myid_ = node_id;
Expand Down Expand Up @@ -92,13 +102,13 @@ Status Cluster::SetNodeId(std::string node_id) {
Status Cluster::SetSlot(int slot, std::string node_id, int64_t new_version) {
// Parameters check
if (new_version <= 0 || new_version != version_ + 1) {
return Status(Status::NotOK, "Invalid cluster version");
return Status(Status::NotOK, errInvalidClusterVersion);
}
if (!IsValidSlot(slot)) {
return Status(Status::NotOK, "Invalid slot id");
return Status(Status::NotOK, errInvalidSlotID);
}
if (node_id.size() != kClusterNodeIdLen) {
return Status(Status::NotOK, "Invalid node id");
return Status(Status::NotOK, errInvalidNodeID);
}

// Get the node which we want to assign a slot into it
Expand All @@ -107,7 +117,7 @@ Status Cluster::SetSlot(int slot, std::string node_id, int64_t new_version) {
return Status(Status::NotOK, "No this node in the cluster");
}
if (to_assign_node->role_ != kClusterMaster) {
return Status(Status::NotOK, "The node is not the master");
return Status(Status::NotOK, errNoMasterNode);
}

// Update version
Expand Down Expand Up @@ -143,12 +153,12 @@ Status Cluster::SetSlot(int slot, std::string node_id, int64_t new_version) {
// cluster setnodes $all_nodes_info $version $force
// one line of $all_nodes: $node_id $host $port $role $master_node_id $slot_range
Status Cluster::SetClusterNodes(const std::string &nodes_str, int64_t version, bool force) {
if (version < 0) return Status(Status::NotOK, "Invalid version");
if (version < 0) return Status(Status::NotOK, errInvalidClusterVersion);

if (force == false) {
// Low version wants to reset current version
if (version_ > version) {
return Status(Status::NotOK, "Invalid version of cluster");
return Status(Status::NotOK, errInvalidClusterVersion);
}
// The same version, it is not needed to update
if (version_ == version) return Status::OK();
Expand Down Expand Up @@ -245,7 +255,7 @@ bool Cluster::IsNotMaster() {

Status Cluster::SetSlotMigrated(int slot, const std::string &ip_port) {
if (!IsValidSlot(slot)) {
return Status(Status::NotOK, "Slot is out of range");
return Status(Status::NotOK, errSlotOutOfRange);
}
// It is called by slot-migrating thread which is an asynchronous thread.
// Therefore, it should be locked when a record is added to 'migrated_slots_'
Expand All @@ -257,7 +267,7 @@ Status Cluster::SetSlotMigrated(int slot, const std::string &ip_port) {

Status Cluster::SetSlotImported(int slot) {
if (!IsValidSlot(slot)) {
return Status(Status::NotOK, "Slot is out of range");
return Status(Status::NotOK, errSlotOutOfRange);
}
// It is called by command 'cluster import'. When executing the command, the
// exclusive lock has been locked. Therefore, it can't be locked again.
Expand All @@ -270,7 +280,7 @@ Status Cluster::MigrateSlot(int slot, const std::string &dst_node_id) {
return Status(Status::NotOK, "Can't find the destination node id");
}
if (!IsValidSlot(slot)) {
return Status(Status::NotOK, "Slot is out of range");
return Status(Status::NotOK, errSlotOutOfRange);
}
if (slots_nodes_[slot] != myself_) {
return Status(Status::NotOK, "Can't migrate slot which doesn't belong to me");
Expand Down Expand Up @@ -299,7 +309,7 @@ Status Cluster::ImportSlot(Redis::Connection *conn, int slot, int state) {
return Status(Status::NotOK, "Slave can't import slot");
}
if (!IsValidSlot(slot)) {
return Status(Status::NotOK, "Slot is out of range");
return Status(Status::NotOK, errSlotOutOfRange);
}

switch (state) {
Expand All @@ -312,14 +322,14 @@ Status Cluster::ImportSlot(Redis::Connection *conn, int slot, int state) {
myself_->importing_slot_ = slot;
// Set link error callback
conn->close_cb_ = std::bind(&SlotImport::StopForLinkError, svr_->slot_import_, conn->GetFD());
// Stop forbiding writing slot to accept write commands
// Stop forbidding writing slot to accept write commands
if (slot == svr_->slot_migrate_->GetForbiddenSlot()) svr_->slot_migrate_->ReleaseForbiddenSlot();
LOG(INFO) << "[import] Start importing slot " << slot;
break;
case kImportSuccess:
if (!svr_->slot_import_->Success(slot)) {
LOG(ERROR) << "[import] Failed to set slot importing success, maybe slot is wrong"
<< ", recieved slot: " << slot
<< ", received slot: " << slot
<< ", current slot: " << svr_->slot_import_->GetSlot();
return Status(Status::NotOK, "Failed to set slot " + std::to_string(slot) + " importing success");
}
Expand All @@ -328,23 +338,21 @@ Status Cluster::ImportSlot(Redis::Connection *conn, int slot, int state) {
case kImportFailed:
if (!svr_->slot_import_->Fail(slot)) {
LOG(ERROR) << "[import] Failed to set slot importing error, maybe slot is wrong"
<< ", recieved slot: " << slot
<< ", received slot: " << slot
<< ", current slot: " << svr_->slot_import_->GetSlot();
return Status(Status::NotOK, "Failed to set slot " + std::to_string(slot) + " importing error");
}
LOG(INFO) << "[import] Failed to import slot " << slot;
break;
default:
return Status(Status::NotOK, "Invalid import state");
break;
return Status(Status::NotOK, errInvalidImportState);
}
return Status::OK();
}

Status Cluster::GetClusterInfo(std::string *cluster_infos) {
if (version_ < 0) {
return Status(Status::ClusterDown,
"CLUSTERDOWN The cluster is not initialized");
return Status(Status::ClusterDown, errClusterNoInitialized);
}
cluster_infos->clear();

Expand Down Expand Up @@ -390,8 +398,7 @@ Status Cluster::GetClusterInfo(std::string *cluster_infos) {
// ... continued until done
Status Cluster::GetSlotsInfo(std::vector<SlotInfo> *slots_infos) {
if (version_ < 0) {
return Status(Status::ClusterDown,
"CLUSTERDOWN The cluster is not initialized");
return Status(Status::ClusterDown, errClusterNoInitialized);
}
slots_infos->clear();

Expand All @@ -418,7 +425,7 @@ Status Cluster::GetSlotsInfo(std::vector<SlotInfo> *slots_infos) {

SlotInfo Cluster::GenSlotNodeInfo(int start, int end, std::shared_ptr<ClusterNode> n) {
std::vector<SlotInfo::NodeInfo> vn;
vn.push_back({n->host_, n->port_, n->id_}); // Itslef
vn.push_back({n->host_, n->port_, n->id_}); // itself

for (const auto &id : n->replicas) { // replicas
if (nodes_.find(id) == nodes_.end()) continue;
Expand All @@ -427,12 +434,11 @@ SlotInfo Cluster::GenSlotNodeInfo(int start, int end, std::shared_ptr<ClusterNod
return {start, end, vn};
}

// $node $host:$port@$cport $role $master_id/$- $ping_sent $ping_recieved
// $node $host:$port@$cport $role $master_id/$- $ping_sent $ping_received
// $version $connected $slot_range
Status Cluster::GetClusterNodes(std::string *nodes_str) {
if (version_ < 0) {
return Status(Status::ClusterDown,
"CLUSTERDOWN The cluster is not initialized");
return Status(Status::ClusterDown, errClusterNoInitialized);
}

*nodes_str = GenNodesDescription();
Expand Down Expand Up @@ -503,20 +509,20 @@ Status Cluster::ParseClusterNodes(const std::string &nodes_str, ClusterNodes *no
std::unordered_map<int, std::string> *slots_nodes) {
std::vector<std::string> nodes_info = Util::Split(nodes_str, "\n");
if (nodes_info.size() == 0) {
return Status(Status::ClusterInvalidInfo, "Invalid cluster nodes info");
return Status(Status::ClusterInvalidInfo, errInvalidClusterNodeInfo);
}
nodes->clear();

// Parse all nodes
for (const auto &node_str : nodes_info) {
std::vector<std::string> fields = Util::Split(node_str, " ");
if (fields.size() < 5) {
return Status(Status::ClusterInvalidInfo, "Invalid cluster nodes info");
return Status(Status::ClusterInvalidInfo, errInvalidClusterNodeInfo);
}

// 1) node id
if (fields[0].size() != kClusterNodeIdLen) {
return Status(Status::ClusterInvalidInfo, "Invalid cluster node id");
return Status(Status::ClusterInvalidInfo, errInvalidNodeID);
}
std::string id = fields[0];

Expand All @@ -526,7 +532,7 @@ Status Cluster::ParseClusterNodes(const std::string &nodes_str, ClusterNodes *no
// 3) port
int port = std::atoi(fields[2].c_str());
if (port <= 0 || port >= (65535-kClusterPortIncr)) {
return Status(Status::ClusterInvalidInfo, "Invalid cluste node port");
return Status(Status::ClusterInvalidInfo, "Invalid cluster node port");
}

// 4) role
Expand All @@ -537,20 +543,20 @@ Status Cluster::ParseClusterNodes(const std::string &nodes_str, ClusterNodes *no
strcasecmp(fields[3].c_str(), "replica") == 0) {
role = kClusterSlave;
} else {
return Status(Status::ClusterInvalidInfo, "Invalid cluste node role");
return Status(Status::ClusterInvalidInfo, "Invalid cluster node role");
}

// 5) master id
std::string master_id = fields[4];
if ((role == kClusterMaster && master_id != "-") ||
(role == kClusterSlave && master_id.size() != kClusterNodeIdLen)) {
return Status(Status::ClusterInvalidInfo, "Invalid cluste node master id");
return Status(Status::ClusterInvalidInfo, errInvalidNodeID);
}

std::bitset<kClusterSlots> slots;
if (role == kClusterSlave) {
if (fields.size() != 5) {
return Status(Status::ClusterInvalidInfo, "Invalid cluster nodes info");
return Status(Status::ClusterInvalidInfo, errInvalidClusterNodeInfo);
} else {
// Create slave node
(*nodes)[id] = Util::MakeShared<ClusterNode>(
Expand All @@ -566,12 +572,12 @@ Status Cluster::ParseClusterNodes(const std::string &nodes_str, ClusterNodes *no
if (ranges.size() == 1) {
start = std::atoi(ranges[0].c_str());
if (IsValidSlot(start) == false) {
return Status(Status::ClusterInvalidInfo, "Invalid cluste slot range");
return Status(Status::ClusterInvalidInfo, errSlotOutOfRange);
}
slots.set(start, 1);
if (role == kClusterMaster) {
if (slots_nodes->find(start) != slots_nodes->end()) {
return Status(Status::ClusterInvalidInfo, "Slot distribution is overlapped");
return Status(Status::ClusterInvalidInfo, errSlotOverlapped);
} else {
(*slots_nodes)[start] = id;
}
Expand All @@ -580,20 +586,20 @@ Status Cluster::ParseClusterNodes(const std::string &nodes_str, ClusterNodes *no
start = std::atoi(ranges[0].c_str());
stop = std::atoi(ranges[1].c_str());
if (start >= stop || start < 0 || stop >= kClusterSlots) {
return Status(Status::ClusterInvalidInfo, "Invalid cluste slot range");
return Status(Status::ClusterInvalidInfo, errSlotOutOfRange);
}
for (int j = start; j <= stop; j++) {
slots.set(j, 1);
if (role == kClusterMaster) {
if (slots_nodes->find(j) != slots_nodes->end()) {
return Status(Status::ClusterInvalidInfo, "Slot distribution is overlapped");
return Status(Status::ClusterInvalidInfo, errSlotOverlapped);
} else {
(*slots_nodes)[j] = id;
}
}
}
} else {
return Status(Status::ClusterInvalidInfo, "Invalid cluste slot range");
return Status(Status::ClusterInvalidInfo, errSlotOutOfRange);
}
}

Expand Down Expand Up @@ -627,7 +633,7 @@ Status Cluster::CanExecByMySelf(const Redis::CommandAttributes *attributes,
if (slot == -1) slot = cur_slot;
if (slot != cur_slot) {
return Status(Status::RedisExecErr,
"CROSSSLOT Keys in request don't hash to the same slot");
"CROSSSLOT Attempted to access keys that don't hash to the same slot");
}
}
if (slot == -1) return Status::OK();
Expand Down
10 changes: 5 additions & 5 deletions tests/cppunit/cluster_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -49,35 +49,35 @@ TEST(Cluster, CluseterSetNodes) {
"master 07c37dfeb235213a872192d90877d0cd55635b91 5461-10922";
s = cluster.SetClusterNodes(invalid_port, 1, false);
ASSERT_FALSE(s.IsOK());
ASSERT_TRUE(s.Msg() == "Invalid cluste node port");
ASSERT_TRUE(s.Msg() == "Invalid cluster node port");

const std::string slave_has_no_master =
"07c37dfeb235213a872192d90877d0cd55635b91 127.0.0.1 30004 "
"slave -";
s = cluster.SetClusterNodes(slave_has_no_master, 1, false);
ASSERT_FALSE(s.IsOK());
ASSERT_TRUE(s.Msg() == "Invalid cluste node master id");
ASSERT_TRUE(s.Msg() == "Invalid cluster node id");

const std::string master_has_master =
"67ed2db8d677e59ec4a4cefb06858cf2a1a89fa1 127.0.0.1 30002 "
"master 07c37dfeb235213a872192d90877d0cd55635b91 5461-10922";
s = cluster.SetClusterNodes(master_has_master, 1, false);
ASSERT_FALSE(s.IsOK());
ASSERT_TRUE(s.Msg() == "Invalid cluste node master id");
ASSERT_TRUE(s.Msg() == "Invalid cluster node id");

const std::string invalid_slot_range =
"67ed2db8d677e59ec4a4cefb06858cf2a1a89fa1 127.0.0.1 30002 "
"master - 5461-0";
s = cluster.SetClusterNodes(invalid_slot_range, 1, false);
ASSERT_FALSE(s.IsOK());
ASSERT_TRUE(s.Msg() == "Invalid cluste slot range");
ASSERT_TRUE(s.Msg() == "Slot is out of range");

const std::string invalid_slot_id =
"67ed2db8d677e59ec4a4cefb06858cf2a1a89fa1 127.0.0.1 30002 "
"master - 54610";
s = cluster.SetClusterNodes(invalid_slot_id, 1, false);
ASSERT_FALSE(s.IsOK());
ASSERT_TRUE(s.Msg() == "Invalid cluste slot range");
ASSERT_TRUE(s.Msg() == "Slot is out of range");

const std::string overlapped_slot_id =
"67ed2db8d677e59ec4a4cefb06858cf2a1a89fa1 127.0.0.1 30002 "
Expand Down
2 changes: 1 addition & 1 deletion tests/cppunit/rwlock_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ TEST(ReadWriteLock, WriteLockGurad_First) {
});
} else {
ths[i] = std::thread([&rwlock, &val]() {
usleep(1000); // To avoid it is the first to run, just sleep 100ms
usleep(100000); // To avoid it is the first to run, just sleep 100ms
auto ptr = std::unique_ptr<RWLock::ReadLock>(new RWLock::ReadLock(rwlock));
ASSERT_EQ(val, 3);
});
Expand Down
2 changes: 1 addition & 1 deletion tests/tcl/tests/integration/cluster.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ start_server {tags {"cluster"} overrides {cluster-enabled yes}} {
assert_match "*CLUSTER*" $err

catch {[r clusterx setnodes a -1]} err
assert_match "*Invalid version*" $err
assert_match "*Invalid cluster version*" $err

catch {[r clusterx setslot 16384 07c37dfeb235213a872192d90877d0cd55635b91 1]} err
assert_match "*CLUSTER*" $err
Expand Down

0 comments on commit 9b6cc11

Please sign in to comment.