Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support slot-based data migration #430

Merged
merged 20 commits into from
Jan 27, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions src/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ FINAL_CXXFLAGS+= -I$(JEMALLOC_PATH)/include
FINAL_LIBS+= $(JEMALLOC)
endif

K2RDIR= ../tools/kvrocks2redis
SHARED_OBJS= cluster.o compact_filter.o config.o cron.o encoding.o event_listener.o lock_manager.o \
log_collector.o redis_bitmap.o redis_bitmap_string.o redis_cmd.o redis_connection.o redis_db.o \
redis_hash.o redis_list.o redis_metadata.o redis_slot.o redis_pubsub.o redis_reply.o \
Expand All @@ -67,7 +66,7 @@ UNITTEST_OBJS= $(SHARED_OBJS) ../tests/main.o ../tests/t_metadata_test.o ../test
../tests/task_runner_test.o ../tests/t_string_test.o ../tests/t_zset_test.o ../tests/t_geo_test.o \
../tests/t_sortedint_test.o

# K2RDIR= ../tools/kvrocks2redis
K2RDIR= ../tools/kvrocks2redis
KVROCKS2REDIS_OBJS= $(SHARED_OBJS) $(K2RDIR)/main.o $(K2RDIR)/config.o $(K2RDIR)/parser.o \
$(K2RDIR)/redis_writer.o $(K2RDIR)/sync.o $(K2RDIR)/writer.o

Expand Down
12 changes: 6 additions & 6 deletions src/batch_extractor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -74,14 +74,14 @@ rocksdb::Status WriteBatchExtractor::PutCF(uint32_t column_family_id, const Slic
command_args = {"LSET", user_key, (*args)[1], value.ToString()};
break;
case kRedisCmdLInsert:
if (firstSeen_) {
if (first_seen_) {
if (args->size() < 4) {
LOG(ERROR)
<< "Fail to parse write_batch in putcf cmd linsert : args error, should contain before pivot value";
return rocksdb::Status::OK();
}
command_args = {"LINSERT", user_key, (*args)[1] == "1" ? "before" : "after", (*args)[2], (*args)[3]};
firstSeen_ = false;
first_seen_ = false;
}
break;
case kRedisCmdLRem:
Expand Down Expand Up @@ -161,23 +161,23 @@ rocksdb::Status WriteBatchExtractor::DeleteCF(uint32_t column_family_id, const S
RedisCommand cmd = static_cast<RedisCommand >(std::stoi((*args)[0]));
switch (cmd) {
case kRedisCmdLTrim:
if (firstSeen_) {
if (first_seen_) {
if (args->size() < 3) {
LOG(ERROR) << "Fail to parse write_batch in DeleteCF cmd ltrim : args error ,should contain start,stop";
return rocksdb::Status::OK();
}
command_args = {"LTRIM", user_key, (*args)[1], (*args)[2]};
firstSeen_ = false;
first_seen_ = false;
}
break;
case kRedisCmdLRem:
if (firstSeen_) {
if (first_seen_) {
if (args->size() < 3) {
LOG(ERROR) << "Fail to parse write_batch in DeleteCF cmd lrem : args error ,should contain count,value";
return rocksdb::Status::OK();
}
command_args = {"LREM", user_key, (*args)[1], (*args)[2]};
firstSeen_ = false;
first_seen_ = false;
}
break;
default:command_args = {cmd == kRedisCmdLPop ? "LPOP" : "RPOP", user_key};
Expand Down
4 changes: 2 additions & 2 deletions src/batch_extractor.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@ class WriteBatchExtractor : public rocksdb::WriteBatch::Handler {
rocksdb::Status DeleteCF(uint32_t column_family_id, const Slice &key) override;
rocksdb::Status DeleteRangeCF(uint32_t column_family_id,
const Slice& begin_key, const Slice& end_key) override;
std::map<std::string, std::vector<std::string>> *GetAofStrings() { return &resp_commands_; }
std::map<std::string, std::vector<std::string>> *GetRESPCommands() { return &resp_commands_; }
private:
std::map<std::string, std::vector<std::string>> resp_commands_;
Redis::WriteBatchLogData log_data_;
bool firstSeen_ = true;
bool first_seen_ = true;
bool is_slotid_encoded_ = false;
int slot_;
bool to_redis_;
Expand Down
97 changes: 43 additions & 54 deletions src/cluster.cc
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ Status Cluster::SetSlot(int slot, std::string node_id, int64_t new_version) {
slots_nodes_[slot] = to_assign_node;

// Clear data of migrated slot or record of imported slot
if (old_node != to_assign_node) {
if (old_node == myself_ && old_node != to_assign_node) {
// If slot is migrated from this node
if (migrated_slots_.count(slot)) {
svr_->slot_migrate_->ClearKeysOfSlot(kDefaultNamespace, slot);
Expand Down Expand Up @@ -226,6 +226,9 @@ Status Cluster::SetSlotMigrated(int slot, const std::string &ip_port) {
if (!IsValidSlot(slot)) {
return Status(Status::NotOK, "Slot is out of range");
}
// This is called from the slot-migrating thread, which runs asynchronously.
// Therefore the exclusive guard must be held while a record is added to
// 'migrated_slots_', which is also accessed when executing commands.
auto exclusivity = svr_->WorkExclusivityGuard();
migrated_slots_[slot] = ip_port;
return Status::OK();
Expand All @@ -235,6 +238,8 @@ Status Cluster::SetSlotImported(int slot) {
if (!IsValidSlot(slot)) {
return Status(Status::NotOK, "Slot is out of range");
}
// This is called by the 'cluster import' command. The exclusive lock is already
// held while that command executes, so it must not be acquired again here.
imported_slots_.insert(slot);
return Status::OK();
}
Expand Down Expand Up @@ -315,57 +320,6 @@ Status Cluster::ImportSlot(Redis::Connection *conn, int slot, int state) {
return Status::OK();
}

// Validates the caller's role and the slot number, then delegates to the slot
// migrator's GetMigrateInfo(info, slot) and returns its status.
// Returns NotOK when IsNotMaster() is true or when `slot` fails IsValidSlot().
Status Cluster::GetMigrateInfo(int slot, std::vector<std::string> *info) {
  // The role check runs before the range check, so a non-master always
  // receives the role error regardless of the slot value.
  if (IsNotMaster()) {
    return Status(Status::NotOK, "Slave can't migrate slot");
  }
  const bool slot_in_range = IsValidSlot(slot);
  if (slot_in_range) {
    return svr_->slot_migrate_->GetMigrateInfo(info, slot);
  }
  return Status(Status::NotOK, "Slot is out of range");
}

// Validates the caller's role and the slot number, then delegates to the slot
// importer's GetImportInfo(info, slot) and returns its status.
// Returns NotOK when IsNotMaster() is true or when `slot` fails IsValidSlot().
Status Cluster::GetImportInfo(int slot, std::vector<std::string> *info) {
  // Both guards are evaluated in the original order: role first, then range.
  if (IsNotMaster()) {
    return Status(Status::NotOK, "Slave can't import slot");
  } else if (!IsValidSlot(slot)) {
    return Status(Status::NotOK, "Slot is out of range");
  }
  auto *importer = svr_->slot_import_;
  return importer->GetImportInfo(info, slot);
}

// Builds a multi-bulk reply in `output` that lists, for every slot with a
// non-zero key count, a "slot <id>: <count>" line, followed (when `count` > 0
// and keys were returned) by a header line and the keys themselves.
//
// Returns NotOK when cluster mode is disabled or when `slot` is outside
// [-1, HASH_SLOTS_SIZE); otherwise returns OK.
Status Cluster::GetSlotKeys(Redis::Connection *conn, int slot, int count, std::string *output) {
  if (!svr_->GetConfig()->cluster_enabled) {
    return Status(Status::NotOK, "Server is not running in cluster");
  }
  // NOTE(review): -1 passes this check; presumably it means "all slots" for
  // GetSlotKeysInfo — confirm against that function.
  if (slot < -1 || slot >= HASH_SLOTS_SIZE) {
    return Status(Status::NotOK, "Slot is out of range");
  }
  std::map<int, uint64_t> keyscnt;
  std::vector<std::string> keys;
  Redis::Database redis_db(svr_->storage_, conn->GetNamespace());
  // NOTE(review): the status returned here is not inspected, so a storage
  // failure yields an empty/partial reply instead of an error — consider
  // propagating it once the return type is confirmed.
  auto s = redis_db.GetSlotKeysInfo(slot, &keyscnt, &keys, count);

  std::vector<std::string> slotkeysinfo;
  // Walk only the entries that exist. std::map iterates in ascending key
  // order, so the output order matches an index loop over 0..HASH_SLOTS_SIZE-1,
  // without operator[] default-inserting an entry for every absent slot.
  for (const auto &entry : keyscnt) {
    const int slot_id = entry.first;
    if (slot_id < 0 || slot_id >= HASH_SLOTS_SIZE || entry.second == 0) {
      continue;
    }
    slotkeysinfo.push_back("slot " + std::to_string(slot_id) + ": " + std::to_string(entry.second));
  }

  if (count > 0 && !keys.empty()) {
    // A missing entry counts as zero keys, which is what operator[] would
    // have produced, but without mutating the map.
    const auto found = keyscnt.find(slot);
    const uint64_t slot_total = (found == keyscnt.end()) ? 0 : found->second;
    if (static_cast<int>(slot_total) > count) {
      slotkeysinfo.push_back("Show top " + std::to_string(count) + " keys:");
    } else {
      slotkeysinfo.push_back("All keys of slot " + std::to_string(slot) + ":");
    }
    slotkeysinfo.insert(slotkeysinfo.end(), keys.begin(), keys.end());
  }
  *output = Redis::MultiBulkString(slotkeysinfo);
  return Status::OK();
}

Status Cluster::GetClusterInfo(std::string *cluster_infos) {
if (version_ < 0) {
return Status(Status::ClusterDown,
Expand All @@ -379,6 +333,7 @@ Status Cluster::GetClusterInfo(std::string *cluster_infos) {
}

*cluster_infos =
"# Cluster Info\r\n"
"cluster_state:ok\r\n"
"cluster_slots_assigned:" + std::to_string(ok_slot) + "\r\n"
"cluster_slots_ok:" + std::to_string(ok_slot) + "\r\n"
Expand All @@ -389,6 +344,18 @@ Status Cluster::GetClusterInfo(std::string *cluster_infos) {
"cluster_current_epoch:" + std::to_string(version_) + "\r\n"
"cluster_my_epoch:" + std::to_string(version_) + "\r\n";

if (myself_ != nullptr && myself_->role_ == kClusterMaster && !svr_->IsSlave()) {
// Get migrating status
std::string migrate_infos;
svr_->slot_migrate_->GetMigrateInfo(&migrate_infos);
*cluster_infos += migrate_infos;

// Get importing status
std::string import_infos;
svr_->slot_import_->GetImportInfo(&import_infos);
*cluster_infos += import_infos;
}

return Status::OK();
}

Expand Down Expand Up @@ -620,6 +587,13 @@ Status Cluster::ParseClusterNodes(const std::string &nodes_str, ClusterNodes *no
return Status::OK();
}

// Returns true when `slot` equals the slot the migrator currently reports via
// GetForbiddenSlot(), and false otherwise.
bool Cluster::IsWriteForbiddenSlot(int slot) {
  return svr_->slot_migrate_->GetForbiddenSlot() == slot;
}

Status Cluster::CanExecByMySelf(const Redis::CommandAttributes *attributes,
const std::vector<std::string> &cmd_tokens,
Redis::Connection *conn) {
Expand All @@ -644,13 +618,28 @@ Status Cluster::CanExecByMySelf(const Redis::CommandAttributes *attributes,
if (slots_nodes_[slot] == nullptr) {
return Status(Status::ClusterDown, "CLUSTERDOWN Hash slot not served");
} else if (myself_ && myself_ == slots_nodes_[slot]) {
// A central controller manages the topology of the cluster.
// A server can't change the topology directly, so we record the migrated slots
// in order to redirect requests for those slots to the destination node.
if (migrated_slots_.count(slot)) { // I'm not serving the migrated slot
return Status(Status::RedisExecErr, "MOVED " + std::to_string(slot) + " " + migrated_slots_[slot]);
return Status(Status::RedisExecErr, "Migrated MOVED " + std::to_string(slot) + " " + migrated_slots_[slot]);
}
// To keep data consistent, writes to the slot are forbidden while the last incremental data is being sent.
// During this phase, requests for the migrating slot have to be rejected.
if (IsWriteForbiddenSlot(slot)) {
return Status(Status::RedisExecErr, "Can't write to slot being migrated which is in write forbidden phase");
}
return Status::OK(); // I'm serving this slot
} else if (myself_ && myself_->importing_slot_ == slot && conn->IsImporting()) {
// While data is migrating, the topology of the destination node has not been changed yet.
// The destination node has to serve the requests for the migrating slot
// even though the slot does not belong to it. Therefore, we record the importing slot
// and mark the importing connection so that the importing data is accepted.
return Status::OK(); // I'm serving the importing connection
} else if (myself_ && imported_slots_.count(slot)) {
// After the slot is migrated, new requests for the migrated slot will be redirected to
// the destination server. Until the central controller changes the topology, the destination
// server should record the imported slots so that it accepts new data for them.
return Status::OK(); // I'm serving the imported slot
} else if (myself_ && myself_->role_ == kClusterSlave
&& attributes->is_write() == false
Expand All @@ -660,6 +649,6 @@ Status Cluster::CanExecByMySelf(const Redis::CommandAttributes *attributes,
} else {
std::string ip_port = slots_nodes_[slot]->host_ + ":" +
std::to_string(slots_nodes_[slot]->port_);
return Status(Status::RedisExecErr, "MOVED " + std::to_string(slot) + " " + ip_port);
return Status(Status::RedisExecErr, "Serve MOVED " + std::to_string(slot) + " " + ip_port);
ChrisZMF marked this conversation as resolved.
Show resolved Hide resolved
}
}
4 changes: 1 addition & 3 deletions src/cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,15 +67,13 @@ class Cluster {
uint64_t GetVersion() { return version_; }
static bool IsValidSlot(int slot) { return slot >= 0 && slot < kClusterSlots; }
bool IsNotMaster();
bool IsWriteForbiddenSlot(int slot);
Status CanExecByMySelf(const Redis::CommandAttributes *attributes,
const std::vector<std::string> &cmd_tokens,
Redis::Connection *conn);
void SetMasterSlaveRepl();
Status MigrateSlot(int slot, const std::string &dst_node_id);
Status ImportSlot(Redis::Connection *conn, int slot, int state);
Status GetMigrateInfo(int slot, std::vector<std::string> *info);
Status GetImportInfo(int slot, std::vector<std::string> *info);
Status GetSlotKeys(Redis::Connection *conn, int slot, int count, std::string *output);
std::string GetMyId() const { return myid_; }

static bool SubCommandIsExecExclusive(const std::string &subcommand);
Expand Down
2 changes: 1 addition & 1 deletion src/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -664,7 +664,7 @@ Status Config::AddNamespace(const std::string &ns, const std::string &token) {
if (requirepass.empty()) {
return Status(Status::NotOK, "forbidden to add namespace when requirepass was empty");
}
if (slot_id_encoded) {
if (cluster_enabled) {
return Status(Status::NotOK, "forbidden to add namespace when cluster mode was enabled");
ChrisZMF marked this conversation as resolved.
Show resolved Hide resolved
}
auto s = isNamespaceLegal(ns);
Expand Down
70 changes: 9 additions & 61 deletions src/redis_cmd.cc
Original file line number Diff line number Diff line change
Expand Up @@ -146,9 +146,9 @@ class CommandKeys : public Commander {
class CommandFlushDB : public Commander {
public:
Status Execute(Server *svr, Connection *conn, std::string *output) override {
if (svr->GetConfig()->slot_id_encoded) {
if (svr->GetConfig()->cluster_enabled) {
if (svr->slot_migrate_->GetMigrateState() == kMigrateStart) {
svr->slot_migrate_->StopMigrateTask();
svr->slot_migrate_->SetMigrateStopFlag(true);
LOG(INFO) << "Stop migration task for flushdb";
}
}
Expand All @@ -171,9 +171,9 @@ class CommandFlushAll : public Commander {
*output = Redis::Error(errAdministorPermissionRequired);
return Status::OK();
}
if (svr->GetConfig()->slot_id_encoded) {
if (svr->GetConfig()->cluster_enabled) {
if (svr->slot_migrate_->GetMigrateState() == kMigrateStart) {
svr->slot_migrate_->StopMigrateTask();
svr->slot_migrate_->SetMigrateStopFlag(true);
LOG(INFO) << "Stop migration task for flushall";
}
}
Expand Down Expand Up @@ -3464,8 +3464,8 @@ class CommandSlaveOf : public Commander {
if (s.IsOK()) {
*output = Redis::SimpleString("OK");
LOG(WARNING) << "MASTER MODE enabled (user request from '" << conn->GetAddr() << "')";
if (svr->GetConfig()->slot_id_encoded) {
svr->slot_migrate_->StartMigrateTask();
if (svr->GetConfig()->cluster_enabled) {
svr->slot_migrate_->SetMigrateStopFlag(false);
LOG(INFO) << "Change server role to master, restart migration task";
}
}
Expand All @@ -3475,8 +3475,8 @@ class CommandSlaveOf : public Commander {
*output = Redis::SimpleString("OK");
LOG(WARNING) << "SLAVE OF " << host_ << ":" << port_
<< " enabled (user request from '" << conn->GetAddr() << "')";
if (svr->GetConfig()->slot_id_encoded) {
svr->slot_migrate_->StopMigrateTask();
if (svr->GetConfig()->cluster_enabled) {
svr->slot_migrate_->SetMigrateStopFlag(true);
LOG(INFO) << "Change server role to slave, stop migration task";
}
} else {
Expand Down Expand Up @@ -4274,15 +4274,6 @@ class CommandCluster : public Commander {
if (args.size() == 2 && (subcommand_ == "nodes" || subcommand_ == "slots"
|| subcommand_ == "info")) return Status::OK();
if (subcommand_ == "keyslot" && args_.size() == 3) return Status::OK();
if (subcommand_ == "migratestatus" || subcommand_ == "importstatus") {
if (args.size() != 3) return Status(Status::RedisParseErr, errWrongNumOfArguments);
try {
slot_ = atoi(args[2].c_str());
} catch (std::exception &e) {
return Status(Status::RedisParseErr, errValueNotInterger);
}
return Status::OK();
}
if (subcommand_ == "import") {
if (args.size() != 4) return Status(Status::RedisParseErr, errWrongNumOfArguments);
try {
Expand All @@ -4293,29 +4284,8 @@ class CommandCluster : public Commander {
}
return Status::OK();
}
if (subcommand_ == "slotkeys") {
if (args.size() == 2) return Status::OK();
if (args.size() == 3) {
try {
slot_ = atoi(args[2].c_str());
} catch (std::exception &e) {
return Status(Status::RedisParseErr, errValueNotInterger);
}
return Status::OK();
} else if (args.size() == 4) {
try {
slot_ = atoi(args[2].c_str());
count_ = atoi(args[3].c_str());
} catch (std::exception &e) {
return Status(Status::RedisParseErr, errValueNotInterger);
}
return Status::OK();
} else {
return Status(Status::RedisParseErr, errWrongNumOfArguments);
}
}
return Status(Status::RedisParseErr,
"CLUSTER command, CLUSTER INFO|NODES|SLOTS|KEYSLOT|MIGRATESTATUS|IMPORTSTATUS|SLOTKEYS");
"CLUSTER command, CLUSTER INFO|NODES|SLOTS|KEYSLOT");
}

Status Execute(Server *svr, Connection *conn, std::string *output) override {
Expand Down Expand Up @@ -4367,27 +4337,6 @@ class CommandCluster : public Commander {
} else {
*output = Redis::Error(s.Msg());
}
} else if (subcommand_ == "migratestatus") {
std::vector<std::string> migrate_info;
Status s = svr->cluster_->GetMigrateInfo(slot_, &migrate_info);
if (s.IsOK()) {
*output = Redis::MultiBulkString(migrate_info);
} else {
*output = Redis::Error(s.Msg());
}
} else if (subcommand_ == "importstatus") {
std::vector<std::string> import_info;
Status s = svr->cluster_->GetImportInfo(slot_, &import_info);
if (s.IsOK()) {
*output = Redis::MultiBulkString(import_info);
} else {
*output = Redis::Error(s.Msg());
}
} else if (subcommand_ == "slotkeys") {
Status s = svr->cluster_->GetSlotKeys(conn, slot_, count_, output);
if (!s.IsOK()) {
*output = Redis::Error(s.Msg());
}
} else if (subcommand_ == "import") {
Status s = svr->cluster_->ImportSlot(conn, slot_, state_);
if (s.IsOK()) {
Expand All @@ -4404,7 +4353,6 @@ class CommandCluster : public Commander {
private:
std::string subcommand_;
int slot_ = -1;
int count_ = 0;
ImportStatus state_ = kImportNone;
};

Expand Down
Loading