Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support slot-based data migration #430

Merged
merged 20 commits into from
Jan 27, 2022
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,12 @@ endif()
target_sources(kvrocks PRIVATE
src/cluster.cc
src/cluster.h
src/slot_migrate.h
src/slot_migrate.cc
src/slot_import.h
src/slot_import.cc
src/batch_extractor.h
src/batch_extractor.cc
src/redis_db.cc
src/redis_db.h
src/redis_connection.cc
Expand Down Expand Up @@ -198,6 +204,12 @@ target_link_libraries(kvrocks2redis ${EXTERNAL_LIBS})
target_sources(kvrocks2redis PRIVATE
src/cluster.h
src/cluster.cc
src/slot_migrate.h
src/slot_migrate.cc
src/slot_import.h
src/slot_import.cc
src/batch_extractor.h
src/batch_extractor.cc
src/redis_db.cc
src/redis_db.h
src/compact_filter.cc
Expand Down Expand Up @@ -279,8 +291,6 @@ target_sources(kvrocks2redis PRIVATE
tools/kvrocks2redis/main.cc
tools/kvrocks2redis/sync.cc
tools/kvrocks2redis/sync.h
tools/kvrocks2redis/util.cc
tools/kvrocks2redis/util.h
tools/kvrocks2redis/redis_writer.cc
tools/kvrocks2redis/redis_writer.h
tools/kvrocks2redis/writer.cc
Expand Down
35 changes: 31 additions & 4 deletions kvrocks.conf
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ workers 8
# Note that kvrocks will write a pid file in /var/run/kvrocks.pid when daemonized.
daemonize no

# Kvrocks implements cluster solution that is similar with redis cluster sulution.
# Kvrocks implements cluster solution that is similar with redis cluster solution.
# You can get cluster information by CLUSTER NODES|SLOTS|INFO command, it also is
# adapted to redis-cli, redis-benchmark, redis cluster SDK and redis cluster proxy.
# But kvrocks doesn't support communicating with other nodes, so you must set
Expand Down Expand Up @@ -261,15 +261,15 @@ supervised no
# https://github.com/facebook/rocksdb/wiki/Perf-Context-and-IO-Stats-Context
#
# This mechanism is enabled when profiling-sample-commands is not empty and
# profiling-sample-ratio greater than 0.
# profiling-sample-ratio greater than 0.
# It is important to note that this mechanism affects performance, but it is
# useful for troubleshooting performance bottlenecks, so it should only be
# enabled when performance problems occur.

# The name of the commands you want to record. Must be original name of
# The name of the commands you want to record. Must be original name of
# commands supported by Kvrocks. Use ',' to separate multiple commands and
# use '*' to record all commands supported by Kvrocks.
# Example:
# Example:
# - Single command: profiling-sample-commands get
# - Multiple commands: profiling-sample-commands get,mget,hget
#
Expand Down Expand Up @@ -340,6 +340,33 @@ compaction-checker-range 0-7
# Default: yes
auto-resize-block-and-sst yes

################################ MIGRATE #####################################
# If the network bandwidth is completely consumed by the migration task,
# it will affect the availability of kvrocks. To avoid this situation,
# migrate-speed is adopted to limit the migrating speed.
# Migrating speed is limited by controlling the duration between sending data,
# the duration is calculated by: 1000000 * migrate-pipeline-size / migrate-speed (us).
# Value: [0,INT_MAX], 0 means no limit
#
# Default: 4096
migrate-speed 4096

# In order to reduce data transmission times and improve the efficiency of data migration,
# pipeline is adopted to send multiple data at once. Pipeline size can be set by this option.
# Value: [1, INT_MAX], it can't be 0
#
# Default: 16
migrate-pipeline-size 16

# In order to reduce the write forbidden time during migrating slot, we will migrate the incremental
# data several times to reduce the amount of incremental data. Until the quantity of incremental
# data is reduced to a certain threshold, writes to the slot will be forbidden. The threshold is set by
# this option.
# Value: [1, INT_MAX], it can't be 0
#
# Default: 10000
migrate-sequence-gap 10000

################################ ROCKSDB #####################################

# Specify the capacity of metadata column family block cache. Larger block cache
Expand Down
7 changes: 4 additions & 3 deletions src/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ SHARED_OBJS= cluster.o compact_filter.o config.o cron.o encoding.o event_listene
redis_hash.o redis_list.o redis_metadata.o redis_slot.o redis_pubsub.o redis_reply.o \
redis_request.o redis_set.o redis_string.o redis_zset.o redis_geo.o replication.o \
server.o stats.o storage.o task_runner.o util.o geohash.o worker.o redis_sortedint.o \
compaction_checker.o table_properties_collector.o scripting.o sha1.o rand.o
compaction_checker.o table_properties_collector.o scripting.o sha1.o rand.o \
slot_migrate.o slot_import.o batch_extractor.o
KVROCKS_OBJS= $(SHARED_OBJS) main.o

UNITTEST_OBJS= $(SHARED_OBJS) ../tests/main.o ../tests/t_metadata_test.o ../tests/compact_test.o \
Expand All @@ -67,7 +68,7 @@ UNITTEST_OBJS= $(SHARED_OBJS) ../tests/main.o ../tests/t_metadata_test.o ../test

K2RDIR= ../tools/kvrocks2redis
KVROCKS2REDIS_OBJS= $(SHARED_OBJS) $(K2RDIR)/main.o $(K2RDIR)/config.o $(K2RDIR)/parser.o \
$(K2RDIR)/redis_writer.o $(K2RDIR)/sync.o $(K2RDIR)/util.o $(K2RDIR)/writer.o
$(K2RDIR)/redis_writer.o $(K2RDIR)/sync.o $(K2RDIR)/writer.o


KVROCKS_CXX=$(QUIET_CXX)$(CXX) $(FINAL_CXXFLAGS)
Expand Down Expand Up @@ -109,7 +110,7 @@ lint:

$(PROG): $(GLOG) $(LIBEVENT) $(ROCKSDB) $(LUA) $(KVROCKS_OBJS)
$(KVROCKS_LD) -o $(PROG) $(KVROCKS_OBJS) $(FINAL_LIBS) $(LDFLAGS)

$(GLOG):
cd $(GLOG_PATH);./autogen.sh; ./configure --disable-shared; \
$(MAKE) CXXFLAGS='-fPIC' -C $(GLOG_PATH)/
Expand Down
207 changes: 207 additions & 0 deletions src/batch_extractor.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
#include "batch_extractor.h"

#include <rocksdb/write_batch.h>
#include <glog/logging.h>

#include "redis_bitmap.h"
#include "redis_slot.h"
#include "redis_reply.h"

void WriteBatchExtractor::LogData(const rocksdb::Slice &blob) {
log_data_.Decode(blob);
}

// Converts a Put record from a raw RocksDB write batch into the equivalent
// Redis command(s), RESP-encodes them, and appends them to resp_commands_
// under the record's namespace. When slot_ >= 0, records whose key hashes to
// a different slot are silently skipped.
rocksdb::Status WriteBatchExtractor::PutCF(uint32_t column_family_id, const Slice &key,
                                           const Slice &value) {
  // The zset score column family mirrors data also written to the default
  // column family, so it carries no extra information for replay.
  if (column_family_id == kColumnFamilyIDZSetScore) {
    return rocksdb::Status::OK();
  }

  std::string ns, user_key, sub_key;
  std::vector<std::string> command_args;
  if (column_family_id == kColumnFamilyIDMetadata) {
    ExtractNamespaceKey(key, &ns, &user_key, is_slotid_encoded_);
    if (slot_ >= 0) {
      // Slot filter: only keys belonging to the requested slot are extracted.
      if (static_cast<uint16_t>(slot_) != GetSlotNumFromKey(user_key)) return rocksdb::Status::OK();
    }
    Metadata metadata(kRedisNone);
    metadata.Decode(value.ToString());
    if (metadata.Type() == kRedisString) {
      // String payloads are stored inline after the metadata header; the
      // first 5 bytes are the header (presumably flags + expire — confirm
      // against the Metadata encoding), the remainder is the user value.
      command_args = {"SET", user_key, value.ToString().substr(5, value.size() - 5)};
      resp_commands_[ns].emplace_back(Redis::Command2RESP(command_args));
      if (metadata.expire > 0) {
        command_args = {"EXPIREAT", user_key, std::to_string(metadata.expire)};
        resp_commands_[ns].emplace_back(Redis::Command2RESP(command_args));
      }
    } else if (metadata.expire > 0) {
      // For complex types, only emit EXPIREAT when the batch was produced by
      // an explicit EXPIRE command; otherwise every metadata update would
      // needlessly re-send the TTL.
      auto args = log_data_.GetArguments();
      if (!args->empty()) {
        RedisCommand cmd = static_cast<RedisCommand>(std::stoi((*args)[0]));
        if (cmd == kRedisCmdExpire) {
          command_args = {"EXPIREAT", user_key, std::to_string(metadata.expire)};
          resp_commands_[ns].emplace_back(Redis::Command2RESP(command_args));
        }
      }
    }

    return rocksdb::Status::OK();
  }

  if (column_family_id == kColumnFamilyIDDefault) {
    InternalKey ikey(key, is_slotid_encoded_);
    user_key = ikey.GetKey().ToString();
    if (slot_ >= 0) {
      if (static_cast<uint16_t>(slot_) != GetSlotNumFromKey(user_key)) return rocksdb::Status::OK();
    }
    sub_key = ikey.GetSubKey().ToString();
    ns = ikey.GetNamespace().ToString();
    switch (log_data_.GetRedisType()) {
      case kRedisHash:
        command_args = {"HSET", user_key, sub_key, value.ToString()};
        break;
      case kRedisList: {
        auto args = log_data_.GetArguments();
        if (args->size() < 1) {
          LOG(ERROR) << "Fail to parse write_batch in putcf type list : args error ,should at least contain cmd";
          return rocksdb::Status::OK();
        }
        RedisCommand cmd = static_cast<RedisCommand>(std::stoi((*args)[0]));
        switch (cmd) {
          case kRedisCmdLSet:
            if (args->size() < 2) {
              LOG(ERROR) << "Fail to parse write_batch in putcf cmd lset : args error ,should contain lset index";
              return rocksdb::Status::OK();
            }
            command_args = {"LSET", user_key, (*args)[1], value.ToString()};
            break;
          case kRedisCmdLInsert:
            // A single LINSERT touches several records but must be replayed
            // exactly once, so only the first record emits a command.
            if (first_seen_) {
              if (args->size() < 4) {
                LOG(ERROR)
                    << "Fail to parse write_batch in putcf cmd linsert : args error, should contain before pivot value";
                return rocksdb::Status::OK();
              }
              command_args = {"LINSERT", user_key, (*args)[1] == "1" ? "before" : "after", (*args)[2], (*args)[3]};
              first_seen_ = false;
            }
            break;
          case kRedisCmdLRem:
            // lrem will be parsed in deletecf, so ignore this putcf
            break;
          default:
            command_args = {cmd == kRedisCmdLPush ? "LPUSH" : "RPUSH", user_key, value.ToString()};
        }
        break;
      }
      case kRedisSet:
        command_args = {"SADD", user_key, sub_key};
        break;
      case kRedisZSet: {
        double score = DecodeDouble(value.data());
        command_args = {"ZADD", user_key, std::to_string(score), sub_key};
        break;
      }
      case kRedisBitmap: {
        auto args = log_data_.GetArguments();
        if (args->size() < 1) {
          LOG(ERROR) << "Fail to parse write_batch in putcf cmd setbit : args error ,should contain setbit offset";
          return rocksdb::Status::OK();
        }
        bool bit_value = Redis::Bitmap::GetBitFromValueAndOffset(value.ToString(), std::stoi((*args)[0]));
        command_args = {"SETBIT", user_key, (*args)[0], bit_value ? "1" : "0"};
        break;
      }
      case kRedisSortedint: {
        // Sortedint is a kvrocks extension; skip it when the output is meant
        // for a vanilla Redis server.
        if (!to_redis_) {
          command_args = {"SIADD", user_key, std::to_string(DecodeFixed64(sub_key.data()))};
        }
        break;
      }
      default:
        break;
    }
  }

  if (!command_args.empty()) {
    resp_commands_[ns].emplace_back(Redis::Command2RESP(command_args));
  }
  return rocksdb::Status::OK();
}

// Maps a Delete record from a raw write batch onto the Redis command that
// removes the same piece of data, RESP-encodes it, and stores it in
// resp_commands_ under the record's namespace. Records outside slot_ (when
// slot_ >= 0) are dropped.
rocksdb::Status WriteBatchExtractor::DeleteCF(uint32_t column_family_id, const Slice &key) {
  // Score entries mirror the default column family; nothing to extract here.
  if (column_family_id == kColumnFamilyIDZSetScore) {
    return rocksdb::Status::OK();
  }

  std::string nspace, ukey, skey;
  std::vector<std::string> cmd;
  if (column_family_id == kColumnFamilyIDMetadata) {
    ExtractNamespaceKey(key, &nspace, &ukey, is_slotid_encoded_);
    if (slot_ >= 0 && static_cast<uint16_t>(slot_) != GetSlotNumFromKey(ukey)) {
      return rocksdb::Status::OK();
    }
    cmd = {"DEL", ukey};
  } else if (column_family_id == kColumnFamilyIDDefault) {
    InternalKey ikey(key, is_slotid_encoded_);
    ukey = ikey.GetKey().ToString();
    if (slot_ >= 0 && static_cast<uint16_t>(slot_) != GetSlotNumFromKey(ukey)) {
      return rocksdb::Status::OK();
    }
    skey = ikey.GetSubKey().ToString();
    nspace = ikey.GetNamespace().ToString();
    switch (log_data_.GetRedisType()) {
      case kRedisHash:
        cmd = {"HDEL", ukey, skey};
        break;
      case kRedisSet:
        cmd = {"SREM", ukey, skey};
        break;
      case kRedisZSet:
        cmd = {"ZREM", ukey, skey};
        break;
      case kRedisList: {
        auto list_args = log_data_.GetArguments();
        if (list_args->size() < 1) {
          LOG(ERROR) << "Fail to parse write_batch in DeleteCF type list : args error ,should contain cmd";
          return rocksdb::Status::OK();
        }
        auto list_cmd = static_cast<RedisCommand>(std::stoi((*list_args)[0]));
        switch (list_cmd) {
          case kRedisCmdLTrim:
            // A single LTRIM may delete many entries; replay it only once.
            if (first_seen_) {
              if (list_args->size() < 3) {
                LOG(ERROR) << "Fail to parse write_batch in DeleteCF cmd ltrim : args error ,should contain start,stop";
                return rocksdb::Status::OK();
              }
              cmd = {"LTRIM", ukey, (*list_args)[1], (*list_args)[2]};
              first_seen_ = false;
            }
            break;
          case kRedisCmdLRem:
            // Same one-shot handling as LTRIM.
            if (first_seen_) {
              if (list_args->size() < 3) {
                LOG(ERROR) << "Fail to parse write_batch in DeleteCF cmd lrem : args error ,should contain count,value";
                return rocksdb::Status::OK();
              }
              cmd = {"LREM", ukey, (*list_args)[1], (*list_args)[2]};
              first_seen_ = false;
            }
            break;
          default:
            cmd = {list_cmd == kRedisCmdLPop ? "LPOP" : "RPOP", ukey};
        }
        break;
      }
      case kRedisSortedint: {
        // kvrocks-only type; omit when targeting a vanilla Redis server.
        if (!to_redis_) {
          cmd = {"SIREM", ukey, std::to_string(DecodeFixed64(skey.data()))};
        }
        break;
      }
      default:
        break;
    }
  }

  if (!cmd.empty()) {
    resp_commands_[nspace].emplace_back(Redis::Command2RESP(cmd));
  }
  return rocksdb::Status::OK();
}

// Intentionally a no-op: range deletions are not translated into Redis
// commands by this extractor.
rocksdb::Status WriteBatchExtractor::DeleteRangeCF(uint32_t column_family_id,
                                    const Slice& begin_key, const Slice& end_key) {
  // Do nothing about DeleteRange operations
  return rocksdb::Status::OK();
}
32 changes: 32 additions & 0 deletions src/batch_extractor.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
#pragma once
#include <string>
#include <vector>
#include <map>

#include "redis_db.h"
#include "status.h"
#include "storage.h"
#include "redis_metadata.h"


// An extractor that walks a raw RocksDB write batch (via the
// rocksdb::WriteBatch::Handler callbacks) and converts every Put/Delete
// record into the equivalent Redis command, RESP-encoded and grouped by
// namespace. Retrieve the result with GetRESPCommands().
class WriteBatchExtractor : public rocksdb::WriteBatch::Handler {
 public:
  // is_slotid_encoded: whether keys embed a slot id (cluster key encoding).
  // slot: when >= 0, only records whose key hashes to this slot are kept.
  // to_redis: when true, kvrocks-only commands (sortedint SIADD/SIREM) are
  //           skipped so the output can be replayed on a vanilla Redis server.
  explicit WriteBatchExtractor(bool is_slotid_encoded, int16_t slot = -1, bool to_redis = false)
      : is_slotid_encoded_(is_slotid_encoded), slot_(slot), to_redis_(to_redis) {}
  // Decodes the per-batch log-data header (Redis type + command arguments).
  void LogData(const rocksdb::Slice &blob) override;
  rocksdb::Status PutCF(uint32_t column_family_id, const Slice &key,
                        const Slice &value) override;

  rocksdb::Status DeleteCF(uint32_t column_family_id, const Slice &key) override;
  rocksdb::Status DeleteRangeCF(uint32_t column_family_id,
                                const Slice& begin_key, const Slice& end_key) override;
  // Extracted RESP commands keyed by namespace; the map is owned by this
  // extractor and remains valid for its lifetime.
  std::map<std::string, std::vector<std::string>> *GetRESPCommands() { return &resp_commands_; }
 private:
  std::map<std::string, std::vector<std::string>> resp_commands_;
  // Log-data header of the batch currently being processed.
  Redis::WriteBatchLogData log_data_;
  // Ensures one-shot commands (LINSERT/LTRIM/LREM) that span several batch
  // records are emitted only once per batch.
  bool first_seen_ = true;
  bool is_slotid_encoded_ = false;
  // NOTE(review): constructor parameter is int16_t but the member is int —
  // harmless for the >= 0 checks, but the types could be unified.
  int slot_;
  bool to_redis_;
};
Loading