Skip to content

Commit

Permalink
chore: improve the style and document for SlotMigration (#2465)
Browse files Browse the repository at this point in the history
  • Loading branch information
mapleFU authored Aug 5, 2024
1 parent f9d7297 commit bf56a05
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 27 deletions.
49 changes: 34 additions & 15 deletions src/cluster/slot_migrate.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,10 @@
#include "time_util.h"
#include "types/redis_stream_base.h"

const char *errFailedToSendCommands = "failed to send commands to restore a key";
const char *errMigrationTaskCanceled = "key migration stopped due to a task cancellation";
const char *errFailedToSetImportStatus = "failed to set import status on destination node";
const char *errUnsupportedMigrationType = "unsupported migration type";
constexpr std::string_view errFailedToSendCommands = "failed to send commands to restore a key";
constexpr std::string_view errMigrationTaskCanceled = "key migration stopped due to a task cancellation";
constexpr std::string_view errFailedToSetImportStatus = "failed to set import status on destination node";
constexpr std::string_view errUnsupportedMigrationType = "unsupported migration type";

static std::map<RedisType, std::string> type_to_cmd = {
{kRedisString, "set"}, {kRedisList, "rpush"}, {kRedisHash, "hmset"}, {kRedisSet, "sadd"},
Expand Down Expand Up @@ -318,7 +318,7 @@ Status SlotMigrator::sendSnapshot() {
} else if (migration_type_ == MigrationType::kRawKeyValue) {
return sendSnapshotByRawKV();
}
return {Status::NotOK, errUnsupportedMigrationType};
return {Status::NotOK, std::string(errUnsupportedMigrationType)};
}

Status SlotMigrator::syncWAL() {
Expand All @@ -327,7 +327,7 @@ Status SlotMigrator::syncWAL() {
} else if (migration_type_ == MigrationType::kRawKeyValue) {
return syncWALByRawKV();
}
return {Status::NotOK, errUnsupportedMigrationType};
return {Status::NotOK, std::string(errUnsupportedMigrationType)};
}

Status SlotMigrator::sendSnapshotByCmd() {
Expand All @@ -337,7 +337,7 @@ Status SlotMigrator::sendSnapshotByCmd() {
std::string restore_cmds;
SlotRange slot_range = slot_range_;

LOG(INFO) << "[migrate] Start migrating snapshot of slot(s)" << slot_range.String();
LOG(INFO) << "[migrate] Start migrating snapshot of slot(s): " << slot_range.String();

// Construct key prefix to iterate the keys belong to the target slot
std::string prefix = ComposeSlotKeyPrefix(namespace_, slot_range.start);
Expand All @@ -351,12 +351,12 @@ Status SlotMigrator::sendSnapshotByCmd() {
auto iter = util::UniqueIterator(storage_->GetDB()->NewIterator(read_options, cf_handle));

// Seek to the beginning of keys start with 'prefix' and iterate all these keys
auto current_slot = slot_range.start;
int current_slot = slot_range.start;
for (iter->Seek(prefix); iter->Valid(); iter->Next()) {
// The migrating task has to be stopped, if server role is changed from master to slave
// or flush command (flushdb or flushall) is executed
if (stop_migration_) {
return {Status::NotOK, errMigrationTaskCanceled};
return {Status::NotOK, std::string(errMigrationTaskCanceled)};
}

// Iteration is out of range
Expand All @@ -366,7 +366,7 @@ Status SlotMigrator::sendSnapshotByCmd() {
}

// Get user key
auto [_, user_key] = ExtractNamespaceKey(iter->key(), true);
auto [_, user_key] = ExtractNamespaceKey(iter->key(), /*slot_id_encoded=*/true);

// Add key's constructed commands to restore_cmds, send pipeline or not according to task's max_pipeline_size
auto result = migrateOneKey(user_key, iter->value(), &restore_cmds);
Expand Down Expand Up @@ -429,7 +429,7 @@ Status SlotMigrator::syncWALByCmd() {

Status SlotMigrator::finishSuccessfulMigration() {
if (stop_migration_) {
return {Status::NotOK, errMigrationTaskCanceled};
return {Status::NotOK, std::string(errMigrationTaskCanceled)};
}

// Set import status on the destination node to SUCCESS
Expand Down Expand Up @@ -723,6 +723,12 @@ StatusOr<KeyMigrationResult> SlotMigrator::migrateOneKey(const rocksdb::Slice &k
}
break;
}
case kRedisHyperLogLog: {
// HyperLogLog migration by cmd is not supported,
// since it's hard to restore the same key structure for HyperLogLog
// commands.
break;
}
default:
break;
}
Expand Down Expand Up @@ -752,7 +758,17 @@ Status SlotMigrator::migrateSimpleKey(const rocksdb::Slice &key, const Metadata

Status SlotMigrator::migrateComplexKey(const rocksdb::Slice &key, const Metadata &metadata, std::string *restore_cmds) {
std::string cmd;
cmd = type_to_cmd[metadata.Type()];
{
auto iter = type_to_cmd.find(metadata.Type());
if (iter != type_to_cmd.end()) {
cmd = iter->second;
} else {
if (metadata.Type() > RedisTypeNames.size()) {
return {Status::NotOK, "unknown key type: " + std::to_string(metadata.Type())};
}
return {Status::NotOK, "unsupported complex key type: " + RedisTypeNames[metadata.Type()]};
}
}

std::vector<std::string> user_cmd = {cmd, key.ToString()};
// Construct key prefix to iterate values of the complex type user key
Expand All @@ -769,7 +785,7 @@ Status SlotMigrator::migrateComplexKey(const rocksdb::Slice &key, const Metadata

for (iter->Seek(prefix_subkey); iter->Valid(); iter->Next()) {
if (stop_migration_) {
return {Status::NotOK, errMigrationTaskCanceled};
return {Status::NotOK, std::string(errMigrationTaskCanceled)};
}

if (!iter->key().starts_with(prefix_subkey)) {
Expand Down Expand Up @@ -811,6 +827,9 @@ Status SlotMigrator::migrateComplexKey(const rocksdb::Slice &key, const Metadata
user_cmd.emplace_back(iter->value().ToString());
break;
}
case kRedisHyperLogLog: {
break;
}
default:
break;
}
Expand Down Expand Up @@ -878,7 +897,7 @@ Status SlotMigrator::migrateStream(const Slice &key, const StreamMetadata &metad

for (iter->Seek(prefix_key); iter->Valid(); iter->Next()) {
if (stop_migration_) {
return {Status::NotOK, errMigrationTaskCanceled};
return {Status::NotOK, std::string(errMigrationTaskCanceled)};
}

if (!iter->key().starts_with(prefix_key)) {
Expand Down Expand Up @@ -964,7 +983,7 @@ Status SlotMigrator::migrateBitmapKey(const InternalKey &inkey, std::unique_ptr<

Status SlotMigrator::sendCmdsPipelineIfNeed(std::string *commands, bool need) {
if (stop_migration_) {
return {Status::NotOK, errMigrationTaskCanceled};
return {Status::NotOK, std::string(errMigrationTaskCanceled)};
}

// Check pipeline
Expand Down
29 changes: 18 additions & 11 deletions src/cluster/slot_migrate.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,18 +35,25 @@
#include <vector>

#include "batch_sender.h"
#include "config.h"
#include "encoding.h"
#include "parse_util.h"
#include "redis_slot.h"
#include "server/server.h"
#include "slot_import.h"
#include "stats/stats.h"
#include "status.h"
#include "storage/redis_db.h"
#include "unique_fd.h"

enum class MigrationType { kRedisCommand = 0, kRawKeyValue };
enum class MigrationType {
/// Use Redis commands to migrate data.
/// It will trying to extract commands from existing data and log, then replay
/// them on the destination node.
kRedisCommand = 0,
/// Using raw key-value and "APPLYBATCH" command in kvrocks to migrate data.
///
/// If downstream is not compatible with raw key-value, this migration type will
/// auto switch to kRedisCommand.
kRawKeyValue
};

enum class MigrationState { kNone = 0, kStarted, kSuccess, kFailed };

Expand Down Expand Up @@ -111,7 +118,7 @@ class SlotMigrator : public redis::Database {
private:
void loop();
void runMigrationProcess();
bool isTerminated() { return thread_state_ == ThreadState::Terminated; }
bool isTerminated() const { return thread_state_ == ThreadState::Terminated; }
Status startMigration();
Status sendSnapshot();
Status syncWAL();
Expand Down Expand Up @@ -158,11 +165,11 @@ class SlotMigrator : public redis::Database {
enum class ParserState { ArrayLen, BulkLen, BulkData, ArrayData, OneRspEnd };
enum class ThreadState { Uninitialized, Running, Terminated };

static const int kDefaultMaxPipelineSize = 16;
static const int kDefaultMaxMigrationSpeed = 4096;
static const int kDefaultSequenceGapLimit = 10000;
static const int kMaxItemsInCommand = 16; // number of items in every write command of complex keys
static const int kMaxLoopTimes = 10;
static constexpr int kDefaultMaxPipelineSize = 16;
static constexpr int kDefaultMaxMigrationSpeed = 4096;
static constexpr int kDefaultSequenceGapLimit = 10000;
static constexpr int kMaxItemsInCommand = 16; // number of items in every write command of complex keys
static constexpr int kMaxLoopTimes = 10;

Server *srv_;

Expand All @@ -183,7 +190,7 @@ class SlotMigrator : public redis::Database {
std::thread t_;
std::mutex job_mutex_;
std::condition_variable job_cv_;
std::unique_ptr<SlotMigrationJob> migration_job_;
std::unique_ptr<SlotMigrationJob> migration_job_; // GUARDED_BY(job_mutex_)

std::string dst_node_;
std::string dst_ip_;
Expand Down
2 changes: 1 addition & 1 deletion src/stats/stats.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ enum StatsMetricFlags {
STATS_METRIC_COUNT
};

const int STATS_METRIC_SAMPLES = 16; // Number of samples per metric
constexpr int STATS_METRIC_SAMPLES = 16; // Number of samples per metric

struct CommandStat {
std::atomic<uint64_t> calls;
Expand Down

0 comments on commit bf56a05

Please sign in to comment.