diff --git a/src/cluster/cluster.cc b/src/cluster/cluster.cc index 9c4e4986e6c..6525e044547 100644 --- a/src/cluster/cluster.cc +++ b/src/cluster/cluster.cc @@ -26,6 +26,7 @@ #include #include +#include "cluster/cluster_defs.h" #include "commands/commander.h" #include "common/io_util.h" #include "fmt/format.h" @@ -35,16 +36,6 @@ #include "string_util.h" #include "time_util.h" -const char *errInvalidNodeID = "Invalid cluster node id"; -const char *errInvalidSlotID = "Invalid slot id"; -const char *errSlotOutOfRange = "Slot is out of range"; -const char *errInvalidClusterVersion = "Invalid cluster version"; -const char *errSlotOverlapped = "Slot distribution is overlapped"; -const char *errNoMasterNode = "The node isn't a master"; -const char *errClusterNoInitialized = "CLUSTERDOWN The cluster is not initialized"; -const char *errInvalidClusterNodeInfo = "Invalid cluster nodes info"; -const char *errInvalidImportState = "Invalid import state"; - ClusterNode::ClusterNode(std::string id, std::string host, int port, int role, std::string master_id, std::bitset slots) : id(std::move(id)), host(std::move(host)), port(port), role(role), master_id(std::move(master_id)), slots(slots) {} @@ -93,10 +84,6 @@ Status Cluster::SetNodeId(const std::string &node_id) { return Status::OK(); } -// Set the slot to the node if new version is current version +1. It is useful -// when we scale cluster avoid too many big messages, since we only update one -// slot distribution and there are 16384 slot in our design. -// // The reason why the new version MUST be +1 of current version is that, // the command changes topology based on specific topology (also means specific // version), we must guarantee current topology is exactly expected, otherwise, @@ -104,21 +91,17 @@ Status Cluster::SetNodeId(const std::string &node_id) { // This is different with CLUSTERX SETNODES commands because it uses new version // topology to cover current version, it allows kvrocks nodes lost some topology // updates since of network failure, it is state instead of operation. -Status Cluster::SetSlot(int slot, const std::string &node_id, int64_t new_version) { - // Parameters check +Status Cluster::SetSlotRanges(const std::vector &slot_ranges, const std::string &node_id, + int64_t new_version) { if (new_version <= 0 || new_version != version_ + 1) { return {Status::NotOK, errInvalidClusterVersion}; } - if (!IsValidSlot(slot)) { - return {Status::NotOK, errInvalidSlotID}; - } - if (node_id.size() != kClusterNodeIdLen) { return {Status::NotOK, errInvalidNodeID}; } - // Get the node which we want to assign a slot into it + // Get the node which we want to assign slots into it std::shared_ptr to_assign_node = nodes_[node_id]; if (to_assign_node == nullptr) { return {Status::NotOK, "No this node in the cluster"}; @@ -135,23 +118,29 @@ Status Cluster::SetSlot(int slot, const std::string &node_id, int64_t new_versio // 1. Remove the slot from old node if existing // 2. Add the slot into to-assign node // 3. Update the map of slots to nodes. - std::shared_ptr old_node = slots_nodes_[slot]; - if (old_node != nullptr) { - old_node->slots[slot] = false; - } - to_assign_node->slots[slot] = true; - slots_nodes_[slot] = to_assign_node; - - // Clear data of migrated slot or record of imported slot - if (old_node == myself_ && old_node != to_assign_node) { - // If slot is migrated from this node - if (migrated_slots_.count(slot) > 0) { - svr_->slot_migrator->ClearKeysOfSlot(kDefaultNamespace, slot); - migrated_slots_.erase(slot); - } - // If slot is imported into this node - if (imported_slots_.count(slot) > 0) { - imported_slots_.erase(slot); + // remember: The atomicity of the process is based on + // the transactionality of ClearKeysOfSlot(). + for (auto [s_start, s_end] : slot_ranges) { + for (int slot = s_start; slot <= s_end; slot++) { + std::shared_ptr old_node = slots_nodes_[slot]; + if (old_node != nullptr) { + old_node->slots[slot] = false; + } + to_assign_node->slots[slot] = true; + slots_nodes_[slot] = to_assign_node; + + // Clear data of migrated slot or record of imported slot + if (old_node == myself_ && old_node != to_assign_node) { + // If slot is migrated from this node + if (migrated_slots_.count(slot) > 0) { + svr_->slot_migrator->ClearKeysOfSlot(kDefaultNamespace, slot); + migrated_slots_.erase(slot); + } + // If slot is imported into this node + if (imported_slots_.count(slot) > 0) { + imported_slots_.erase(slot); + } + } } } diff --git a/src/cluster/cluster.h b/src/cluster/cluster.h index f6d044359f0..9c89b1755d2 100644 --- a/src/cluster/cluster.h +++ b/src/cluster/cluster.h @@ -29,20 +29,13 @@ #include #include +#include "cluster/cluster_defs.h" #include "commands/commander.h" #include "common/io_util.h" #include "redis_slot.h" #include "server/redis_connection.h" #include "status.h" -enum { - kClusterMaster = 1, - kClusterSlave = 2, - kClusterNodeIdLen = 40, - kClusterPortIncr = 10000, - kClusterSlots = HASH_SLOTS_SIZE, -}; - class ClusterNode { public: explicit ClusterNode(std::string id, std::string host, int port, int role, std::string master_id, @@ -79,7 +72,7 @@ class Cluster { Status SetClusterNodes(const std::string &nodes_str, int64_t version, bool force); Status GetClusterNodes(std::string *nodes_str); Status SetNodeId(const std::string &node_id); - Status SetSlot(int slot, const std::string &node_id, int64_t version); + Status SetSlotRanges(const std::vector &slot_ranges, const std::string &node_id, int64_t version); Status SetSlotMigrated(int slot, const std::string &ip_port); Status SetSlotImported(int slot); Status GetSlotsInfo(std::vector *slot_infos); diff --git a/src/cluster/cluster_defs.h b/src/cluster/cluster_defs.h new file mode 100644 index 00000000000..6638db8807e --- /dev/null +++ b/src/cluster/cluster_defs.h @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#pragma once + +#include "cluster/redis_slot.h" + +enum { + kClusterMaster = 1, + kClusterSlave = 2, + kClusterNodeIdLen = 40, + kClusterPortIncr = 10000, + kClusterSlots = HASH_SLOTS_SIZE, +}; + +inline constexpr const char *errInvalidNodeID = "Invalid cluster node id"; +inline constexpr const char *errInvalidSlotID = "Invalid slot id"; +inline constexpr const char *errSlotOutOfRange = "Slot is out of range"; +inline constexpr const char *errInvalidClusterVersion = "Invalid cluster version"; +inline constexpr const char *errSlotOverlapped = "Slot distribution is overlapped"; +inline constexpr const char *errNoMasterNode = "The node isn't a master"; +inline constexpr const char *errClusterNoInitialized = "CLUSTERDOWN The cluster is not initialized"; +inline constexpr const char *errInvalidClusterNodeInfo = "Invalid cluster nodes info"; +inline constexpr const char *errInvalidImportState = "Invalid import state"; + +using SlotRange = std::pair; diff --git a/src/cluster/redis_slot.h b/src/cluster/redis_slot.h index 63c98d2e37e..297fd7c3704 100644 --- a/src/cluster/redis_slot.h +++ b/src/cluster/redis_slot.h @@ -19,6 +19,7 @@ */ #pragma once +#include #include // crc16 diff --git a/src/commands/cmd_cluster.cc b/src/commands/cmd_cluster.cc index 34ad8af8cf4..37f943ae22a 100644 --- a/src/commands/cmd_cluster.cc +++ b/src/commands/cmd_cluster.cc @@ -18,6 +18,7 @@ * */ +#include "cluster/cluster_defs.h" #include "cluster/slot_import.h" #include "commander.h" #include "error_constants.h" @@ -156,15 +157,9 @@ class CommandClusterX : public Commander { // CLUSTERX SETSLOT $SLOT_ID NODE $NODE_ID $VERSION if (subcommand_ == "setslot" && args_.size() == 6) { - auto parse_id = ParseInt(args[2], 10); - if (!parse_id) { - return {Status::RedisParseErr, errValueNotInteger}; - } - - slot_id_ = *parse_id; - - if (!Cluster::IsValidSlot(slot_id_)) { - return {Status::RedisParseErr, "Invalid slot id"}; + Status s = CommanderHelper::ParseSlotRanges(args_[2], slot_ranges_); + if (!s.IsOK()) { + return s; } if (strcasecmp(args_[3].c_str(), "node") != 0) { @@ -219,7 +214,7 @@ class CommandClusterX : public Commander { *output = redis::Error(s.Msg()); } } else if (subcommand_ == "setslot") { - Status s = svr->cluster->SetSlot(slot_id_, args_[4], set_version_); + Status s = svr->cluster->SetSlotRanges(slot_ranges_, args_[4], set_version_); if (s.IsOK()) { need_persist_nodes_info = true; *output = redis::SimpleString("OK"); @@ -251,7 +246,7 @@ class CommandClusterX : public Commander { std::string dst_node_id_; int64_t set_version_ = 0; int64_t slot_ = -1; - int slot_id_ = -1; + std::vector slot_ranges_; bool force_ = false; }; diff --git a/src/commands/commander.cc b/src/commands/commander.cc index e47e56f61d1..9c08a386047 100644 --- a/src/commands/commander.cc +++ b/src/commands/commander.cc @@ -20,6 +20,8 @@ #include "commander.h" +#include "cluster/cluster_defs.h" + namespace redis { RegisterToCommandTable::RegisterToCommandTable(std::initializer_list list) { @@ -108,4 +110,52 @@ bool IsCommandExists(const std::string &name) { return command_details::original_commands.find(util::ToLower(name)) != command_details::original_commands.end(); } +Status CommanderHelper::ParseSlotRanges(const std::string &slots_str, std::vector &slots) { + if (slots_str.empty()) { + return {Status::NotOK, "No slots to parse."}; + } + + std::vector slot_ranges = util::Split(slots_str, " "); + if (slot_ranges.empty()) { + return {Status::NotOK, + fmt::format("Invalid slots: `{}`. No slots to parse. Please use spaces to separate slots.", slots_str)}; + } + + auto valid_range = NumericRange{0, kClusterSlots - 1}; + // Parse all slots (include slot ranges) + for (auto &slot_range : slot_ranges) { + if (slot_range.find('-') == std::string::npos) { + auto parse_result = ParseInt(slot_range, valid_range, 10); + if (!parse_result) { + return std::move(parse_result).Prefixed(errInvalidSlotID); + } + slots.emplace_back(*parse_result, *parse_result); + continue; + } + + // parse slot range: "int1-int2" (satisfy: int1 <= int2 ) + if (slot_range.front() == '-' || slot_range.back() == '-') { + return {Status::NotOK, + fmt::format("Invalid slot range: `{}`. The character '-' can't appear in the first or last position.", + slot_range)}; + } + std::vector fields = util::Split(slot_range, "-"); + if (fields.size() != 2) { + return {Status::NotOK, + fmt::format("Invalid slot range: `{}`. The slot range should be of the form `int1-int2`.", slot_range)}; + } + auto parse_start = ParseInt(fields[0], valid_range, 10); + auto parse_end = ParseInt(fields[1], valid_range, 10); + if (!parse_start || !parse_end || *parse_start > *parse_end) { + return {Status::NotOK, + fmt::format( + "Invalid slot range: `{}`. The slot range `int1-int2` needs to satisfy the condition (int1 <= int2).", + slot_range)}; + } + slots.emplace_back(*parse_start, *parse_end); + } + + return Status::OK(); +} + } // namespace redis diff --git a/src/commands/commander.h b/src/commands/commander.h index 7433def39c7..a2cda41be22 100644 --- a/src/commands/commander.h +++ b/src/commands/commander.h @@ -37,6 +37,8 @@ #include #include +#include "cluster/cluster_defs.h" +#include "parse_util.h" #include "server/redis_reply.h" #include "status.h" #include "string_util.h" @@ -87,6 +89,11 @@ class CommanderWithParseMove : Commander { virtual Status ParseMove(std::vector &&args) { return Status::OK(); } }; +class CommanderHelper { + public: + static Status ParseSlotRanges(const std::string &slots_str, std::vector &slots); +}; + using CommanderFactory = std::function()>; struct CommandKeyRange { diff --git a/src/common/encoding.h b/src/common/encoding.h index 27fe82eb2e5..ea1c48bffac 100644 --- a/src/common/encoding.h +++ b/src/common/encoding.h @@ -23,6 +23,7 @@ #include #include +#include #include enum class Endian { diff --git a/tests/cppunit/cluster_test.cc b/tests/cppunit/cluster_test.cc index 3f214271492..bdbf20f2db0 100644 --- a/tests/cppunit/cluster_test.cc +++ b/tests/cppunit/cluster_test.cc @@ -20,11 +20,15 @@ #include "cluster/cluster.h" +#include #include #include #include +#include +#include "cluster/cluster_defs.h" +#include "commands/commander.h" #include "server/server.h" TEST(Cluster, CluseterSetNodes) { @@ -195,3 +199,129 @@ TEST(Cluster, TestDumpAndLoadClusterNodesInfo) { unlink(nodes_filename.c_str()); } + +TEST(Cluster, ClusterParseSlotRanges) { + Status s; + Cluster cluster(nullptr, {"127.0.0.1"}, 3002); + const std::string node_id = "67ed2db8d677e59ec4a4cefb06858cf2a1a89fa1"; + int64_t version = 1; + + const std::string right_nodes = node_id + + " 127.0.0.1 30002 " + "master - 0 123-456 789 831 8192-16381 16382 16383"; + s = cluster.SetClusterNodes(right_nodes, version, false); + ASSERT_TRUE(s.IsOK()); + ASSERT_TRUE(cluster.GetVersion() == version); + version++; + + std::vector slots; + + const std::string t_single_slot = "1234"; + s = redis::CommanderHelper::ParseSlotRanges(t_single_slot, slots); + ASSERT_TRUE(s.IsOK()); + s = cluster.SetSlotRanges(slots, node_id, version); + ASSERT_TRUE(s.IsOK()); + version++; + slots.clear(); + + const std::string t_single_ranges = "1234-1236"; + s = redis::CommanderHelper::ParseSlotRanges(t_single_ranges, slots); + ASSERT_TRUE(s.IsOK()); + s = cluster.SetSlotRanges(slots, node_id, version); + ASSERT_TRUE(s.IsOK()); + version++; + slots.clear(); + + const std::string t_mixed_slot = "10229 16301 4710 3557-8559 "; + s = redis::CommanderHelper::ParseSlotRanges(t_mixed_slot, slots); + ASSERT_TRUE(s.IsOK()); + s = cluster.SetSlotRanges(slots, node_id, version); + ASSERT_TRUE(s.IsOK()); + version++; + slots.clear(); + + std::string empty_slots; + s = redis::CommanderHelper::ParseSlotRanges(empty_slots, slots); + ASSERT_FALSE(s.IsOK()); + ASSERT_TRUE(s.Msg() == "No slots to parse."); + slots.clear(); + + std::string space_slots = " "; + s = redis::CommanderHelper::ParseSlotRanges(space_slots, slots); + ASSERT_FALSE(s.IsOK()); + ASSERT_TRUE(s.Msg() == fmt::format("Invalid slots: `{}`. No slots to parse. " + "Please use spaces to separate slots.", + space_slots)); + + std::vector error_slots; + std::string invalid_single_slot = "830849ad"; + std::string unbound_single_slot = "1683093429"; + std::string front_slot_ranges = "-1234-3456"; + std::string back_slot_ranges = "1234-3456-"; + std::string f_single_slot = "-6351"; + std::string overmuch_slot_ranges = "12-34-56"; + std::string f_cond_slot_ranges = "3456-1234"; + + error_slots.emplace_back(invalid_single_slot); + error_slots.emplace_back(unbound_single_slot); + error_slots.emplace_back(front_slot_ranges); + error_slots.emplace_back(back_slot_ranges); + error_slots.emplace_back(f_single_slot); + error_slots.emplace_back(overmuch_slot_ranges); + error_slots.emplace_back(f_cond_slot_ranges); + + slots.clear(); + for (int i = 0; i < 2; i++) { + if (i == 1) { + for (auto &slot_str : error_slots) { + slot_str = t_mixed_slot + slot_str; + } + } + + s = redis::CommanderHelper::ParseSlotRanges(error_slots[0], slots); + ASSERT_FALSE(s.IsOK()); + ASSERT_TRUE(s.Msg() == "Invalid slot id: encounter non-integer characters"); + slots.clear(); + + s = redis::CommanderHelper::ParseSlotRanges(error_slots[1], slots); + ASSERT_FALSE(s.IsOK()); + ASSERT_TRUE(s.Msg() == "Invalid slot id: out of numeric range"); + slots.clear(); + + s = redis::CommanderHelper::ParseSlotRanges(error_slots[2], slots); + ASSERT_FALSE(s.IsOK()); + ASSERT_TRUE(s.Msg() == fmt::format("Invalid slot range: `{}`. The character '-' can't appear " + "in the first or last position.", + front_slot_ranges)); + slots.clear(); + + s = redis::CommanderHelper::ParseSlotRanges(error_slots[3], slots); + ASSERT_FALSE(s.IsOK()); + ASSERT_TRUE(s.Msg() == + fmt::format("Invalid slot range: `{}`. The character '-' can't appear in the first or last position.", + back_slot_ranges)); + slots.clear(); + + s = redis::CommanderHelper::ParseSlotRanges(error_slots[4], slots); + ASSERT_FALSE(s.IsOK()); + ASSERT_TRUE(s.Msg() == + fmt::format("Invalid slot range: `{}`. The character '-' can't appear in the first or last position.", + f_single_slot)); + slots.clear(); + + s = redis::CommanderHelper::ParseSlotRanges(error_slots[5], slots); + ASSERT_FALSE(s.IsOK()); + ASSERT_TRUE(s.Msg() == fmt::format("Invalid slot range: `{}`. The slot range should be of the form `int1-int2`.", + overmuch_slot_ranges)); + slots.clear(); + + s = redis::CommanderHelper::ParseSlotRanges(error_slots[6], slots); + ASSERT_FALSE(s.IsOK()); + ASSERT_TRUE( + s.Msg() == + fmt::format( + "Invalid slot range: `{}`. The slot range `int1-int2` needs to satisfy the condition (int1 <= int2).", + f_cond_slot_ranges)); + slots.clear(); + } +} diff --git a/tests/gocase/integration/cluster/cluster_test.go b/tests/gocase/integration/cluster/cluster_test.go index 438bb9a56eb..257b74b31b2 100644 --- a/tests/gocase/integration/cluster/cluster_test.go +++ b/tests/gocase/integration/cluster/cluster_test.go @@ -255,21 +255,21 @@ func TestClusterSlotSet(t *testing.T) { require.NoError(t, rdb2.Set(ctx, slotKey, 0, 0).Err()) util.ErrorRegexp(t, rdb1.Set(ctx, slotKey, 0, 0).Err(), fmt.Sprintf(".*MOVED 0.*%d.*", srv2.Port())) - require.NoError(t, rdb2.Do(ctx, "clusterx", "setslot", "1", "node", nodeID2, "4").Err()) - require.NoError(t, rdb1.Do(ctx, "clusterx", "setslot", "1", "node", nodeID2, "4").Err()) + require.NoError(t, rdb2.Do(ctx, "clusterx", "setslot", "1-3 4", "node", nodeID2, "4").Err()) + require.NoError(t, rdb1.Do(ctx, "clusterx", "setslot", "1-3 4", "node", nodeID2, "4").Err()) slots = rdb2.ClusterSlots(ctx).Val() require.EqualValues(t, slots, rdb1.ClusterSlots(ctx).Val()) require.Len(t, slots, 2) require.EqualValues(t, 0, slots[0].Start) - require.EqualValues(t, 1, slots[0].End) + require.EqualValues(t, 4, slots[0].End) require.EqualValues(t, []redis.ClusterNode{{ID: nodeID2, Addr: srv2.HostPort()}}, slots[0].Nodes) - require.EqualValues(t, 2, slots[1].Start) + require.EqualValues(t, 5, slots[1].Start) require.EqualValues(t, 16383, slots[1].End) require.EqualValues(t, []redis.ClusterNode{{ID: nodeID1, Addr: srv1.HostPort()}}, slots[1].Nodes) // wrong version can't update slot distribution - require.ErrorContains(t, rdb2.Do(ctx, "clusterx", "setslot", "2", "node", nodeID2, "6").Err(), "version") - require.ErrorContains(t, rdb2.Do(ctx, "clusterx", "setslot", "2", "node", nodeID2, "4").Err(), "version") + require.ErrorContains(t, rdb2.Do(ctx, "clusterx", "setslot", "4", "node", nodeID2, "6").Err(), "version") + require.ErrorContains(t, rdb2.Do(ctx, "clusterx", "setslot", "4", "node", nodeID2, "4").Err(), "version") require.EqualValues(t, "4", rdb2.Do(ctx, "clusterx", "version").Val()) require.EqualValues(t, "4", rdb1.Do(ctx, "clusterx", "version").Val()) }