Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support slot batch for CLUSTERX SETSLOT #1414

Merged
merged 2 commits into from
May 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 27 additions & 38 deletions src/cluster/cluster.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include <fstream>
#include <memory>

#include "cluster/cluster_defs.h"
#include "commands/commander.h"
#include "common/io_util.h"
#include "fmt/format.h"
Expand All @@ -35,16 +36,6 @@
#include "string_util.h"
#include "time_util.h"

// Human-readable error messages used by the cluster code; in this file they are
// returned as the message part of a non-OK Status (e.g. errInvalidClusterVersion,
// errInvalidSlotID and errInvalidNodeID in the slot-assignment path).
const char *errInvalidNodeID = "Invalid cluster node id";
const char *errInvalidSlotID = "Invalid slot id";
const char *errSlotOutOfRange = "Slot is out of range";
const char *errInvalidClusterVersion = "Invalid cluster version";
const char *errSlotOverlapped = "Slot distribution is overlapped";
const char *errNoMasterNode = "The node isn't a master";
const char *errClusterNoInitialized = "CLUSTERDOWN The cluster is not initialized";
const char *errInvalidClusterNodeInfo = "Invalid cluster nodes info";
const char *errInvalidImportState = "Invalid import state";

// Constructs a cluster node record from its id, network address (host/port),
// role, the id of its master, and a bitset with one bit per cluster slot
// (a set bit marks a slot assigned to this node).
// The string arguments are taken by value and moved into the members; the
// bitset is copied.
// NOTE(review): `role` presumably takes kClusterMaster / kClusterSlave — confirm.
ClusterNode::ClusterNode(std::string id, std::string host, int port, int role, std::string master_id,
std::bitset<kClusterSlots> slots)
: id(std::move(id)), host(std::move(host)), port(port), role(role), master_id(std::move(master_id)), slots(slots) {}
Expand Down Expand Up @@ -93,32 +84,24 @@ Status Cluster::SetNodeId(const std::string &node_id) {
return Status::OK();
}

// Set the slot to the node if the new version is the current version + 1. This is
// useful when scaling the cluster because it avoids sending many large messages:
// we only update one slot's distribution, and there are 16384 slots in our design.
//
// The reason why the new version MUST be the current version + 1 is that
// the command changes the topology based on a specific topology (which also means a
// specific version). We must guarantee that the current topology is exactly the
// expected one; otherwise, this update may corrupt the topology, so the base
// topology version is very important.
// This is different from the CLUSTERX SETNODES command, which uses the new version's
// topology to overwrite the current one; it tolerates kvrocks nodes missing some
// topology updates because of network failures, since it is a state instead of an operation.
Status Cluster::SetSlot(int slot, const std::string &node_id, int64_t new_version) {
// Parameters check
Status Cluster::SetSlotRanges(const std::vector<SlotRange> &slot_ranges, const std::string &node_id,
int64_t new_version) {
if (new_version <= 0 || new_version != version_ + 1) {
return {Status::NotOK, errInvalidClusterVersion};
}

if (!IsValidSlot(slot)) {
return {Status::NotOK, errInvalidSlotID};
}

if (node_id.size() != kClusterNodeIdLen) {
return {Status::NotOK, errInvalidNodeID};
}

// Get the node which we want to assign a slot into it
// Get the node which we want to assign slots into it
std::shared_ptr<ClusterNode> to_assign_node = nodes_[node_id];
if (to_assign_node == nullptr) {
return {Status::NotOK, "No this node in the cluster"};
Expand All @@ -135,23 +118,29 @@ Status Cluster::SetSlot(int slot, const std::string &node_id, int64_t new_versio
// 1. Remove the slot from old node if existing
// 2. Add the slot into to-assign node
// 3. Update the map of slots to nodes.
std::shared_ptr<ClusterNode> old_node = slots_nodes_[slot];
if (old_node != nullptr) {
old_node->slots[slot] = false;
}
to_assign_node->slots[slot] = true;
slots_nodes_[slot] = to_assign_node;

// Clear data of migrated slot or record of imported slot
if (old_node == myself_ && old_node != to_assign_node) {
// If slot is migrated from this node
if (migrated_slots_.count(slot) > 0) {
svr_->slot_migrator->ClearKeysOfSlot(kDefaultNamespace, slot);
migrated_slots_.erase(slot);
}
// If slot is imported into this node
if (imported_slots_.count(slot) > 0) {
imported_slots_.erase(slot);
// remember: The atomicity of the process is based on
// the transactionality of ClearKeysOfSlot().
for (auto [s_start, s_end] : slot_ranges) {
for (int slot = s_start; slot <= s_end; slot++) {
std::shared_ptr<ClusterNode> old_node = slots_nodes_[slot];
if (old_node != nullptr) {
old_node->slots[slot] = false;
}
to_assign_node->slots[slot] = true;
slots_nodes_[slot] = to_assign_node;

// Clear data of migrated slot or record of imported slot
if (old_node == myself_ && old_node != to_assign_node) {
// If slot is migrated from this node
if (migrated_slots_.count(slot) > 0) {
svr_->slot_migrator->ClearKeysOfSlot(kDefaultNamespace, slot);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One ClearKeyOfSlot call one writebatch of rocksdb.
So this process exists consistency issues if slot in the middle is down.
I don't know what to do with it yet

Copy link
Contributor

@caipengbo caipengbo May 5, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is indeed this problem and we can persist it to Propagate CF.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After we merge this PR, you can ask an issue and I will solve this problem, which I have solved in production environment.

migrated_slots_.erase(slot);
}
// If slot is imported into this node
if (imported_slots_.count(slot) > 0) {
imported_slots_.erase(slot);
}
}
}
}

Expand Down
11 changes: 2 additions & 9 deletions src/cluster/cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,20 +29,13 @@
#include <unordered_map>
#include <vector>

#include "cluster/cluster_defs.h"
#include "commands/commander.h"
#include "common/io_util.h"
#include "redis_slot.h"
#include "server/redis_connection.h"
#include "status.h"

// Cluster-wide constants.
enum {
// Node role values. NOTE(review): presumably what ClusterNode::role holds — confirm.
kClusterMaster = 1,
kClusterSlave = 2,
// Required length, in characters, of a cluster node id string.
kClusterNodeIdLen = 40,
// NOTE(review): presumably an offset added to the service port to derive a
// cluster port — not shown here, confirm against its users.
kClusterPortIncr = 10000,
// Total number of hash slots; valid slot ids are 0 .. kClusterSlots - 1.
kClusterSlots = HASH_SLOTS_SIZE,
};

class ClusterNode {
public:
explicit ClusterNode(std::string id, std::string host, int port, int role, std::string master_id,
Expand Down Expand Up @@ -79,7 +72,7 @@ class Cluster {
Status SetClusterNodes(const std::string &nodes_str, int64_t version, bool force);
Status GetClusterNodes(std::string *nodes_str);
Status SetNodeId(const std::string &node_id);
Status SetSlot(int slot, const std::string &node_id, int64_t version);
Status SetSlotRanges(const std::vector<SlotRange> &slot_ranges, const std::string &node_id, int64_t version);
Status SetSlotMigrated(int slot, const std::string &ip_port);
Status SetSlotImported(int slot);
Status GetSlotsInfo(std::vector<SlotInfo> *slot_infos);
Expand Down
43 changes: 43 additions & 0 deletions src/cluster/cluster_defs.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/

#pragma once

#include "cluster/redis_slot.h"

// Cluster-wide constants shared by the cluster and command code.
enum {
infdahai marked this conversation as resolved.
Show resolved Hide resolved
// Node role values. NOTE(review): presumably what ClusterNode::role holds — confirm.
kClusterMaster = 1,
kClusterSlave = 2,
// Required length, in characters, of a cluster node id string.
kClusterNodeIdLen = 40,
// NOTE(review): presumably an offset added to the service port to derive a
// cluster port — not shown here, confirm against its users.
kClusterPortIncr = 10000,
// Total number of hash slots; valid slot ids are 0 .. kClusterSlots - 1.
kClusterSlots = HASH_SLOTS_SIZE,
};

// Error messages shared by the cluster code and the cluster commands. They are
// declared `inline constexpr` so this header can be included from several
// translation units without producing multiple definitions.
inline constexpr const char *errInvalidNodeID = "Invalid cluster node id";
inline constexpr const char *errInvalidSlotID = "Invalid slot id";
inline constexpr const char *errSlotOutOfRange = "Slot is out of range";
inline constexpr const char *errInvalidClusterVersion = "Invalid cluster version";
inline constexpr const char *errSlotOverlapped = "Slot distribution is overlapped";
inline constexpr const char *errNoMasterNode = "The node isn't a master";
inline constexpr const char *errClusterNoInitialized = "CLUSTERDOWN The cluster is not initialized";
inline constexpr const char *errInvalidClusterNodeInfo = "Invalid cluster nodes info";
inline constexpr const char *errInvalidImportState = "Invalid import state";

// An inclusive range of slot ids stored as (first, last); a single slot `n` is
// represented as (n, n).
using SlotRange = std::pair<int, int>;
1 change: 1 addition & 0 deletions src/cluster/redis_slot.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
*/

#pragma once
#include <cstdint>
PragmaTwice marked this conversation as resolved.
Show resolved Hide resolved
#include <string>

// crc16
Expand Down
17 changes: 6 additions & 11 deletions src/commands/cmd_cluster.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*
*/

#include "cluster/cluster_defs.h"
#include "cluster/slot_import.h"
#include "commander.h"
#include "error_constants.h"
Expand Down Expand Up @@ -156,15 +157,9 @@ class CommandClusterX : public Commander {

// CLUSTERX SETSLOT $SLOT_ID NODE $NODE_ID $VERSION
if (subcommand_ == "setslot" && args_.size() == 6) {
auto parse_id = ParseInt<int>(args[2], 10);
if (!parse_id) {
return {Status::RedisParseErr, errValueNotInteger};
}

slot_id_ = *parse_id;

if (!Cluster::IsValidSlot(slot_id_)) {
return {Status::RedisParseErr, "Invalid slot id"};
Status s = CommanderHelper::ParseSlotRanges(args_[2], slot_ranges_);
if (!s.IsOK()) {
return s;
}

if (strcasecmp(args_[3].c_str(), "node") != 0) {
Expand Down Expand Up @@ -219,7 +214,7 @@ class CommandClusterX : public Commander {
*output = redis::Error(s.Msg());
}
} else if (subcommand_ == "setslot") {
Status s = svr->cluster->SetSlot(slot_id_, args_[4], set_version_);
Status s = svr->cluster->SetSlotRanges(slot_ranges_, args_[4], set_version_);
if (s.IsOK()) {
need_persist_nodes_info = true;
*output = redis::SimpleString("OK");
Expand Down Expand Up @@ -251,7 +246,7 @@ class CommandClusterX : public Commander {
std::string dst_node_id_;
int64_t set_version_ = 0;
int64_t slot_ = -1;
int slot_id_ = -1;
std::vector<SlotRange> slot_ranges_;
bool force_ = false;
};

Expand Down
50 changes: 50 additions & 0 deletions src/commands/commander.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

#include "commander.h"

#include "cluster/cluster_defs.h"

namespace redis {

RegisterToCommandTable::RegisterToCommandTable(std::initializer_list<CommandAttributes> list) {
Expand Down Expand Up @@ -108,4 +110,52 @@ bool IsCommandExists(const std::string &name) {
return command_details::original_commands.find(util::ToLower(name)) != command_details::original_commands.end();
}

// Parses a space-separated list of slots and slot ranges.
//
// Each token of `slots_str` is either a single slot id ("5") or an inclusive
// range of the form "int1-int2" with int1 <= int2 ("0-100"). Every id must lie
// in [0, kClusterSlots - 1]. A single slot `n` is stored as the range (n, n).
//
// @param slots_str  the text to parse; must not be empty.
// @param slots      output; on success the parsed ranges are appended to it in
//                   input order.
// @return Status::OK() on success. Otherwise a non-OK Status describing the
//         first offending token; in that case `slots` is left untouched, so
//         callers never observe a partially parsed result.
Status CommanderHelper::ParseSlotRanges(const std::string &slots_str, std::vector<SlotRange> &slots) {
  if (slots_str.empty()) {
    return {Status::NotOK, "No slots to parse."};
  }

  const std::vector<std::string> slot_ranges = util::Split(slots_str, " ");
  if (slot_ranges.empty()) {
    return {Status::NotOK,
            fmt::format("Invalid slots: `{}`. No slots to parse. Please use spaces to separate slots.", slots_str)};
  }

  const auto valid_range = NumericRange<int>{0, kClusterSlots - 1};

  // Collect results locally and publish them only once every token has been
  // validated, so an error in a later token does not leave earlier ranges
  // behind in the caller's vector.
  std::vector<SlotRange> parsed;
  parsed.reserve(slot_ranges.size());

  // Parse all slots (include slot ranges)
  for (const auto &slot_range : slot_ranges) {
    // A token without '-' is a single slot id.
    if (slot_range.find('-') == std::string::npos) {
      auto parse_result = ParseInt<int>(slot_range, valid_range, 10);
      if (!parse_result) {
        return std::move(parse_result).Prefixed(errInvalidSlotID);
      }
      parsed.emplace_back(*parse_result, *parse_result);
      continue;
    }

    // parse slot range: "int1-int2" (satisfy: int1 <= int2 )
    // A leading or trailing '-' is rejected up front (e.g. "-5" or "5-").
    if (slot_range.front() == '-' || slot_range.back() == '-') {
      return {Status::NotOK,
              fmt::format("Invalid slot range: `{}`. The character '-' can't appear in the first or last position.",
                          slot_range)};
    }
    const std::vector<std::string> fields = util::Split(slot_range, "-");
    if (fields.size() != 2) {
      return {Status::NotOK,
              fmt::format("Invalid slot range: `{}`. The slot range should be of the form `int1-int2`.", slot_range)};
    }
    auto parse_start = ParseInt<int>(fields[0], valid_range, 10);
    auto parse_end = ParseInt<int>(fields[1], valid_range, 10);
    // Non-integer bounds, out-of-range bounds and reversed bounds all report
    // the same error message.
    if (!parse_start || !parse_end || *parse_start > *parse_end) {
      return {Status::NotOK,
              fmt::format(
                  "Invalid slot range: `{}`. The slot range `int1-int2` needs to satisfy the condition (int1 <= int2).",
                  slot_range)};
    }
    parsed.emplace_back(*parse_start, *parse_end);
  }

  slots.insert(slots.end(), parsed.begin(), parsed.end());
  return Status::OK();
}

} // namespace redis
7 changes: 7 additions & 0 deletions src/commands/commander.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
#include <utility>
#include <vector>

#include "cluster/cluster_defs.h"
#include "parse_util.h"
#include "server/redis_reply.h"
#include "status.h"
#include "string_util.h"
Expand Down Expand Up @@ -87,6 +89,11 @@ class CommanderWithParseMove : Commander {
virtual Status ParseMove(std::vector<std::string> &&args) { return Status::OK(); }
};

// Collection of static parsing helpers shared by command implementations.
class CommanderHelper {
public:
// Parses `slots_str`, a space-separated list of single slot ids and
// `int1-int2` slot ranges, and stores the resulting ranges in `slots`.
// Returns a non-OK Status if any token is malformed or outside the valid
// slot range.
static Status ParseSlotRanges(const std::string &slots_str, std::vector<SlotRange> &slots);
};

using CommanderFactory = std::function<std::unique_ptr<Commander>()>;

struct CommandKeyRange {
Expand Down
1 change: 1 addition & 0 deletions src/common/encoding.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <rocksdb/slice.h>
#include <unistd.h>

#include <cstdint>
infdahai marked this conversation as resolved.
Show resolved Hide resolved
#include <string>

enum class Endian {
Expand Down
Loading