Skip to content

Commit

Permalink
Support slot-based data migration (#430)
Browse files Browse the repository at this point in the history
A new command CLUSTERX MIGRATE is used to migrate slot data; the slot-based migration
process mainly includes the following stages: migrating existing data and migrating
incremental data.

Command format:
CLUSTERX MIGRATE $slot $dst_nodeid
  - $slot is the slot which is to be migrated
  - $dst_nodeid is the node id of destination server in the cluster.

We also introduce an internal command CLUSTER IMPORT for importing the migrating
slot data into destination server.

Migration status is shown in the output of the CLUSTER INFO command.

After migrating a slot, you should also use the CLUSTERX SETSLOT command to change the
cluster slot distribution.

For more details, please see #412 and #430
  • Loading branch information
ChrisZMF authored Jan 27, 2022
1 parent a603fba commit 805dc84
Show file tree
Hide file tree
Showing 41 changed files with 3,038 additions and 335 deletions.
14 changes: 12 additions & 2 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,12 @@ endif()
target_sources(kvrocks PRIVATE
src/cluster.cc
src/cluster.h
src/slot_migrate.h
src/slot_migrate.cc
src/slot_import.h
src/slot_import.cc
src/batch_extractor.h
src/batch_extractor.cc
src/redis_db.cc
src/redis_db.h
src/redis_connection.cc
Expand Down Expand Up @@ -198,6 +204,12 @@ target_link_libraries(kvrocks2redis ${EXTERNAL_LIBS})
target_sources(kvrocks2redis PRIVATE
src/cluster.h
src/cluster.cc
src/slot_migrate.h
src/slot_migrate.cc
src/slot_import.h
src/slot_import.cc
src/batch_extractor.h
src/batch_extractor.cc
src/redis_db.cc
src/redis_db.h
src/compact_filter.cc
Expand Down Expand Up @@ -279,8 +291,6 @@ target_sources(kvrocks2redis PRIVATE
tools/kvrocks2redis/main.cc
tools/kvrocks2redis/sync.cc
tools/kvrocks2redis/sync.h
tools/kvrocks2redis/util.cc
tools/kvrocks2redis/util.h
tools/kvrocks2redis/redis_writer.cc
tools/kvrocks2redis/redis_writer.h
tools/kvrocks2redis/writer.cc
Expand Down
35 changes: 31 additions & 4 deletions kvrocks.conf
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ workers 8
# Note that kvrocks will write a pid file in /var/run/kvrocks.pid when daemonized.
daemonize no

# Kvrocks implements cluster solution that is similar with redis cluster sulution.
# Kvrocks implements cluster solution that is similar with redis cluster solution.
# You can get cluster information by CLUSTER NODES|SLOTS|INFO command, it also is
# adapted to redis-cli, redis-benchmark, redis cluster SDK and redis cluster proxy.
# But kvrocks doesn't support to communicate with each others, so you must set
Expand Down Expand Up @@ -261,15 +261,15 @@ supervised no
# https://github.com/facebook/rocksdb/wiki/Perf-Context-and-IO-Stats-Context
#
# This mechanism is enabled when profiling-sample-commands is not empty and
# profiling-sample-ratio greater than 0.
# profiling-sample-ratio greater than 0.
# It is important to note that this mechanism affects performance, but it is
# useful for troubleshooting performance bottlenecks, so it should only be
# enabled when performance problems occur.

# The name of the commands you want to record. Must be original name of
# The name of the commands you want to record. Must be original name of
# commands supported by Kvrocks. Use ',' to separate multiple commands and
# use '*' to record all commands supported by Kvrocks.
# Example:
# Example:
# - Single command: profiling-sample-commands get
# - Multiple commands: profiling-sample-commands get,mget,hget
#
Expand Down Expand Up @@ -340,6 +340,33 @@ compaction-checker-range 0-7
# Default: yes
auto-resize-block-and-sst yes

################################ MIGRATE #####################################
# If the network bandwidth is completely consumed by the migration task,
# it will affect the availability of kvrocks. To avoid this situation,
# migrate-speed is adopted to limit the migrating speed.
# Migrating speed is limited by controlling the duration between sending data,
# the duration is calculated by: 1000000 * migrate-pipeline-size / migrate-speed (us).
# Value: [0,INT_MAX], 0 means no limit
#
# Default: 4096
migrate-speed 4096

# In order to reduce data transmission times and improve the efficiency of data migration,
# pipeline is adopted to send multiple data at once. Pipeline size can be set by this option.
# Value: [1, INT_MAX], it can't be 0
#
# Default: 16
migrate-pipeline-size 16

# In order to reduce the write forbidden time during migrating slot, we will migrate the incremental
# data several times to reduce the amount of incremental data. Until the quantity of incremental
# data is reduced to a certain threshold, slot will be forbidden write. The threshold is set by
# this option.
# Value: [1, INT_MAX], it can't be 0
#
# Default: 10000
migrate-sequence-gap 10000

################################ ROCKSDB #####################################

# Specify the capacity of metadata column family block cache. Larger block cache
Expand Down
7 changes: 4 additions & 3 deletions src/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ SHARED_OBJS= cluster.o compact_filter.o config.o cron.o encoding.o event_listene
redis_hash.o redis_list.o redis_metadata.o redis_slot.o redis_pubsub.o redis_reply.o \
redis_request.o redis_set.o redis_string.o redis_zset.o redis_geo.o replication.o \
server.o stats.o storage.o task_runner.o util.o geohash.o worker.o redis_sortedint.o \
compaction_checker.o table_properties_collector.o scripting.o sha1.o rand.o
compaction_checker.o table_properties_collector.o scripting.o sha1.o rand.o \
slot_migrate.o slot_import.o batch_extractor.o
KVROCKS_OBJS= $(SHARED_OBJS) main.o

UNITTEST_OBJS= $(SHARED_OBJS) ../tests/main.o ../tests/t_metadata_test.o ../tests/compact_test.o \
Expand All @@ -67,7 +68,7 @@ UNITTEST_OBJS= $(SHARED_OBJS) ../tests/main.o ../tests/t_metadata_test.o ../test

K2RDIR= ../tools/kvrocks2redis
KVROCKS2REDIS_OBJS= $(SHARED_OBJS) $(K2RDIR)/main.o $(K2RDIR)/config.o $(K2RDIR)/parser.o \
$(K2RDIR)/redis_writer.o $(K2RDIR)/sync.o $(K2RDIR)/util.o $(K2RDIR)/writer.o
$(K2RDIR)/redis_writer.o $(K2RDIR)/sync.o $(K2RDIR)/writer.o


KVROCKS_CXX=$(QUIET_CXX)$(CXX) $(FINAL_CXXFLAGS)
Expand Down Expand Up @@ -109,7 +110,7 @@ lint:

$(PROG): $(GLOG) $(LIBEVENT) $(ROCKSDB) $(LUA) $(KVROCKS_OBJS)
$(KVROCKS_LD) -o $(PROG) $(KVROCKS_OBJS) $(FINAL_LIBS) $(LDFLAGS)

$(GLOG):
cd $(GLOG_PATH);./autogen.sh; ./configure --disable-shared; \
$(MAKE) CXXFLAGS='-fPIC' -C $(GLOG_PATH)/
Expand Down
207 changes: 207 additions & 0 deletions src/batch_extractor.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
#include "batch_extractor.h"

#include <rocksdb/write_batch.h>
#include <glog/logging.h>

#include "redis_bitmap.h"
#include "redis_slot.h"
#include "redis_reply.h"

// Callback invoked for every LogData record in the write batch.  Decodes the
// blob into log_data_, which carries the Redis type and original command
// arguments that the following Put/Delete records belong to; PutCF/DeleteCF
// consult log_data_ to reconstruct the originating command.
void WriteBatchExtractor::LogData(const rocksdb::Slice &blob) {
  log_data_.Decode(blob);
}

// Callback invoked for every Put record in the write batch.  Reconstructs the
// Redis command that produced the record (using the type/arguments previously
// captured in LogData) and appends its RESP encoding to resp_commands_,
// keyed by namespace.  Always returns OK: records that cannot be translated
// are logged and skipped so that batch iteration continues.
rocksdb::Status WriteBatchExtractor::PutCF(uint32_t column_family_id, const Slice &key,
                                           const Slice &value) {
  // ZSet score entries mirror the member entries in the default column
  // family, so skip them to avoid emitting a duplicate command.
  if (column_family_id == kColumnFamilyIDZSetScore) {
    return rocksdb::Status::OK();
  }

  std::string ns, user_key, sub_key;
  std::vector<std::string> command_args;
  if (column_family_id == kColumnFamilyIDMetadata) {
    ExtractNamespaceKey(key, &ns, &user_key, is_slotid_encoded_);
    // When extracting for a single slot (slot_ >= 0), ignore keys belonging
    // to any other slot.
    if (slot_ >= 0) {
      if (static_cast<uint16_t>(slot_) != GetSlotNumFromKey(user_key)) return rocksdb::Status::OK();
    }
    Metadata metadata(kRedisNone);
    metadata.Decode(value.ToString());
    if (metadata.Type() == kRedisString) {
      // String values embed the metadata header in the value; the payload
      // starts after it.  NOTE(review): assumes the header is exactly 5
      // bytes -- confirm against Metadata's encoded string layout.
      command_args = {"SET", user_key, value.ToString().substr(5, value.size() - 5)};
      resp_commands_[ns].emplace_back(Redis::Command2RESP(command_args));
      if (metadata.expire > 0) {
        command_args = {"EXPIREAT", user_key, std::to_string(metadata.expire)};
        resp_commands_[ns].emplace_back(Redis::Command2RESP(command_args));
      }
    } else if (metadata.expire > 0) {
      // For non-string types a metadata Put with an expire field only maps
      // to a command when the original command was EXPIRE; other metadata
      // updates (e.g. size changes) carry no standalone Redis command.
      auto args = log_data_.GetArguments();
      if (args->size() > 0) {
        RedisCommand cmd = static_cast<RedisCommand >(std::stoi((*args)[0]));
        if (cmd == kRedisCmdExpire) {
          command_args = {"EXPIREAT", user_key, std::to_string(metadata.expire)};
          resp_commands_[ns].emplace_back(Redis::Command2RESP(command_args));
        }
      }
    }

    return rocksdb::Status::OK();
  }

  if (column_family_id == kColumnFamilyIDDefault) {
    // Subkey entry: decode the internal key into namespace/user key/subkey,
    // then translate based on the Redis type recorded in the batch's LogData.
    InternalKey ikey(key, is_slotid_encoded_);
    user_key = ikey.GetKey().ToString();
    if (slot_ >= 0) {
      if (static_cast<uint16_t>(slot_) != GetSlotNumFromKey(user_key)) return rocksdb::Status::OK();
    }
    sub_key = ikey.GetSubKey().ToString();
    ns = ikey.GetNamespace().ToString();
    switch (log_data_.GetRedisType()) {
      case kRedisHash:command_args = {"HSET", user_key, sub_key, value.ToString()};
        break;
      case kRedisList: {
        // List Puts are ambiguous on their own; the original command (and
        // its arguments) recorded in LogData disambiguates them.
        auto args = log_data_.GetArguments();
        if (args->size() < 1) {
          LOG(ERROR) << "Fail to parse write_batch in putcf type list : args error ,should at least contain cmd";
          return rocksdb::Status::OK();
        }
        RedisCommand cmd = static_cast<RedisCommand >(std::stoi((*args)[0]));
        switch (cmd) {
          case kRedisCmdLSet:
            if (args->size() < 2) {
              LOG(ERROR) << "Fail to parse write_batch in putcf cmd lset : args error ,should contain lset index";
              return rocksdb::Status::OK();
            }
            command_args = {"LSET", user_key, (*args)[1], value.ToString()};
            break;
          case kRedisCmdLInsert:
            // LINSERT rewrites multiple list entries but corresponds to a
            // single command; only emit it for the first Put of the batch.
            if (first_seen_) {
              if (args->size() < 4) {
                LOG(ERROR)
                    << "Fail to parse write_batch in putcf cmd linsert : args error, should contain before pivot value";
                return rocksdb::Status::OK();
              }
              command_args = {"LINSERT", user_key, (*args)[1] == "1" ? "before" : "after", (*args)[2], (*args)[3]};
              first_seen_ = false;
            }
            break;
          case kRedisCmdLRem:
            // lrem will be parsed in deletecf, so ignore this putcf
            break;
          default:command_args = {cmd == kRedisCmdLPush ? "LPUSH" : "RPUSH", user_key, value.ToString()};
        }
        break;
      }
      case kRedisSet:command_args = {"SADD", user_key, sub_key};
        break;
      case kRedisZSet: {
        // The member entry's value holds the encoded score.
        double score = DecodeDouble(value.data());
        command_args = {"ZADD", user_key, std::to_string(score), sub_key};
        break;
      }
      case kRedisBitmap: {
        // A bitmap Put rewrites a whole fragment; the SETBIT offset from the
        // original command is needed to extract the single changed bit.
        auto args = log_data_.GetArguments();
        if (args->size() < 1) {
          LOG(ERROR) << "Fail to parse write_batch in putcf cmd setbit : args error ,should contain setbit offset";
          return rocksdb::Status::OK();
        }
        bool bit_value = Redis::Bitmap::GetBitFromValueAndOffset(value.ToString(), std::stoi((*args)[0]));
        command_args = {"SETBIT", user_key, (*args)[0], bit_value ? "1" : "0"};
        break;
      }
      case kRedisSortedint: {
        // SIADD is a kvrocks-only command, so skip it when the output is
        // destined for a real Redis server.
        if (!to_redis_) {
          command_args = {"SIADD", user_key, std::to_string(DecodeFixed64(sub_key.data()))};
        }
        break;
      }
      default: break;
    }
  }

  if (!command_args.empty()) {
    resp_commands_[ns].emplace_back(Redis::Command2RESP(command_args));
  }
  return rocksdb::Status::OK();
}

// Callback invoked for every Delete record in the write batch.  Mirrors
// PutCF: reconstructs the deleting Redis command and appends its RESP
// encoding to resp_commands_ under the key's namespace.  Always returns OK;
// untranslatable records are logged and skipped.
rocksdb::Status WriteBatchExtractor::DeleteCF(uint32_t column_family_id, const Slice &key) {
  // ZSet score entries mirror the member entries in the default column
  // family, so skip them to avoid emitting a duplicate command.
  if (column_family_id == kColumnFamilyIDZSetScore) {
    return rocksdb::Status::OK();
  }

  std::string ns, user_key, sub_key;
  std::vector<std::string> command_args;
  if (column_family_id == kColumnFamilyIDMetadata) {
    // Deleting a key's metadata entry corresponds to deleting the key itself.
    ExtractNamespaceKey(key, &ns, &user_key, is_slotid_encoded_);
    // When extracting for a single slot (slot_ >= 0), ignore keys belonging
    // to any other slot.
    if (slot_ >= 0) {
      if (static_cast<uint16_t>(slot_) != GetSlotNumFromKey(user_key)) return rocksdb::Status::OK();
    }
    command_args = {"DEL", user_key};
  } else if (column_family_id == kColumnFamilyIDDefault) {
    // Subkey entry: decode the internal key, then translate by the Redis
    // type recorded in the batch's LogData.
    InternalKey ikey(key, is_slotid_encoded_);
    user_key = ikey.GetKey().ToString();
    if (slot_ >= 0) {
      if (static_cast<uint16_t>(slot_) != GetSlotNumFromKey(user_key)) return rocksdb::Status::OK();
    }
    sub_key = ikey.GetSubKey().ToString();
    ns = ikey.GetNamespace().ToString();
    switch (log_data_.GetRedisType()) {
      case kRedisHash: command_args = {"HDEL", user_key, sub_key};
        break;
      case kRedisSet: command_args = {"SREM", user_key, sub_key};
        break;
      case kRedisZSet: command_args = {"ZREM", user_key, sub_key};
        break;
      case kRedisList: {
        // List deletes are ambiguous on their own; the original command in
        // LogData disambiguates LTRIM/LREM/LPOP/RPOP.
        auto args = log_data_.GetArguments();
        if (args->size() < 1) {
          LOG(ERROR) << "Fail to parse write_batch in DeleteCF type list : args error ,should contain cmd";
          return rocksdb::Status::OK();
        }
        RedisCommand cmd = static_cast<RedisCommand >(std::stoi((*args)[0]));
        switch (cmd) {
          case kRedisCmdLTrim:
            // LTRIM removes multiple entries but is one command; only emit
            // it for the first Delete of the batch.
            if (first_seen_) {
              if (args->size() < 3) {
                LOG(ERROR) << "Fail to parse write_batch in DeleteCF cmd ltrim : args error ,should contain start,stop";
                return rocksdb::Status::OK();
              }
              command_args = {"LTRIM", user_key, (*args)[1], (*args)[2]};
              first_seen_ = false;
            }
            break;
          case kRedisCmdLRem:
            // Same collapsing as LTRIM: one LREM for the whole batch.
            if (first_seen_) {
              if (args->size() < 3) {
                LOG(ERROR) << "Fail to parse write_batch in DeleteCF cmd lrem : args error ,should contain count,value";
                return rocksdb::Status::OK();
              }
              command_args = {"LREM", user_key, (*args)[1], (*args)[2]};
              first_seen_ = false;
            }
            break;
          default:command_args = {cmd == kRedisCmdLPop ? "LPOP" : "RPOP", user_key};
        }
        break;
      }
      case kRedisSortedint: {
        // SIREM is a kvrocks-only command, so skip it when the output is
        // destined for a real Redis server.
        if (!to_redis_) {
          command_args = {"SIREM", user_key, std::to_string(DecodeFixed64(sub_key.data()))};
        }
        break;
      }
      default: break;
    }
  }

  if (!command_args.empty()) {
    resp_commands_[ns].emplace_back(Redis::Command2RESP(command_args));
  }
  return rocksdb::Status::OK();
}

// Callback invoked for DeleteRange records in the write batch.  Range
// deletions are intentionally ignored by the extractor: no Redis command is
// generated for them, and iteration continues with the next record.
rocksdb::Status WriteBatchExtractor::DeleteRangeCF(uint32_t /*column_family_id*/,
                                                   const Slice & /*begin_key*/,
                                                   const Slice & /*end_key*/) {
  return rocksdb::Status::OK();
}
32 changes: 32 additions & 0 deletions src/batch_extractor.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
#pragma once
#include <string>
#include <vector>
#include <map>

#include "redis_db.h"
#include "status.h"
#include "storage.h"
#include "redis_metadata.h"


// An extractor to extract update from raw writebatch
// An extractor to extract update from raw writebatch.  Implements rocksdb's
// WriteBatch::Handler interface: iterating a write batch through this handler
// converts each Put/Delete record back into the equivalent Redis command,
// RESP-encoded and grouped by namespace, ready to be replayed on another
// server (e.g. during slot migration or kvrocks2redis synchronization).
class WriteBatchExtractor : public rocksdb::WriteBatch::Handler {
 public:
  // is_slotid_encoded: whether keys in the batch carry an encoded slot id
  //   (cluster mode key encoding).
  // slot: if >= 0, only records whose key belongs to this slot are
  //   extracted; -1 (default) extracts every record.
  // to_redis: if true, output targets a real Redis server, so kvrocks-only
  //   commands (SIADD/SIREM for the Sortedint type) are suppressed.
  explicit WriteBatchExtractor(bool is_slotid_encoded, int16_t slot = -1, bool to_redis = false)
      : is_slotid_encoded_(is_slotid_encoded), slot_(slot), to_redis_(to_redis) {}
  void LogData(const rocksdb::Slice &blob) override;
  rocksdb::Status PutCF(uint32_t column_family_id, const Slice &key,
                        const Slice &value) override;

  rocksdb::Status DeleteCF(uint32_t column_family_id, const Slice &key) override;
  rocksdb::Status DeleteRangeCF(uint32_t column_family_id,
                                const Slice& begin_key, const Slice& end_key) override;
  // Returns the extracted commands: namespace -> RESP-encoded command list.
  // The returned pointer refers to this object's internal map.
  std::map<std::string, std::vector<std::string>> *GetRESPCommands() { return &resp_commands_; }
 private:
  // Extracted commands, keyed by namespace.
  std::map<std::string, std::vector<std::string>> resp_commands_;
  // Type and original-command arguments from the batch's most recent
  // LogData record; used to disambiguate Put/Delete records.
  Redis::WriteBatchLogData log_data_;
  // True until the first multi-record command (LINSERT/LTRIM/LREM) has been
  // emitted; collapses a batch of records into a single command.
  bool first_seen_ = true;
  bool is_slotid_encoded_ = false;
  // Slot filter; -1 means no filtering.  NOTE(review): declared int here but
  // the constructor takes int16_t -- consider unifying the types.
  int slot_;
  // Suppress kvrocks-only commands when targeting real Redis.
  bool to_redis_;
};
Loading

0 comments on commit 805dc84

Please sign in to comment.