diff --git a/kvrocks.conf b/kvrocks.conf index defd99854bc..c747fde2edd 100644 --- a/kvrocks.conf +++ b/kvrocks.conf @@ -536,6 +536,16 @@ compaction-checker-range 0-7 # rename-command KEYS "" ################################ MIGRATE ##################################### +# Slot migration supports two ways: +# - redis-command: Migrate data by redis serialization protocol(RESP). +# - raw-key-value: Migrate the raw key value data of the storage engine directly. +# This way eliminates the overhead of converting to the redis +# command, reduces resource consumption, improves migration +# efficiency, and can implement a finer rate limit. +# +# Default: redis-command +migrate-type redis-command + # If the network bandwidth is completely consumed by the migration task, # it will affect the availability of kvrocks. To avoid this situation, # migrate-speed is adopted to limit the migrating speed. @@ -562,6 +572,18 @@ migrate-pipeline-size 16 # Default: 10000 migrate-sequence-gap 10000 +# The raw-key-value migration way uses batch for migration. This option sets the batch size +# for each migration. +# +# Default: 16kb +migrate-batch-size-kb 16 + +# Rate limit for migration based on raw-key-value, representing the maximum number of data +# that can be migrated per second. 0 means no limit. +# +# Default: 16M +migrate-batch-rate-limit-mb 16 + ################################ ROCKSDB ##################################### # Specify the capacity of column family block cache. A larger block cache diff --git a/src/cluster/batch_sender.cc b/src/cluster/batch_sender.cc new file mode 100644 index 00000000000..e92221ee6f3 --- /dev/null +++ b/src/cluster/batch_sender.cc @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "batch_sender.h" + +#include "io_util.h" +#include "server/redis_reply.h" +#include "time_util.h" + +Status BatchSender::Put(rocksdb::ColumnFamilyHandle *cf, const rocksdb::Slice &key, const rocksdb::Slice &value) { + // If the data is too large to fit in one batch, it needs to be split into multiple batches. + // To cover this case, we append the log data when first add metadata. + if (pending_entries_ == 0 && !prefix_logdata_.empty()) { + auto s = PutLogData(prefix_logdata_); + if (!s.IsOK()) { + return s; + } + } + auto s = write_batch_.Put(cf, key, value); + if (!s.ok()) { + return {Status::NotOK, fmt::format("failed to put key value to migration batch, {}", s.ToString())}; + } + + pending_entries_++; + entries_num_++; + return Status::OK(); +} + +Status BatchSender::Delete(rocksdb::ColumnFamilyHandle *cf, const rocksdb::Slice &key) { + auto s = write_batch_.Delete(cf, key); + if (!s.ok()) { + return {Status::NotOK, fmt::format("failed to delete key from migration batch, {}", s.ToString())}; + } + pending_entries_++; + entries_num_++; + return Status::OK(); +} + +Status BatchSender::PutLogData(const rocksdb::Slice &blob) { + auto s = write_batch_.PutLogData(blob); + if (!s.ok()) { + return {Status::NotOK, fmt::format("failed to put log data to migration batch, {}", s.ToString())}; + } + pending_entries_++; + entries_num_++; + return Status::OK(); +} + +void BatchSender::SetPrefixLogData(const std::string &prefix_logdata) { prefix_logdata_ = prefix_logdata; } + +Status BatchSender::Send() { + if (pending_entries_ == 0) { + return Status::OK(); + } + + // rate limit + if (bytes_per_sec_ > 0) { + auto single_burst = rate_limiter_->GetSingleBurstBytes(); + auto left = static_cast(write_batch_.GetDataSize()); + while (left > 0) { + auto request_size = std::min(left, single_burst); + rate_limiter_->Request(request_size, rocksdb::Env::IOPriority::IO_HIGH, nullptr); + left -= request_size; + } + } + + auto s = sendApplyBatchCmd(dst_fd_, write_batch_); + if (!s.IsOK()) { + return s.Prefixed("failed to send APPLYBATCH command"); + } + + sent_bytes_ += write_batch_.GetDataSize(); + sent_batches_num_++; + pending_entries_ = 0; + write_batch_.Clear(); + return Status::OK(); +} + +Status BatchSender::sendApplyBatchCmd(int fd, const rocksdb::WriteBatch &write_batch) { + if (fd <= 0) { + return {Status::NotOK, "invalid fd"}; + } + + GET_OR_RET(util::SockSend(fd, redis::ArrayOfBulkStrings({"APPLYBATCH", write_batch.Data()}))); + + std::string line = GET_OR_RET(util::SockReadLine(fd)); + + if (line.compare(0, 1, "-") == 0) { + return {Status::NotOK, line}; + } + + return Status::OK(); +} + +void BatchSender::SetBytesPerSecond(size_t bytes_per_sec) { + if (bytes_per_sec_ == bytes_per_sec) { + return; + } + bytes_per_sec_ = bytes_per_sec; + if (bytes_per_sec > 0) { + rate_limiter_->SetBytesPerSecond(static_cast(bytes_per_sec)); + } +} + +double BatchSender::GetRate(uint64_t since) const { + auto t = util::GetTimeStampMS(); + if (t <= since) { + return 0; + } + + return ((static_cast(sent_bytes_) / 1024.0) / (static_cast(t - since) / 1000.0)); +} diff --git a/src/cluster/batch_sender.h b/src/cluster/batch_sender.h new file mode 100644 index 00000000000..41f46d1f2f7 --- /dev/null +++ b/src/cluster/batch_sender.h @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#pragma once + +#include +#include + +#include "status.h" + +class BatchSender { + public: + BatchSender() = default; + BatchSender(int fd, size_t max_bytes, size_t bytes_per_sec) + : dst_fd_(fd), + max_bytes_(max_bytes), + bytes_per_sec_(bytes_per_sec), + rate_limiter_(std::unique_ptr( + rocksdb::NewGenericRateLimiter(static_cast(bytes_per_sec_)))) {} + + ~BatchSender() = default; + + Status Put(rocksdb::ColumnFamilyHandle *cf, const rocksdb::Slice &key, const rocksdb::Slice &value); + Status Delete(rocksdb::ColumnFamilyHandle *cf, const rocksdb::Slice &key); + Status PutLogData(const rocksdb::Slice &blob); + void SetPrefixLogData(const std::string &prefix_logdata); + Status Send(); + + void SetMaxBytes(size_t max_bytes) { + if (max_bytes_ != max_bytes) max_bytes_ = max_bytes; + } + bool IsFull() const { return write_batch_.GetDataSize() >= max_bytes_; } + uint64_t GetSentBytes() const { return sent_bytes_; } + uint32_t GetSentBatchesNum() const { return sent_batches_num_; } + uint32_t GetEntriesNum() const { return entries_num_; } + void SetBytesPerSecond(size_t bytes_per_sec); + double GetRate(uint64_t since) const; + + private: + static Status sendApplyBatchCmd(int fd, const rocksdb::WriteBatch &write_batch); + + rocksdb::WriteBatch write_batch_{}; + std::string prefix_logdata_{}; + uint64_t sent_bytes_ = 0; + uint32_t sent_batches_num_ = 0; + uint32_t entries_num_ = 0; + uint32_t pending_entries_ = 0; + + int dst_fd_; + size_t max_bytes_; + + size_t bytes_per_sec_ = 0; // 0 means no limit + std::unique_ptr rate_limiter_; +}; diff --git a/src/cluster/slot_migrate.cc b/src/cluster/slot_migrate.cc index be54cc2e9e2..eba2f901330 100644 --- a/src/cluster/slot_migrate.cc +++ b/src/cluster/slot_migrate.cc @@ -28,6 +28,7 @@ #include "fmt/format.h" #include "io_util.h" #include "storage/batch_extractor.h" +#include "storage/iterator.h" #include "sync_migrate_context.h" #include "thread_util.h" #include "time_util.h" @@ -36,14 +37,21 @@ const char *errFailedToSendCommands = "failed to send commands to restore a key"; const char *errMigrationTaskCanceled = "key migration stopped due to a task cancellation"; const char *errFailedToSetImportStatus = "failed to set import status on destination node"; +const char *errUnsupportedMigrationType = "unsupported migration type"; static std::map type_to_cmd = { {kRedisString, "set"}, {kRedisList, "rpush"}, {kRedisHash, "hmset"}, {kRedisSet, "sadd"}, {kRedisZSet, "zadd"}, {kRedisBitmap, "setbit"}, {kRedisSortedint, "siadd"}, {kRedisStream, "xadd"}, }; -SlotMigrator::SlotMigrator(Server *srv, int max_migration_speed, int max_pipeline_size, int seq_gap_limit) - : Database(srv->storage, kDefaultNamespace), srv_(srv) { +SlotMigrator::SlotMigrator(Server *srv) + : Database(srv->storage, kDefaultNamespace), + srv_(srv), + max_migration_speed_(srv->GetConfig()->migrate_speed), + max_pipeline_size_(srv->GetConfig()->pipeline_size), + seq_gap_limit_(srv->GetConfig()->sequence_gap), + migrate_batch_bytes_per_sec_(srv->GetConfig()->migrate_batch_rate_limit_mb * MiB), + migrate_batch_size_bytes_(srv->GetConfig()->migrate_batch_size_kb * KiB) { // Let metadata_cf_handle_ be nullptr, and get them in real time to avoid accessing invalid pointer, // because metadata_cf_handle_ and db_ will be destroyed if DB is reopened. // [Situation]: @@ -61,16 +69,6 @@ SlotMigrator::SlotMigrator(Server *srv, int max_migration_speed, int max_pipelin // This problem may exist in all functions of Database called in slot migration process. metadata_cf_handle_ = nullptr; - if (max_migration_speed >= 0) { - max_migration_speed_ = max_migration_speed; - } - if (max_pipeline_size > 0) { - max_pipeline_size_ = max_pipeline_size; - } - if (seq_gap_limit > 0) { - seq_gap_limit_ = seq_gap_limit; - } - if (srv->IsSlave()) { SetStopMigrationFlag(true); } @@ -210,7 +208,7 @@ void SlotMigrator::runMigrationProcess() { break; } case SlotMigrationStage::kWAL: { - auto s = syncWal(); + auto s = syncWAL(); if (s.IsOK()) { LOG(INFO) << "[migrate] Succeed to sync from WAL for a slot " << migrating_slot_; current_stage_ = SlotMigrationStage::kSuccess; @@ -298,6 +296,24 @@ Status SlotMigrator::startMigration() { } Status SlotMigrator::sendSnapshot() { + if (srv_->GetConfig()->migrate_type == MigrationType::kRedisCommand) { + return sendSnapshotByCmd(); + } else if (srv_->GetConfig()->migrate_type == MigrationType::kRawKeyValue) { + return sendSnapshotByRawKV(); + } + return {Status::NotOK, errUnsupportedMigrationType}; +} + +Status SlotMigrator::syncWAL() { + if (srv_->GetConfig()->migrate_type == MigrationType::kRedisCommand) { + return syncWALByCmd(); + } else if (srv_->GetConfig()->migrate_type == MigrationType::kRawKeyValue) { + return syncWALByRawKV(); + } + return {Status::NotOK, errUnsupportedMigrationType}; +} + +Status SlotMigrator::sendSnapshotByCmd() { uint64_t migrated_key_cnt = 0; uint64_t expired_key_cnt = 0; uint64_t empty_key_cnt = 0; @@ -365,7 +381,7 @@ Status SlotMigrator::sendSnapshot() { return Status::OK(); } -Status SlotMigrator::syncWal() { +Status SlotMigrator::syncWALByCmd() { // Send incremental data from WAL circularly until new increment less than a certain amount auto s = syncWalBeforeForbiddingSlot(); if (!s.IsOK()) { @@ -1130,3 +1146,168 @@ void SlotMigrator::resumeSyncCtx(const Status &migrate_result) { blocking_context_ = nullptr; } } + +Status SlotMigrator::sendMigrationBatch(BatchSender *batch) { + // user may dynamically change some configs, apply it when send data + batch->SetMaxBytes(migrate_batch_size_bytes_); + batch->SetBytesPerSecond(migrate_batch_bytes_per_sec_); + return batch->Send(); +} + +Status SlotMigrator::sendSnapshotByRawKV() { + uint64_t start_ts = util::GetTimeStampMS(); + LOG(INFO) << "[migrate] Migrating snapshot of slot " << migrating_slot_ << " by raw key value"; + + rocksdb::ReadOptions read_options = storage_->DefaultScanOptions(); + read_options.snapshot = slot_snapshot_; + engine::DBIterator iter(storage_, read_options); + auto prefix = ComposeSlotKeyPrefix(namespace_, migrating_slot_); + + BatchSender batch_sender(*dst_fd_, migrate_batch_size_bytes_, migrate_batch_bytes_per_sec_); + + for (iter.Seek(prefix); iter.Valid() && iter.Key().starts_with(prefix); iter.Next()) { + auto redis_type = iter.Type(); + std::string log_data; + if (redis_type == RedisType::kRedisList) { + redis::WriteBatchLogData batch_log_data(redis_type, {std::to_string(RedisCommand::kRedisCmdRPush)}); + log_data = batch_log_data.Encode(); + } else { + redis::WriteBatchLogData batch_log_data(redis_type); + log_data = batch_log_data.Encode(); + } + batch_sender.SetPrefixLogData(log_data); + + GET_OR_RET(batch_sender.Put(storage_->GetCFHandle(engine::kMetadataColumnFamilyName), iter.Key(), iter.Value())); + + auto subkey_iter = iter.GetSubKeyIterator(); + if (!subkey_iter) { + continue; + } + + for (subkey_iter->Seek(); subkey_iter->Valid(); subkey_iter->Next()) { + GET_OR_RET(batch_sender.Put(subkey_iter->ColumnFamilyHandle(), subkey_iter->Key(), subkey_iter->Value())); + + if (redis_type == RedisType::kRedisZSet) { + InternalKey internal_key(subkey_iter->Key(), storage_->IsSlotIdEncoded()); + auto score_key = subkey_iter->Value().ToString(); + score_key.append(subkey_iter->UserKey().ToString()); + auto score_key_bytes = + InternalKey(iter.Key(), score_key, internal_key.GetVersion(), storage_->IsSlotIdEncoded()).Encode(); + GET_OR_RET(batch_sender.Put(storage_->GetCFHandle(kColumnFamilyIDZSetScore), score_key_bytes, Slice())); + } + + if (batch_sender.IsFull()) { + GET_OR_RET(sendMigrationBatch(&batch_sender)); + } + } + + if (batch_sender.IsFull()) { + GET_OR_RET(sendMigrationBatch(&batch_sender)); + } + } + + GET_OR_RET(sendMigrationBatch(&batch_sender)); + + auto elapsed = util::GetTimeStampMS() - start_ts; + LOG(INFO) << fmt::format( + "[migrate] Succeed to migrate snapshot, slot: {}, elapsed: {} ms, " + "sent: {} bytes, rate: {:.2f} kb/s, batches: {}, entries: {}", + migrating_slot_.load(), elapsed, batch_sender.GetSentBytes(), batch_sender.GetRate(start_ts), + batch_sender.GetSentBatchesNum(), batch_sender.GetEntriesNum()); + + return Status::OK(); +} + +Status SlotMigrator::syncWALByRawKV() { + uint64_t start_ts = util::GetTimeStampMS(); + LOG(INFO) << "[migrate] Syncing WAL of slot " << migrating_slot_ << " by raw key value"; + BatchSender batch_sender(*dst_fd_, migrate_batch_size_bytes_, migrate_batch_bytes_per_sec_); + + int epoch = 1; + uint64_t wal_incremental_seq = 0; + + while (epoch <= kMaxLoopTimes) { + if (catchUpIncrementalWAL()) { + break; + } + wal_incremental_seq = storage_->GetDB()->GetLatestSequenceNumber(); + auto s = migrateIncrementalDataByRawKV(wal_incremental_seq, &batch_sender); + if (!s.IsOK()) { + return {Status::NotOK, fmt::format("migrate incremental data failed, {}", s.Msg())}; + } + LOG(INFO) << fmt::format("[migrate] Migrated incremental data, epoch: {}, seq from {} to {}", epoch, wal_begin_seq_, + wal_incremental_seq); + wal_begin_seq_ = wal_incremental_seq; + epoch++; + } + + setForbiddenSlot(migrating_slot_); + + wal_incremental_seq = storage_->GetDB()->GetLatestSequenceNumber(); + if (wal_incremental_seq > wal_begin_seq_) { + auto s = migrateIncrementalDataByRawKV(wal_incremental_seq, &batch_sender); + if (!s.IsOK()) { + return {Status::NotOK, fmt::format("migrate last incremental data failed, {}", s.Msg())}; + } + LOG(INFO) << fmt::format("[migrate] Migrated last incremental data after set forbidden slot, seq from {} to {}", + wal_begin_seq_, wal_incremental_seq); + } + + auto elapsed = util::GetTimeStampMS() - start_ts; + LOG(INFO) << fmt::format( + "[migrate] Succeed to migrate incremental data, slot: {}, elapsed: {} ms, " + "sent: {} bytes, rate: {:.2f} kb/s, batches: {}, entries: {}", + migrating_slot_.load(), elapsed, batch_sender.GetSentBytes(), batch_sender.GetRate(start_ts), + batch_sender.GetSentBatchesNum(), batch_sender.GetEntriesNum()); + + return Status::OK(); +} + +bool SlotMigrator::catchUpIncrementalWAL() { + uint64_t gap = storage_->GetDB()->GetLatestSequenceNumber() - wal_begin_seq_; + if (gap <= seq_gap_limit_) { + LOG(INFO) << fmt::format("[migrate] Incremental data sequence gap: {}, less than limit: {}, set forbidden slot: {}", + gap, seq_gap_limit_, migrating_slot_.load()); + return true; + } + return false; +} + +Status SlotMigrator::migrateIncrementalDataByRawKV(uint64_t end_seq, BatchSender *batch_sender) { + engine::WALIterator wal_iter(storage_, migrating_slot_); + uint64_t start_seq = wal_begin_seq_ + 1; + for (wal_iter.Seek(start_seq); wal_iter.Valid(); wal_iter.Next()) { + if (wal_iter.NextSequenceNumber() > end_seq + 1) { + break; + } + auto item = wal_iter.Item(); + switch (item.type) { + case engine::WALItem::Type::kTypeLogData: { + GET_OR_RET(batch_sender->PutLogData(item.key)); + break; + } + case engine::WALItem::Type::kTypePut: { + GET_OR_RET(batch_sender->Put(storage_->GetCFHandle(static_cast(item.column_family_id)), + item.key, item.value)); + break; + } + case engine::WALItem::Type::kTypeDelete: { + GET_OR_RET( + batch_sender->Delete(storage_->GetCFHandle(static_cast(item.column_family_id)), item.key)); + break; + } + case engine::WALItem::Type::kTypeDeleteRange: { + // Do nothing in DeleteRange due to it might cross multiple slots. It's only used in + // FLUSHDB/FLUSHALL commands for now and maybe we can disable them while migrating. + } + default: + break; + } + if (batch_sender->IsFull()) { + GET_OR_RET(sendMigrationBatch(batch_sender)); + } + } + + // send the remaining data + return sendMigrationBatch(batch_sender); +} diff --git a/src/cluster/slot_migrate.h b/src/cluster/slot_migrate.h index 9aad1dcc5ac..8fefdbc9305 100644 --- a/src/cluster/slot_migrate.h +++ b/src/cluster/slot_migrate.h @@ -34,6 +34,7 @@ #include #include +#include "batch_sender.h" #include "config.h" #include "encoding.h" #include "parse_util.h" @@ -45,6 +46,8 @@ #include "storage/redis_db.h" #include "unique_fd.h" +enum class MigrationType { kRedisCommand = 0, kRawKeyValue }; + enum class MigrationState { kNone = 0, kStarted, kSuccess, kFailed }; enum class SlotMigrationStage { kNone, kStart, kSnapshot, kWAL, kSuccess, kFailed, kClean }; @@ -75,8 +78,7 @@ class SyncMigrateContext; class SlotMigrator : public redis::Database { public: - explicit SlotMigrator(Server *srv, int max_migration_speed = kDefaultMaxMigrationSpeed, - int max_pipeline_size = kDefaultMaxPipelineSize, int seq_gap_limit = kDefaultSequenceGapLimit); + explicit SlotMigrator(Server *srv); SlotMigrator(const SlotMigrator &other) = delete; SlotMigrator &operator=(const SlotMigrator &other) = delete; ~SlotMigrator(); @@ -94,6 +96,8 @@ class SlotMigrator : public redis::Database { void SetSequenceGapLimit(int value) { if (value > 0) seq_gap_limit_ = value; } + void SetMigrateBatchRateLimit(size_t bytes_per_sec) { migrate_batch_bytes_per_sec_ = bytes_per_sec; } + void SetMigrateBatchSize(size_t size) { migrate_batch_size_bytes_ = size; } void SetStopMigrationFlag(bool value) { stop_migration_ = value; } bool IsMigrationInProgress() const { return migration_state_ == MigrationState::kStarted; } SlotMigrationStage GetCurrentSlotMigrationStage() const { return current_stage_; } @@ -108,13 +112,16 @@ class SlotMigrator : public redis::Database { bool isTerminated() { return thread_state_ == ThreadState::Terminated; } Status startMigration(); Status sendSnapshot(); - Status syncWal(); + Status syncWAL(); Status finishSuccessfulMigration(); Status finishFailedMigration(); void clean(); Status authOnDstNode(int sock_fd, const std::string &password); Status setImportStatusOnDstNode(int sock_fd, int status); + + Status sendSnapshotByCmd(); + Status syncWALByCmd(); Status checkSingleResponse(int sock_fd); Status checkMultipleResponses(int sock_fd, int total); @@ -133,6 +140,13 @@ class SlotMigrator : public redis::Database { Status migrateIncrementData(std::unique_ptr *iter, uint64_t end_seq); Status syncWalBeforeForbiddingSlot(); Status syncWalAfterForbiddingSlot(); + + Status sendMigrationBatch(BatchSender *batch); + Status sendSnapshotByRawKV(); + Status syncWALByRawKV(); + bool catchUpIncrementalWAL(); + Status migrateIncrementalDataByRawKV(uint64_t end_seq, BatchSender *batch_sender); + void setForbiddenSlot(int16_t slot); std::unique_lock blockingLock() { return std::unique_lock(blocking_mutex_); } @@ -148,9 +162,12 @@ class SlotMigrator : public redis::Database { static const int kMaxLoopTimes = 10; Server *srv_; - int max_migration_speed_; - int max_pipeline_size_; - int seq_gap_limit_; + + int max_migration_speed_ = kDefaultMaxMigrationSpeed; + int max_pipeline_size_ = kDefaultMaxPipelineSize; + uint64_t seq_gap_limit_ = kDefaultSequenceGapLimit; + std::atomic migrate_batch_bytes_per_sec_ = 1 * GiB; + std::atomic migrate_batch_size_bytes_; SlotMigrationStage current_stage_ = SlotMigrationStage::kNone; ParserState parser_state_ = ParserState::ArrayLen; diff --git a/src/config/config.cc b/src/config/config.cc index 7a944f6c4ad..db7ee712a99 100644 --- a/src/config/config.cc +++ b/src/config/config.cc @@ -83,6 +83,9 @@ const std::vector> cache_types{[] { return res; }()}; +const std::vector> migration_types{{"redis-command", MigrationType::kRedisCommand}, + {"raw-key-value", MigrationType::kRawKeyValue}}; + std::string TrimRocksDbPrefix(std::string s) { if (strncasecmp(s.data(), "rocksdb.", 8) != 0) return s; return s.substr(8, s.size() - 8); @@ -168,6 +171,10 @@ Config::Config() { {"migrate-speed", false, new IntField(&migrate_speed, 4096, 0, INT_MAX)}, {"migrate-pipeline-size", false, new IntField(&pipeline_size, 16, 1, INT_MAX)}, {"migrate-sequence-gap", false, new IntField(&sequence_gap, 10000, 1, INT_MAX)}, + {"migrate-type", false, + new EnumField(&migrate_type, migration_types, MigrationType::kRedisCommand)}, + {"migrate-batch-size-kb", false, new IntField(&migrate_batch_size_kb, 16, 1, INT_MAX)}, + {"migrate-batch-rate-limit-mb", false, new IntField(&migrate_batch_rate_limit_mb, 16, 0, INT_MAX)}, {"unixsocket", true, new StringField(&unixsocket, "")}, {"unixsocketperm", true, new OctalField(&unixsocketperm, 0777, 1, INT_MAX)}, {"log-retention-days", false, new IntField(&log_retention_days, -1, -1, INT_MAX)}, @@ -508,6 +515,18 @@ void Config::initFieldCallback() { if (cluster_enabled) srv->slot_migrator->SetSequenceGapLimit(sequence_gap); return Status::OK(); }}, + {"migrate-batch-rate-limit-mb", + [this](Server *srv, const std::string &k, const std::string &v) -> Status { + if (!srv) return Status::OK(); + srv->slot_migrator->SetMigrateBatchRateLimit(migrate_batch_rate_limit_mb * MiB); + return Status::OK(); + }}, + {"migrate-batch-size-kb", + [this](Server *srv, const std::string &k, const std::string &v) -> Status { + if (!srv) return Status::OK(); + srv->slot_migrator->SetMigrateBatchSize(migrate_batch_size_kb * KiB); + return Status::OK(); + }}, {"log-level", [this](Server *srv, const std::string &k, const std::string &v) -> Status { if (!srv) return Status::OK(); diff --git a/src/config/config.h b/src/config/config.h index e69ad5e580f..a24c59e03fe 100644 --- a/src/config/config.h +++ b/src/config/config.h @@ -36,6 +36,7 @@ // forward declaration class Server; +enum class MigrationType; namespace engine { class Storage; } @@ -144,9 +145,13 @@ struct Config { bool persist_cluster_nodes_enabled = true; bool slot_id_encoded = false; bool cluster_enabled = false; + int migrate_speed; int pipeline_size; int sequence_gap; + MigrationType migrate_type; + int migrate_batch_size_kb; + int migrate_batch_rate_limit_mb; bool redis_cursor_compatible = false; bool resp3_enabled = false; diff --git a/src/server/server.cc b/src/server/server.cc index bc4c29391f8..4d67fc642c6 100644 --- a/src/server/server.cc +++ b/src/server/server.cc @@ -160,8 +160,7 @@ Status Server::Start() { } } // Create objects used for slot migration - slot_migrator = - std::make_unique(this, config_->migrate_speed, config_->pipeline_size, config_->sequence_gap); + slot_migrator = std::make_unique(this); auto s = slot_migrator->CreateMigrationThread(); if (!s.IsOK()) { return s.Prefixed("failed to create migration thread"); diff --git a/src/storage/storage.cc b/src/storage/storage.cc index 0408c51ca7e..9d74d181216 100644 --- a/src/storage/storage.cc +++ b/src/storage/storage.cc @@ -733,6 +733,8 @@ rocksdb::ColumnFamilyHandle *Storage::GetCFHandle(const std::string &name) { return cf_handles_[0]; } +rocksdb::ColumnFamilyHandle *Storage::GetCFHandle(ColumnFamilyID id) { return cf_handles_[static_cast(id)]; } + rocksdb::Status Storage::Compact(rocksdb::ColumnFamilyHandle *cf, const Slice *begin, const Slice *end) { rocksdb::CompactRangeOptions compact_opts; compact_opts.change_level = true; diff --git a/src/storage/storage.h b/src/storage/storage.h index 7e0e94d2fc1..0e20425a68d 100644 --- a/src/storage/storage.h +++ b/src/storage/storage.h @@ -51,7 +51,7 @@ inline constexpr StorageEngineType STORAGE_ENGINE_TYPE = StorageEngineType::KVRO const int kReplIdLength = 16; enum ColumnFamilyID { - kColumnFamilyIDDefault, + kColumnFamilyIDDefault = 0, kColumnFamilyIDMetadata, kColumnFamilyIDZSetScore, kColumnFamilyIDPubSub, @@ -177,6 +177,7 @@ class Storage { bool IsClosing() const { return db_closing_; } std::string GetName() const { return config_->db_name; } rocksdb::ColumnFamilyHandle *GetCFHandle(const std::string &name); + rocksdb::ColumnFamilyHandle *GetCFHandle(ColumnFamilyID id); std::vector *GetCFHandles() { return &cf_handles_; } LockManager *GetLockManager() { return &lock_mgr_; } void PurgeOldBackups(uint32_t num_backups_to_keep, uint32_t backup_max_keep_hours); diff --git a/tests/gocase/integration/slotmigrate/slotmigrate_test.go b/tests/gocase/integration/slotmigrate/slotmigrate_test.go index afae86d1869..6901f5f2d08 100644 --- a/tests/gocase/integration/slotmigrate/slotmigrate_test.go +++ b/tests/gocase/integration/slotmigrate/slotmigrate_test.go @@ -35,6 +35,7 @@ import ( type SlotMigrationState string type SlotImportState string +type SlotMigrationType string const ( SlotMigrationStateStarted SlotMigrationState = "start" @@ -43,8 +44,13 @@ const ( SlotImportStateSuccess SlotImportState = "success" SlotImportStateFailed SlotImportState = "error" + + MigrationTypeRedisCommand SlotMigrationType = "redis-command" + MigrationTypeRawKeyValue SlotMigrationType = "raw-key-value" ) +var testSlot = 0 + func TestSlotMigrateFromSlave(t *testing.T) { ctx := context.Background() @@ -530,10 +536,13 @@ func TestSlotMigrateDataType(t *testing.T) { require.EqualValues(t, cnt, rdb1.LLen(ctx, util.SlotTable[slot]).Val()) }) - migrateAllTypes := func(t *testing.T, sync bool, slot int) { + migrateAllTypes := func(t *testing.T, migrateType SlotMigrationType, sync bool) { + require.NoError(t, rdb0.ConfigSet(ctx, "migrate-type", string(migrateType)).Err()) + + testSlot += 1 keys := make(map[string]string, 0) for _, typ := range []string{"string", "expired_string", "list", "hash", "set", "zset", "bitmap", "sortint", "stream"} { - keys[typ] = fmt.Sprintf("%s_{%s}", typ, util.SlotTable[slot]) + keys[typ] = fmt.Sprintf("%s_{%s}", typ, util.SlotTable[testSlot]) require.NoError(t, rdb0.Del(ctx, keys[typ]).Err()) } // type string @@ -604,12 +613,11 @@ func TestSlotMigrateDataType(t *testing.T) { require.EqualValues(t, "0-0", streamInfo.MaxDeletedEntryID) require.EqualValues(t, 19, streamInfo.Length) - // migrate slot 1, all keys above are belong to slot 1 if !sync { - require.Equal(t, "OK", rdb0.Do(ctx, "clusterx", "migrate", slot, id1).Val()) - waitForMigrateState(t, rdb0, slot, SlotMigrationStateSuccess) + require.Equal(t, "OK", rdb0.Do(ctx, "clusterx", "migrate", testSlot, id1).Val()) + waitForMigrateState(t, rdb0, testSlot, SlotMigrationStateSuccess) } else { - require.Equal(t, "OK", rdb0.Do(ctx, "clusterx", "migrate", slot, id1, "sync").Val()) + require.Equal(t, "OK", rdb0.Do(ctx, "clusterx", "migrate", testSlot, id1, "sync").Val()) } // check destination data @@ -657,19 +665,12 @@ func TestSlotMigrateDataType(t *testing.T) { } } - t.Run("MIGRATE - Slot migrate all types of existing data", func(t *testing.T) { - migrateAllTypes(t, false, 1) - }) - - t.Run("MIGRATE - Slot migrate all types of existing data (sync)", func(t *testing.T) { - migrateAllTypes(t, true, 2) - }) - - t.Run("MIGRATE - increment sync stream from WAL", func(t *testing.T) { - slot := 40 + migrateIncrementalStream := func(t *testing.T, migrateType SlotMigrationType) { + require.NoError(t, rdb0.ConfigSet(ctx, "migrate-type", string(migrateType)).Err()) + testSlot += 1 keys := make(map[string]string, 0) for _, typ := range []string{"stream"} { - keys[typ] = fmt.Sprintf("%s_{%s}", typ, util.SlotTable[slot]) + keys[typ] = fmt.Sprintf("%s_{%s}", typ, util.SlotTable[testSlot]) require.NoError(t, rdb0.Del(ctx, keys[typ]).Err()) } for i := 1; i < 1000; i++ { @@ -691,7 +692,7 @@ func TestSlotMigrateDataType(t *testing.T) { defer func() { require.NoError(t, rdb0.ConfigSet(ctx, "migrate-speed", "4096").Err()) }() - require.Equal(t, "OK", rdb0.Do(ctx, "clusterx", "migrate", slot, id1).Val()) + require.Equal(t, "OK", rdb0.Do(ctx, "clusterx", "migrate", testSlot, id1).Val()) newStreamID := "1001" require.NoError(t, rdb0.XAdd(ctx, &redis.XAddArgs{ Stream: keys["stream"], @@ -700,18 +701,20 @@ func TestSlotMigrateDataType(t *testing.T) { }).Err()) require.NoError(t, rdb0.XDel(ctx, keys["stream"], "1-0").Err()) require.NoError(t, rdb0.Do(ctx, "XSETID", keys["stream"], "1001-0", "MAXDELETEDID", "2-0").Err()) - waitForMigrateStateInDuration(t, rdb0, slot, SlotMigrationStateSuccess, time.Minute) + waitForMigrateStateInDuration(t, rdb0, testSlot, SlotMigrationStateSuccess, time.Minute) streamInfo = rdb1.XInfoStream(ctx, keys["stream"]).Val() require.EqualValues(t, "1001-0", streamInfo.LastGeneratedID) require.EqualValues(t, 1000, streamInfo.EntriesAdded) require.EqualValues(t, "2-0", streamInfo.MaxDeletedEntryID) require.EqualValues(t, 999, streamInfo.Length) - }) + } - t.Run("MIGRATE - Migrating empty stream", func(t *testing.T) { - slot := 31 - key := fmt.Sprintf("stream_{%s}", util.SlotTable[slot]) + migrateEmptyStream := func(t *testing.T, migrateType SlotMigrationType) { + require.NoError(t, rdb0.ConfigSet(ctx, "migrate-type", string(migrateType)).Err()) + + testSlot += 1 + key := fmt.Sprintf("stream_{%s}", util.SlotTable[testSlot]) require.NoError(t, rdb0.Del(ctx, key).Err()) @@ -736,8 +739,8 @@ func TestSlotMigrateDataType(t *testing.T) { require.EqualValues(t, "7-0", originRes.MaxDeletedEntryID) require.EqualValues(t, 0, originRes.Length) - require.Equal(t, "OK", rdb0.Do(ctx, "clusterx", "migrate", slot, id1).Val()) - waitForMigrateState(t, rdb0, slot, SlotMigrationStateSuccess) + require.Equal(t, "OK", rdb0.Do(ctx, "clusterx", "migrate", testSlot, id1).Val()) + waitForMigrateState(t, rdb0, testSlot, SlotMigrationStateSuccess) require.ErrorContains(t, rdb0.Exists(ctx, key).Err(), "MOVED") @@ -748,11 +751,13 @@ func TestSlotMigrateDataType(t *testing.T) { require.EqualValues(t, originRes.EntriesAdded, migratedRes.EntriesAdded) require.EqualValues(t, originRes.MaxDeletedEntryID, migratedRes.MaxDeletedEntryID) require.EqualValues(t, originRes.Length, migratedRes.Length) - }) + } - t.Run("MIGRATE - Migrating stream with deleted entries", func(t *testing.T) { - slot := 32 - key := fmt.Sprintf("stream_{%s}", util.SlotTable[slot]) + migrateStreamWithDeletedEnties := func(t *testing.T, migrateType SlotMigrationType) { + require.NoError(t, rdb0.ConfigSet(ctx, "migrate-type", string(migrateType)).Err()) + + testSlot += 1 + key := fmt.Sprintf("stream_{%s}", util.SlotTable[testSlot]) require.NoError(t, rdb0.Del(ctx, key).Err()) @@ -775,8 +780,8 @@ func TestSlotMigrateDataType(t *testing.T) { require.EqualValues(t, "3-0", originRes.MaxDeletedEntryID) require.EqualValues(t, 3, originRes.Length) - require.Equal(t, "OK", rdb0.Do(ctx, "clusterx", "migrate", slot, id1).Val()) - waitForMigrateState(t, rdb0, slot, SlotMigrationStateSuccess) + require.Equal(t, "OK", rdb0.Do(ctx, "clusterx", "migrate", testSlot, id1).Val()) + waitForMigrateState(t, rdb0, testSlot, SlotMigrationStateSuccess) require.ErrorContains(t, rdb0.Exists(ctx, key).Err(), "MOVED") @@ -787,74 +792,48 @@ func TestSlotMigrateDataType(t *testing.T) { require.EqualValues(t, originRes.EntriesAdded, migratedRes.EntriesAdded) require.EqualValues(t, originRes.MaxDeletedEntryID, migratedRes.MaxDeletedEntryID) require.EqualValues(t, originRes.Length, migratedRes.Length) - }) - - t.Run("MIGRATE - Accessing slot is forbidden on source server but not on destination server", func(t *testing.T) { - slot := 3 - require.NoError(t, rdb0.Set(ctx, util.SlotTable[slot], 3, 0).Err()) - require.Equal(t, "OK", rdb0.Do(ctx, "clusterx", "migrate", slot, id1).Val()) - waitForMigrateState(t, rdb0, slot, SlotMigrationStateSuccess) - require.ErrorContains(t, rdb0.Set(ctx, util.SlotTable[slot], "source-value", 0).Err(), "MOVED") - require.ErrorContains(t, rdb0.Del(ctx, util.SlotTable[slot]).Err(), "MOVED") - require.ErrorContains(t, rdb0.Exists(ctx, util.SlotTable[slot]).Err(), "MOVED") - require.NoError(t, rdb1.Set(ctx, util.SlotTable[slot], "destination-value", 0).Err()) - }) - - t.Run("MIGRATE - Slot isn't forbidden writing when starting migrating", func(t *testing.T) { - slot := 5 - cnt := 20000 - for i := 0; i < cnt; i++ { - require.NoError(t, rdb0.LPush(ctx, util.SlotTable[slot], i).Err()) - } - require.Equal(t, "OK", rdb0.Do(ctx, "clusterx", "migrate", slot, id1).Val()) - requireMigrateState(t, rdb0, slot, SlotMigrationStateStarted) - // write during migrating - require.EqualValues(t, cnt+1, rdb0.LPush(ctx, util.SlotTable[slot], cnt).Val()) - waitForMigrateState(t, rdb0, slot, SlotMigrationStateSuccess) - require.Equal(t, strconv.Itoa(cnt), rdb1.LPop(ctx, util.SlotTable[slot]).Val()) - }) - - t.Run("MIGRATE - Slot keys are not cleared after migration but cleared after setslot", func(t *testing.T) { - slot := 6 - require.NoError(t, rdb0.Set(ctx, util.SlotTable[slot], "slot6", 0).Err()) - require.Equal(t, "OK", rdb0.Do(ctx, "clusterx", "migrate", slot, id1).Val()) - waitForMigrateState(t, rdb0, slot, SlotMigrationStateSuccess) - require.Equal(t, "slot6", rdb1.Get(ctx, util.SlotTable[slot]).Val()) - require.Contains(t, rdb0.Keys(ctx, "*").Val(), util.SlotTable[slot]) - require.NoError(t, rdb0.Do(ctx, "clusterx", "setslot", slot, "node", id1, "2").Err()) - require.NotContains(t, rdb0.Keys(ctx, "*").Val(), util.SlotTable[slot]) - }) + } - t.Run("MIGRATE - Migrate incremental data via parsing and filtering data in WAL", func(t *testing.T) { - migratingSlot := 15 + migrateIncrementalData := func(t *testing.T, migrateType SlotMigrationType) { + require.NoError(t, rdb0.ConfigSet(ctx, "migrate-type", string(migrateType)).Err()) + testSlot += 1 + migratingSlot := testSlot + hashtag := util.SlotTable[migratingSlot] keys := []string{ // key for slowing migrate-speed when migrating existing data - util.SlotTable[migratingSlot], - // the following keys belong to slot 15; keys of all the data types (string/hash/set/zset/list/sortint) - "key:000042915392", - "key:000043146202", - "key:000044434182", - "key:000045189446", - "key:000047413016", - "key:000049190069", - "key:000049930003", - "key:000049980785", - "key:000056730838", + hashtag, + fmt.Sprintf("{%s}_key1", hashtag), + fmt.Sprintf("{%s}_key2", hashtag), + fmt.Sprintf("{%s}_key3", hashtag), + fmt.Sprintf("{%s}_key4", hashtag), + fmt.Sprintf("{%s}_key5", hashtag), + fmt.Sprintf("{%s}_key6", hashtag), + fmt.Sprintf("{%s}_key7", hashtag), + fmt.Sprintf("{%s}_key8", hashtag), + fmt.Sprintf("{%s}_key9", hashtag), } for _, key := range keys { require.NoError(t, rdb0.Del(ctx, key).Err()) } - require.NoError(t, rdb0.ConfigSet(ctx, "migrate-speed", "64").Err()) - require.Equal(t, map[string]string{"migrate-speed": "64"}, rdb0.ConfigGet(ctx, "migrate-speed").Val()) + + valuePrefix := "value" + if migrateType == MigrationTypeRedisCommand { + require.NoError(t, rdb0.ConfigSet(ctx, "migrate-speed", "64").Err()) + } else { + // Create enough data + valuePrefix = strings.Repeat("value", 1024) + require.NoError(t, rdb0.ConfigSet(ctx, "migrate-batch-rate-limit-mb", "1").Err()) + } cnt := 2000 for i := 0; i < cnt; i++ { - require.NoError(t, rdb0.LPush(ctx, keys[0], i).Err()) + require.NoError(t, rdb0.LPush(ctx, keys[0], fmt.Sprintf("%s-%d", valuePrefix, i)).Err()) } require.Equal(t, "OK", rdb0.Do(ctx, "clusterx", "migrate", migratingSlot, id1).Val()) // write key that doesn't belong to this slot - nonMigratingSlot := 12 + testSlot += 1 + nonMigratingSlot := testSlot require.NoError(t, rdb0.Del(ctx, util.SlotTable[nonMigratingSlot]).Err()) require.NoError(t, rdb0.Set(ctx, util.SlotTable[nonMigratingSlot], "non-migrating-value", 0).Err()) @@ -868,12 +847,14 @@ func TestSlotMigrateDataType(t *testing.T) { require.NoError(t, rdb0.SetBit(ctx, keys[3], 10086, 1).Err()) require.NoError(t, rdb0.Expire(ctx, keys[3], 10000*time.Second).Err()) // verify expireat binlog could be parsed - slotWithExpiringKey := nonMigratingSlot + 1 + testSlot += 1 + slotWithExpiringKey := testSlot require.NoError(t, rdb0.Del(ctx, util.SlotTable[slotWithExpiringKey]).Err()) require.NoError(t, rdb0.Set(ctx, util.SlotTable[slotWithExpiringKey], "expiring-value", 0).Err()) require.NoError(t, rdb0.ExpireAt(ctx, util.SlotTable[slotWithExpiringKey], time.Now().Add(100*time.Second)).Err()) // verify del command - slotWithDeletedKey := nonMigratingSlot + 2 + testSlot += 1 + slotWithDeletedKey := testSlot require.NoError(t, rdb0.Set(ctx, util.SlotTable[slotWithDeletedKey], "will-be-deleted", 0).Err()) require.NoError(t, rdb0.Del(ctx, util.SlotTable[slotWithDeletedKey]).Err()) // 2. type hash @@ -912,12 +893,6 @@ func TestSlotMigrateDataType(t *testing.T) { for i := 10000; i < 11000; i += 2 { require.NoError(t, rdb0.SetBit(ctx, keys[8], int64(i), 1).Err()) } - for i := 20000; i < 21000; i += 5 { - res := rdb0.BitField(ctx, keys[8], "SET", "u5", strconv.Itoa(i), 23) - require.NoError(t, res.Err()) - require.EqualValues(t, 1, len(res.Val())) - require.EqualValues(t, 0, res.Val()[0]) - } // 7. type sortint require.NoError(t, rdb0.Do(ctx, "SIADD", keys[9], 2, 4, 1, 3).Err()) require.NoError(t, rdb0.Do(ctx, "SIREM", keys[9], 2).Err()) @@ -963,12 +938,6 @@ func TestSlotMigrateDataType(t *testing.T) { for i := 0; i < 20; i += 2 { require.EqualValues(t, 0, rdb1.GetBit(ctx, keys[8], int64(i)).Val()) } - for i := 20000; i < 21000; i += 5 { - res := rdb1.BitField(ctx, keys[8], "GET", "u5", strconv.Itoa(i)) - require.NoError(t, res.Err()) - require.EqualValues(t, 1, len(res.Val())) - require.EqualValues(t, 23, res.Val()[0]) - } // 7. type sortint require.EqualValues(t, siv, rdb1.Do(ctx, "SIRANGE", keys[9], 0, -1).Val()) @@ -976,47 +945,112 @@ func TestSlotMigrateDataType(t *testing.T) { require.Equal(t, "non-migrating-value", rdb0.Get(ctx, util.SlotTable[nonMigratingSlot]).Val()) require.ErrorContains(t, rdb1.Exists(ctx, util.SlotTable[nonMigratingSlot]).Err(), "MOVED") require.EqualValues(t, 0, rdb0.Exists(ctx, util.SlotTable[slotWithDeletedKey]).Val()) + } + + testMigrationTypes := []SlotMigrationType{MigrationTypeRedisCommand, MigrationTypeRawKeyValue} + + for _, testType := range testMigrationTypes { + t.Run(fmt.Sprintf("MIGRATE - Slot migrate all types of existing data using %s", testType), func(t *testing.T) { + migrateAllTypes(t, testType, false) + }) + + t.Run(fmt.Sprintf("MIGRATE - Slot migrate all types of existing data (sync) using %s", testType), func(t *testing.T) { + migrateAllTypes(t, testType, true) + }) + + t.Run(fmt.Sprintf("MIGRATE - increment sync stream from WAL using %s", testType), func(t *testing.T) { + migrateIncrementalStream(t, testType) + }) + + t.Run(fmt.Sprintf("MIGRATE - Migrating empty stream using %s", testType), func(t *testing.T) { + migrateEmptyStream(t, testType) + }) + + t.Run(fmt.Sprintf("MIGRATE - Migrating stream with deleted entries using %s", testType), func(t *testing.T) { + migrateStreamWithDeletedEnties(t, testType) + }) + + t.Run(fmt.Sprintf("MIGRATE - Migrate incremental data via parsing and filtering data in WAL using %s", testType), func(t *testing.T) { + migrateIncrementalData(t, testType) + }) + } + + t.Run("MIGRATE - Accessing slot is forbidden on source server but not on destination server", func(t *testing.T) { + testSlot += 1 + require.NoError(t, rdb0.Set(ctx, util.SlotTable[testSlot], 3, 0).Err()) + require.Equal(t, "OK", rdb0.Do(ctx, "clusterx", "migrate", testSlot, id1).Val()) + waitForMigrateState(t, rdb0, testSlot, SlotMigrationStateSuccess) + require.ErrorContains(t, rdb0.Set(ctx, util.SlotTable[testSlot], "source-value", 0).Err(), "MOVED") + require.ErrorContains(t, rdb0.Del(ctx, util.SlotTable[testSlot]).Err(), "MOVED") + require.ErrorContains(t, rdb0.Exists(ctx, util.SlotTable[testSlot]).Err(), "MOVED") + require.NoError(t, rdb1.Set(ctx, util.SlotTable[testSlot], "destination-value", 0).Err()) + }) + + t.Run("MIGRATE - Slot isn't forbidden writing when starting migrating", func(t *testing.T) { + testSlot += 1 + cnt := 20000 + for i := 0; i < cnt; i++ { + require.NoError(t, rdb0.LPush(ctx, util.SlotTable[testSlot], i).Err()) + } + require.Equal(t, "OK", rdb0.Do(ctx, "clusterx", "migrate", testSlot, id1).Val()) + requireMigrateState(t, rdb0, testSlot, SlotMigrationStateStarted) + // write during migrating + require.EqualValues(t, cnt+1, rdb0.LPush(ctx, util.SlotTable[testSlot], cnt).Val()) + waitForMigrateState(t, rdb0, testSlot, SlotMigrationStateSuccess) + require.Equal(t, strconv.Itoa(cnt), rdb1.LPop(ctx, util.SlotTable[testSlot]).Val()) + }) + + t.Run("MIGRATE - Slot keys are not cleared after migration but cleared after setslot", func(t *testing.T) { + testSlot += 1 + require.NoError(t, rdb0.Set(ctx, util.SlotTable[testSlot], "slot6", 0).Err()) + require.Equal(t, "OK", rdb0.Do(ctx, "clusterx", "migrate", testSlot, id1).Val()) + waitForMigrateState(t, rdb0, testSlot, SlotMigrationStateSuccess) + require.Equal(t, "slot6", rdb1.Get(ctx, util.SlotTable[testSlot]).Val()) + require.Contains(t, rdb0.Keys(ctx, "*").Val(), util.SlotTable[testSlot]) + require.NoError(t, rdb0.Do(ctx, "clusterx", "setslot", testSlot, "node", id1, "2").Err()) + require.NotContains(t, rdb0.Keys(ctx, "*").Val(), util.SlotTable[testSlot]) }) t.Run("MIGRATE - Slow migrate speed", func(t *testing.T) { - slot := 16 + require.NoError(t, rdb0.ConfigSet(ctx, "migrate-type", string(MigrationTypeRedisCommand)).Err()) + testSlot += 1 require.NoError(t, rdb0.ConfigSet(ctx, "migrate-speed", "16").Err()) require.Equal(t, map[string]string{"migrate-speed": "16"}, rdb0.ConfigGet(ctx, "migrate-speed").Val()) - require.NoError(t, rdb0.Del(ctx, util.SlotTable[slot]).Err()) + require.NoError(t, rdb0.Del(ctx, util.SlotTable[testSlot]).Err()) // more than pipeline size(16) and max items(16) in command cnt := 1000 for i := 0; i < cnt; i++ { - require.NoError(t, rdb0.LPush(ctx, util.SlotTable[slot], i).Err()) + require.NoError(t, rdb0.LPush(ctx, util.SlotTable[testSlot], i).Err()) } - require.Equal(t, "OK", rdb0.Do(ctx, "clusterx", "migrate", slot, id1).Val()) + require.Equal(t, "OK", rdb0.Do(ctx, "clusterx", "migrate", testSlot, id1).Val()) // should not finish 1.5s time.Sleep(1500 * time.Millisecond) - requireMigrateState(t, rdb0, slot, SlotMigrationStateStarted) - waitForMigrateState(t, rdb0, slot, SlotMigrationStateSuccess) + requireMigrateState(t, rdb0, testSlot, SlotMigrationStateStarted) + waitForMigrateState(t, rdb0, testSlot, SlotMigrationStateSuccess) }) t.Run("MIGRATE - Data of migrated slot can't be written to source but can be written to destination", func(t *testing.T) { - slot := 17 - require.NoError(t, rdb0.Del(ctx, util.SlotTable[slot]).Err()) + testSlot += 1 + require.NoError(t, rdb0.Del(ctx, util.SlotTable[testSlot]).Err()) cnt := 100 for i := 0; i < cnt; i++ { - require.NoError(t, rdb0.LPush(ctx, util.SlotTable[slot], i).Err()) + require.NoError(t, rdb0.LPush(ctx, util.SlotTable[testSlot], i).Err()) } - require.Equal(t, "OK", rdb0.Do(ctx, "clusterx", "migrate", slot, id1).Val()) - waitForMigrateState(t, rdb0, slot, SlotMigrationStateSuccess) - require.EqualValues(t, cnt, rdb1.LLen(ctx, util.SlotTable[slot]).Val()) + require.Equal(t, "OK", rdb0.Do(ctx, "clusterx", "migrate", testSlot, id1).Val()) + waitForMigrateState(t, rdb0, testSlot, SlotMigrationStateSuccess) + require.EqualValues(t, cnt, rdb1.LLen(ctx, util.SlotTable[testSlot]).Val()) // write the migrated slot to source server - k := fmt.Sprintf("{%s}_1", util.SlotTable[slot]) + k := fmt.Sprintf("{%s}_1", util.SlotTable[testSlot]) require.ErrorContains(t, rdb0.Set(ctx, k, "slot17_value1", 0).Err(), "MOVED") // write the migrated slot to destination server require.NoError(t, rdb1.Set(ctx, k, "slot17_value1", 0).Err()) }) t.Run("MIGRATE - LMOVE (src and dst are different) via parsing WAL logs", func(t *testing.T) { - slot1 := 18 + testSlot += 1 - srcListName := fmt.Sprintf("list_src_{%s}", util.SlotTable[slot1]) - dstListName := fmt.Sprintf("list_dst_{%s}", util.SlotTable[slot1]) + srcListName := fmt.Sprintf("list_src_{%s}", util.SlotTable[testSlot]) + dstListName := fmt.Sprintf("list_dst_{%s}", util.SlotTable[testSlot]) require.NoError(t, rdb0.Del(ctx, srcListName).Err()) require.NoError(t, rdb0.Del(ctx, dstListName).Err()) @@ -1030,13 +1064,13 @@ func TestSlotMigrateDataType(t *testing.T) { require.NoError(t, rdb0.RPush(ctx, srcListName, fmt.Sprintf("element%d", i)).Err()) } - require.Equal(t, "OK", rdb0.Do(ctx, "clusterx", "migrate", slot1, id1).Val()) - requireMigrateState(t, rdb0, slot1, SlotMigrationStateStarted) + require.Equal(t, "OK", rdb0.Do(ctx, "clusterx", "migrate", testSlot, id1).Val()) + requireMigrateState(t, rdb0, testSlot, SlotMigrationStateStarted) for i := 0; i < 10; i++ { require.NoError(t, rdb0.LMove(ctx, srcListName, dstListName, "RIGHT", "LEFT").Err()) } - waitForMigrateState(t, rdb0, slot1, SlotMigrationStateSuccess) + waitForMigrateState(t, rdb0, testSlot, SlotMigrationStateSuccess) require.ErrorContains(t, rdb0.RPush(ctx, srcListName, "element1000").Err(), "MOVED") require.Equal(t, int64(10), rdb1.LLen(ctx, dstListName).Val()) @@ -1048,9 +1082,9 @@ func TestSlotMigrateDataType(t *testing.T) { }) t.Run("MIGRATE - LMOVE (src and dst are the same) via parsing WAL logs", func(t *testing.T) { - slot1 := 19 + testSlot += 1 - srcListName := fmt.Sprintf("list_src_{%s}", util.SlotTable[slot1]) + srcListName := fmt.Sprintf("list_src_{%s}", util.SlotTable[testSlot]) require.NoError(t, rdb0.Del(ctx, srcListName).Err()) @@ -1065,13 +1099,13 @@ func TestSlotMigrateDataType(t *testing.T) { require.NoError(t, rdb0.RPush(ctx, srcListName, fmt.Sprintf("element%d", i)).Err()) } - require.Equal(t, "OK", rdb0.Do(ctx, "clusterx", "migrate", slot1, id1).Val()) - requireMigrateState(t, rdb0, slot1, SlotMigrationStateStarted) + require.Equal(t, "OK", rdb0.Do(ctx, "clusterx", "migrate", testSlot, id1).Val()) + requireMigrateState(t, rdb0, testSlot, SlotMigrationStateStarted) for i := 0; i < 10; i++ { require.NoError(t, rdb0.LMove(ctx, srcListName, srcListName, "RIGHT", "LEFT").Err()) } - waitForMigrateState(t, rdb0, slot1, SlotMigrationStateSuccess) + waitForMigrateState(t, rdb0, testSlot, SlotMigrationStateSuccess) require.ErrorContains(t, rdb0.RPush(ctx, srcListName, "element1000").Err(), "MOVED") require.Equal(t, int64(srcLen), rdb1.LLen(ctx, srcListName).Val())