Skip to content

Commit

Permalink
Add handling LMOVE in BatchExtractor (#1364)
Browse files Browse the repository at this point in the history
  • Loading branch information
torwig authored Apr 3, 2023
1 parent f36842c commit 115f354
Show file tree
Hide file tree
Showing 10 changed files with 234 additions and 79 deletions.
26 changes: 17 additions & 9 deletions src/cluster/cluster.cc
Original file line number Diff line number Diff line change
Expand Up @@ -773,7 +773,7 @@ Status Cluster::CanExecByMySelf(const Redis::CommandAttributes *attributes, cons
for (auto i : keys_indexes) {
if (i >= static_cast<int>(cmd_tokens.size())) break;

int cur_slot = GetSlotNumFromKey(cmd_tokens[i]);
int cur_slot = GetSlotIdFromKey(cmd_tokens[i]);
if (slot == -1) slot = cur_slot;
if (slot != cur_slot) {
return {Status::RedisExecErr, "CROSSSLOT Attempted to access keys that don't hash to the same slot"};
Expand All @@ -783,7 +783,9 @@ Status Cluster::CanExecByMySelf(const Redis::CommandAttributes *attributes, cons

if (slots_nodes_[slot] == nullptr) {
return {Status::ClusterDown, "CLUSTERDOWN Hash slot not served"};
} else if (myself_ && myself_ == slots_nodes_[slot]) {
}

if (myself_ && myself_ == slots_nodes_[slot]) {
// We use central controller to manage the topology of the cluster.
// Server can't change the topology directly, so we record the migrated slots
// to move the requests of the migrated slots to the destination node.
Expand All @@ -797,22 +799,28 @@ Status Cluster::CanExecByMySelf(const Redis::CommandAttributes *attributes, cons
}

return Status::OK(); // I'm serving this slot
} else if (myself_ && myself_->importing_slot_ == slot && conn->IsImporting()) {
}

if (myself_ && myself_->importing_slot_ == slot && conn->IsImporting()) {
// While data migrating, the topology of the destination node has not been changed.
// The destination node has to serve the requests from the migrating slot,
// although the slot is not belong to itself. Therefore, we record the importing slot
// and mark the importing connection to accept the importing data.
return Status::OK(); // I'm serving the importing connection
} else if (myself_ && imported_slots_.count(slot)) {
}

if (myself_ && imported_slots_.count(slot)) {
// After the slot is migrated, new requests of the migrated slot will be moved to
// the destination server. Before the central controller change the topology, the destination
// server should record the imported slots to accept new data of the imported slots.
return Status::OK(); // I'm serving the imported slot
} else if (myself_ && myself_->role_ == kClusterSlave && !attributes->is_write() &&
nodes_.find(myself_->master_id_) != nodes_.end() && nodes_[myself_->master_id_] == slots_nodes_[slot]) {
}

if (myself_ && myself_->role_ == kClusterSlave && !attributes->is_write() &&
nodes_.find(myself_->master_id_) != nodes_.end() && nodes_[myself_->master_id_] == slots_nodes_[slot]) {
return Status::OK(); // My master is serving this slot
} else {
return {Status::RedisExecErr,
fmt::format("MOVED {} {}:{}", slot, slots_nodes_[slot]->host_, slots_nodes_[slot]->port_)};
}

return {Status::RedisExecErr,
fmt::format("MOVED {} {}:{}", slot, slots_nodes_[slot]->host_, slots_nodes_[slot]->port_)};
}
2 changes: 1 addition & 1 deletion src/cluster/redis_slot.cc
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ uint16_t crc16(const char *buf, int len) {
return crc;
}

uint16_t GetSlotNumFromKey(const std::string &key) {
uint16_t GetSlotIdFromKey(const std::string &key) {
auto tag = GetTagFromKey(key);
if (tag.empty()) {
tag = key;
Expand Down
2 changes: 1 addition & 1 deletion src/cluster/redis_slot.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,5 +27,5 @@ constexpr const uint16_t HASH_SLOTS_SIZE = HASH_SLOTS_MASK + 1; // 16384
constexpr const uint16_t HASH_SLOTS_MAX_ITERATIONS = 50;

uint16_t crc16(const char *buf, int len);
uint16_t GetSlotNumFromKey(const std::string &key);
uint16_t GetSlotIdFromKey(const std::string &key);
std::string GetTagFromKey(const std::string &key);
2 changes: 1 addition & 1 deletion src/commands/cmd_cluster.cc
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ class CommandCluster : public Commander {
}

if (subcommand_ == "keyslot") {
auto slot_id = GetSlotNumFromKey(args_[2]);
auto slot_id = GetSlotIdFromKey(args_[2]);
*output = Redis::Integer(slot_id);
} else if (subcommand_ == "slots") {
std::vector<SlotInfo> infos;
Expand Down
Loading

0 comments on commit 115f354

Please sign in to comment.