From d32acb99cac132c1e6341057fc6294011ab362ef Mon Sep 17 00:00:00 2001 From: ellutionist Date: Thu, 4 May 2023 13:57:12 +0800 Subject: [PATCH 1/8] support sync migration --- src/cluster/cluster.cc | 9 +-- src/cluster/cluster.h | 4 +- src/cluster/slot_migrate.cc | 32 +++++++- src/cluster/slot_migrate.h | 13 ++- src/cluster/sync_migrate_context.cc | 80 +++++++++++++++++++ src/cluster/sync_migrate_context.h | 27 +++++++ src/commands/cmd_cluster.cc | 33 +++++++- .../slotmigrate/slotmigrate_test.go | 19 ++++- 8 files changed, 202 insertions(+), 15 deletions(-) create mode 100644 src/cluster/sync_migrate_context.cc create mode 100644 src/cluster/sync_migrate_context.h diff --git a/src/cluster/cluster.cc b/src/cluster/cluster.cc index 9c4e4986e6c..615c98ef34d 100644 --- a/src/cluster/cluster.cc +++ b/src/cluster/cluster.cc @@ -291,7 +291,8 @@ Status Cluster::SetSlotImported(int slot) { return Status::OK(); } -Status Cluster::MigrateSlot(int slot, const std::string &dst_node_id) { +Status Cluster::MigrateSlot(int slot, const std::string &dst_node_id, + const std::shared_ptr &blocking_ctx) { if (nodes_.find(dst_node_id) == nodes_.end()) { return {Status::NotOK, "Can't find the destination node id"}; } @@ -316,10 +317,8 @@ Status Cluster::MigrateSlot(int slot, const std::string &dst_node_id) { return {Status::NotOK, "Can't migrate slot to myself"}; } - const auto dst = nodes_[dst_node_id]; - Status s = svr_->slot_migrator->PerformSlotMigration( - dst_node_id, dst->host, dst->port, slot, svr_->GetConfig()->migrate_speed, svr_->GetConfig()->pipeline_size, - svr_->GetConfig()->sequence_gap); + const auto &dst = nodes_[dst_node_id]; + Status s = svr_->slot_migrator->PerformSlotMigration(dst_node_id, dst->host, dst->port, slot, blocking_ctx); return s; } diff --git a/src/cluster/cluster.h b/src/cluster/cluster.h index f6d044359f0..bc84472645b 100644 --- a/src/cluster/cluster.h +++ b/src/cluster/cluster.h @@ -72,6 +72,7 @@ struct SlotInfo { using ClusterNodes = 
std::unordered_map>; class Server; +class SyncMigrateContext; class Cluster { public: @@ -91,7 +92,8 @@ class Cluster { Status CanExecByMySelf(const redis::CommandAttributes *attributes, const std::vector &cmd_tokens, redis::Connection *conn); Status SetMasterSlaveRepl(); - Status MigrateSlot(int slot, const std::string &dst_node_id); + Status MigrateSlot(int slot, const std::string &dst_node_id, + const std::shared_ptr &blocking_ctx = nullptr); Status ImportSlot(redis::Connection *conn, int slot, int state); std::string GetMyId() const { return myid_; } Status DumpClusterNodes(const std::string &file); diff --git a/src/cluster/slot_migrate.cc b/src/cluster/slot_migrate.cc index 8f418495c82..6ed74aafeb9 100644 --- a/src/cluster/slot_migrate.cc +++ b/src/cluster/slot_migrate.cc @@ -28,6 +28,7 @@ #include "fmt/format.h" #include "io_util.h" #include "storage/batch_extractor.h" +#include "sync_migrate_context.h" #include "thread_util.h" #include "time_util.h" #include "types/redis_stream_base.h" @@ -76,7 +77,7 @@ SlotMigrator::SlotMigrator(Server *svr, int max_migration_speed, int max_pipelin } Status SlotMigrator::PerformSlotMigration(const std::string &node_id, std::string &dst_ip, int dst_port, int slot_id, - int speed, int pipeline_size, int seq_gap) { + const std::shared_ptr &blocking_ctx) { // Only one slot migration job at the same time int16_t no_slot = -1; if (!migrating_slot_.compare_exchange_strong(no_slot, static_cast(slot_id))) { @@ -91,6 +92,10 @@ Status SlotMigrator::PerformSlotMigration(const std::string &node_id, std::strin migration_state_ = MigrationState::kStarted; + auto speed = svr_->GetConfig()->migrate_speed; + auto seq_gap = svr_->GetConfig()->sequence_gap; + auto pipeline_size = svr_->GetConfig()->pipeline_size; + if (speed <= 0) { speed = 0; } @@ -103,6 +108,12 @@ Status SlotMigrator::PerformSlotMigration(const std::string &node_id, std::strin seq_gap = kDefaultSequenceGapLimit; } + if (blocking_ctx) { + auto lock = blockingLock(); + 
blocking_context_ = blocking_ctx; + blocking_context_->StartBlock(); + } + dst_node_ = node_id; // Create migration job @@ -184,6 +195,7 @@ void SlotMigrator::runMigrationProcess() { } else { LOG(ERROR) << "[migrate] Failed to start migrating slot " << migrating_slot_ << ". Error: " << s.Msg(); current_stage_ = SlotMigrationStage::kFailed; + wakeupBlocking(s); } break; } @@ -194,6 +206,7 @@ void SlotMigrator::runMigrationProcess() { } else { LOG(ERROR) << "[migrate] Failed to send snapshot of slot " << migrating_slot_ << ". Error: " << s.Msg(); current_stage_ = SlotMigrationStage::kFailed; + wakeupBlocking(s); } break; } @@ -205,6 +218,7 @@ void SlotMigrator::runMigrationProcess() { } else { LOG(ERROR) << "[migrate] Failed to sync from WAL for a slot " << migrating_slot_ << ". Error: " << s.Msg(); current_stage_ = SlotMigrationStage::kFailed; + wakeupBlocking(s); } break; } @@ -214,10 +228,12 @@ void SlotMigrator::runMigrationProcess() { LOG(INFO) << "[migrate] Succeed to migrate slot " << migrating_slot_; current_stage_ = SlotMigrationStage::kClean; migration_state_ = MigrationState::kSuccess; + wakeupBlocking(s); } else { LOG(ERROR) << "[migrate] Failed to finish a successful migration of slot " << migrating_slot_ << ". 
Error: " << s.Msg(); current_stage_ = SlotMigrationStage::kFailed; + wakeupBlocking(s); } break; } @@ -1083,3 +1099,17 @@ void SlotMigrator::GetMigrationInfo(std::string *info) const { *info = fmt::format("migrating_slot: {}\r\ndestination_node: {}\r\nmigrating_state: {}\r\n", slot, dst_node_, task_state); } + +void SlotMigrator::CancelBlocking() { + auto lock = blockingLock(); + blocking_context_ = nullptr; +} + +void SlotMigrator::wakeupBlocking(const Status &migrate_result) { + auto lock = blockingLock(); + if (blocking_context_) { + blocking_context_->Wakeup(migrate_result); + + blocking_context_.reset(); + } +} diff --git a/src/cluster/slot_migrate.h b/src/cluster/slot_migrate.h index 16e73acbe43..07796598e89 100644 --- a/src/cluster/slot_migrate.h +++ b/src/cluster/slot_migrate.h @@ -71,6 +71,8 @@ struct SlotMigrationJob { int seq_gap_limit; }; +class SyncMigrateContext; + class SlotMigrator : public redis::Database { public: explicit SlotMigrator(Server *svr, int max_migration_speed = kDefaultMaxMigrationSpeed, @@ -80,8 +82,8 @@ class SlotMigrator : public redis::Database { ~SlotMigrator(); Status CreateMigrationThread(); - Status PerformSlotMigration(const std::string &node_id, std::string &dst_ip, int dst_port, int slot_id, int speed, - int pipeline_size, int seq_gap); + Status PerformSlotMigration(const std::string &node_id, std::string &dst_ip, int dst_port, int slot_id, + const std::shared_ptr &blocking_ctx = nullptr); void ReleaseForbiddenSlot(); void SetMaxMigrationSpeed(int value) { if (value >= 0) max_migration_speed_ = value; @@ -98,6 +100,7 @@ class SlotMigrator : public redis::Database { int16_t GetForbiddenSlot() const { return forbidden_slot_; } int16_t GetMigratingSlot() const { return migrating_slot_; } void GetMigrationInfo(std::string *info) const; + void CancelBlocking(); private: void loop(); @@ -131,6 +134,9 @@ class SlotMigrator : public redis::Database { Status syncWalBeforeForbiddingSlot(); Status syncWalAfterForbiddingSlot(); void 
setForbiddenSlot(int16_t slot); + std::unique_lock blockingLock() { return std::unique_lock(blocking_mutex_); } + + void wakeupBlocking(const Status &migrate_result); enum class ParserState { ArrayLen, BulkLen, BulkData, OneRspEnd }; enum class ThreadState { Uninitialized, Running, Terminated }; @@ -170,4 +176,7 @@ class SlotMigrator : public redis::Database { std::atomic stop_migration_ = false; // if is true migration will be stopped but the thread won't be destroyed const rocksdb::Snapshot *slot_snapshot_ = nullptr; uint64_t wal_begin_seq_ = 0; + + std::mutex blocking_mutex_; + std::shared_ptr blocking_context_; }; diff --git a/src/cluster/sync_migrate_context.cc b/src/cluster/sync_migrate_context.cc new file mode 100644 index 00000000000..f075440823e --- /dev/null +++ b/src/cluster/sync_migrate_context.cc @@ -0,0 +1,80 @@ +#include "cluster/sync_migrate_context.h" + +SyncMigrateContext::~SyncMigrateContext() { + if (timer_) { + event_free(timer_); + timer_ = nullptr; + } +} + +void SyncMigrateContext::StartBlock() { + auto bev = conn_->GetBufferEvent(); + bufferevent_setcb(bev, nullptr, WriteCB, EventCB, this); + + if (timeout_) { + timer_ = evtimer_new(bufferevent_get_base(bev), TimerCB, this); + timeval tm = {timeout_, 0}; + evtimer_add(timer_, &tm); + } +} + +void SyncMigrateContext::Wakeup(const Status &migrate_result) { + migrate_result_ = migrate_result; + auto s = conn_->Owner()->EnableWriteEvent(conn_->GetFD()); + if (!s.IsOK()) { + LOG(ERROR) << "[server] Failed to enable write event on the sync migrate connection " << conn_->GetFD() << ": " + << s.Msg(); + } +} + +void SyncMigrateContext::EventCB(bufferevent *bev, int16_t events, void *ctx) { + auto self = reinterpret_cast(ctx); + auto &&slot_migrator = self->svr_->slot_migrator; + + if (events & (BEV_EVENT_EOF | BEV_EVENT_ERROR)) { + if (self->timer_ != nullptr) { + event_free(self->timer_); + self->timer_ = nullptr; + } + + slot_migrator->CancelBlocking(); + } + redis::Connection::OnEvent(bev, 
events, self->conn_); +} + +void SyncMigrateContext::TimerCB(int, int16_t events, void *ctx) { + auto self = reinterpret_cast(ctx); + auto &&slot_migrator = self->svr_->slot_migrator; + + self->conn_->Reply(redis::NilString()); + event_free(self->timer_); + self->timer_ = nullptr; + + slot_migrator->CancelBlocking(); + + auto bev = self->conn_->GetBufferEvent(); + bufferevent_setcb(bev, redis::Connection::OnRead, redis::Connection::OnWrite, redis::Connection::OnEvent, + self->conn_); + bufferevent_enable(bev, EV_READ); +} + +void SyncMigrateContext::WriteCB(bufferevent *bev, void *ctx) { + auto self = reinterpret_cast(ctx); + + if (self->migrate_result_) { + self->conn_->Reply(redis::SimpleString("OK")); + } else { + self->conn_->Reply(redis::Error("ERR " + self->migrate_result_.Msg())); + } + + if (self->timer_) { + event_free(self->timer_); + self->timer_ = nullptr; + } + + bufferevent_setcb(bev, redis::Connection::OnRead, redis::Connection::OnWrite, redis::Connection::OnEvent, + self->conn_); + bufferevent_enable(bev, EV_READ); + + bufferevent_trigger(bev, EV_READ, BEV_TRIG_IGNORE_WATERMARKS); +} diff --git a/src/cluster/sync_migrate_context.h b/src/cluster/sync_migrate_context.h new file mode 100644 index 00000000000..e8a19ca4607 --- /dev/null +++ b/src/cluster/sync_migrate_context.h @@ -0,0 +1,27 @@ +#include "server/server.h" + +class SyncMigrateContext { + public: + SyncMigrateContext(Server *svr, redis::Connection *conn, int timeout) : svr_(svr), conn_(conn), timeout_(timeout){}; + SyncMigrateContext(SyncMigrateContext &&) = delete; + SyncMigrateContext(const SyncMigrateContext &) = delete; + + SyncMigrateContext &operator=(SyncMigrateContext &&) = delete; + SyncMigrateContext &operator=(const SyncMigrateContext &) = delete; + + ~SyncMigrateContext(); + + void StartBlock(); + void Wakeup(const Status &migrate_result); + static void WriteCB(bufferevent *bev, void *ctx); + static void EventCB(bufferevent *bev, int16_t events, void *ctx); + static void 
TimerCB(int, int16_t events, void *ctx); + + private: + Server *svr_; + redis::Connection *conn_; + int timeout_ = 0; + event *timer_ = nullptr; + + Status migrate_result_; +}; diff --git a/src/commands/cmd_cluster.cc b/src/commands/cmd_cluster.cc index 34ad8af8cf4..d0484799f7f 100644 --- a/src/commands/cmd_cluster.cc +++ b/src/commands/cmd_cluster.cc @@ -19,6 +19,7 @@ */ #include "cluster/slot_import.h" +#include "cluster/sync_migrate_context.h" #include "commander.h" #include "error_constants.h" @@ -126,11 +127,28 @@ class CommandClusterX : public Commander { if (subcommand_ == "setnodeid" && args_.size() == 3 && args_[2].size() == kClusterNodeIdLen) return Status::OK(); if (subcommand_ == "migrate") { - if (args.size() != 4) return {Status::RedisParseErr, errWrongNumOfArguments}; + if (args.size() < 4) return {Status::RedisParseErr, errWrongNumOfArguments}; slot_ = GET_OR_RET(ParseInt(args[2], 10)); dst_node_id_ = args[3]; + + if (args.size() >= 5) { + auto sync_flag = util::ToLower(args[4]); + if (sync_flag == "async") { + sync_migrate_ = false; + } else if (sync_flag == "sync") { + sync_migrate_ = true; + + if (args.size() == 6) { + sync_migrate_timeout_ = GET_OR_RET(ParseInt(args[5], 10)); + } else if (args.size() > 6) { + return {Status::RedisParseErr, "Wrong number of arguments for MIGRATE SYNC option"}; + } + } else { + return {Status::RedisParseErr, "Invalid sync flag"}; + } + } return Status::OK(); } @@ -230,8 +248,15 @@ class CommandClusterX : public Commander { int64_t v = svr->cluster->GetVersion(); *output = redis::BulkString(std::to_string(v)); } else if (subcommand_ == "migrate") { - Status s = svr->cluster->MigrateSlot(static_cast(slot_), dst_node_id_); + if (sync_migrate_) { + sync_migrate_ctx_ = std::make_shared(svr, conn, sync_migrate_timeout_); + } + + Status s = svr->cluster->MigrateSlot(static_cast(slot_), dst_node_id_, sync_migrate_ctx_); if (s.IsOK()) { + if (sync_migrate_) { + return {Status::BlockingCmd}; + } *output = 
redis::SimpleString("OK"); } else { *output = redis::Error(s.Msg()); @@ -253,6 +278,10 @@ class CommandClusterX : public Commander { int64_t slot_ = -1; int slot_id_ = -1; bool force_ = false; + + bool sync_migrate_ = false; + int sync_migrate_timeout_ = 0; + std::shared_ptr sync_migrate_ctx_ = nullptr; }; REDIS_REGISTER_COMMANDS(MakeCmdAttr("cluster", -2, "cluster no-script", 0, 0, 0), diff --git a/tests/gocase/integration/slotmigrate/slotmigrate_test.go b/tests/gocase/integration/slotmigrate/slotmigrate_test.go index 8ca890123bf..fb9a8fbef93 100644 --- a/tests/gocase/integration/slotmigrate/slotmigrate_test.go +++ b/tests/gocase/integration/slotmigrate/slotmigrate_test.go @@ -418,8 +418,7 @@ func TestSlotMigrateDataType(t *testing.T) { require.EqualValues(t, cnt, rdb1.LLen(ctx, util.SlotTable[slot]).Val()) }) - t.Run("MIGRATE - Slot migrate all types of existing data", func(t *testing.T) { - slot := 1 + migrateAllTypes := func(t *testing.T, sync bool, slot int) { keys := make(map[string]string, 0) for _, typ := range []string{"string", "expired_string", "list", "hash", "set", "zset", "bitmap", "sortint", "stream"} { keys[typ] = fmt.Sprintf("%s_{%s}", typ, util.SlotTable[slot]) @@ -494,8 +493,12 @@ func TestSlotMigrateDataType(t *testing.T) { require.EqualValues(t, 19, streamInfo.Length) // migrate slot 1, all keys above are belong to slot 1 - require.Equal(t, "OK", rdb0.Do(ctx, "clusterx", "migrate", slot, id1).Val()) - waitForMigrateState(t, rdb0, slot, SlotMigrationStateSuccess) + if !sync { + require.Equal(t, "OK", rdb0.Do(ctx, "clusterx", "migrate", slot, id1).Val()) + waitForMigrateState(t, rdb0, slot, SlotMigrationStateSuccess) + } else { + require.Equal(t, "OK", rdb0.Do(ctx, "clusterx", "migrate", slot, id1, "sync").Val()) + } // check destination data // type string @@ -540,6 +543,14 @@ func TestSlotMigrateDataType(t *testing.T) { for _, typ := range []string{"string", "list", "hash", "set", "zset", "bitmap", "sortint", "stream"} { 
require.ErrorContains(t, rdb0.Exists(ctx, keys[typ]).Err(), "MOVED") } + } + + t.Run("MIGRATE - Slot migrate all types of existing data", func(t *testing.T) { + migrateAllTypes(t, false, 1) + }) + + t.Run("MIGRATE - Slot migrate all types of existing data (sync)", func(t *testing.T) { + migrateAllTypes(t, true, 2) }) t.Run("MIGRATE - increment sync stream from WAL", func(t *testing.T) { From ab2291370e0eb2697b4a88b8490747f7c91b1b56 Mon Sep 17 00:00:00 2001 From: ellutionist Date: Thu, 4 May 2023 20:59:24 +0800 Subject: [PATCH 2/8] add license header --- src/cluster/sync_migrate_context.cc | 20 ++++++++++++++++++++ src/cluster/sync_migrate_context.h | 21 +++++++++++++++++++++ 2 files changed, 41 insertions(+) diff --git a/src/cluster/sync_migrate_context.cc b/src/cluster/sync_migrate_context.cc index f075440823e..e7f7cbf9053 100644 --- a/src/cluster/sync_migrate_context.cc +++ b/src/cluster/sync_migrate_context.cc @@ -1,3 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ + #include "cluster/sync_migrate_context.h" SyncMigrateContext::~SyncMigrateContext() { diff --git a/src/cluster/sync_migrate_context.h b/src/cluster/sync_migrate_context.h index e8a19ca4607..c4ec3aa52e2 100644 --- a/src/cluster/sync_migrate_context.h +++ b/src/cluster/sync_migrate_context.h @@ -1,3 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ + +#pragma once #include "server/server.h" class SyncMigrateContext { From 35c05925acf4ff7279d78ea33f69156ca03a61b9 Mon Sep 17 00:00:00 2001 From: ellutionist Date: Mon, 8 May 2023 23:22:44 +0800 Subject: [PATCH 3/8] use unique_ptr instead of shared_ptr to hold `SyncMigrateContext` --- src/cluster/cluster.cc | 2 +- src/cluster/cluster.h | 2 +- src/cluster/slot_migrate.cc | 4 ++-- src/cluster/slot_migrate.h | 4 ++-- src/commands/cmd_cluster.cc | 6 +++--- 5 files changed, 9 insertions(+), 9 deletions(-) diff --git a/src/cluster/cluster.cc b/src/cluster/cluster.cc index 615c98ef34d..7f46f99f641 100644 --- a/src/cluster/cluster.cc +++ b/src/cluster/cluster.cc @@ -292,7 +292,7 @@ Status Cluster::SetSlotImported(int slot) { } Status Cluster::MigrateSlot(int slot, const std::string &dst_node_id, - const std::shared_ptr &blocking_ctx) { + SyncMigrateContext *blocking_ctx) { if (nodes_.find(dst_node_id) == nodes_.end()) { return {Status::NotOK, "Can't find the destination node id"}; } diff --git a/src/cluster/cluster.h b/src/cluster/cluster.h index bc84472645b..c3f9c7cfe84 100644 --- a/src/cluster/cluster.h +++ b/src/cluster/cluster.h @@ -93,7 +93,7 @@ class Cluster { redis::Connection *conn); Status SetMasterSlaveRepl(); Status MigrateSlot(int slot, const std::string &dst_node_id, - const std::shared_ptr &blocking_ctx = nullptr); + SyncMigrateContext *blocking_ctx = nullptr); Status ImportSlot(redis::Connection *conn, int slot, int state); std::string GetMyId() const { return myid_; } Status DumpClusterNodes(const std::string &file); diff --git a/src/cluster/slot_migrate.cc b/src/cluster/slot_migrate.cc index 6ed74aafeb9..49c30c23a78 100644 --- a/src/cluster/slot_migrate.cc +++ b/src/cluster/slot_migrate.cc @@ -77,7 +77,7 @@ SlotMigrator::SlotMigrator(Server *svr, int max_migration_speed, int max_pipelin } Status SlotMigrator::PerformSlotMigration(const std::string &node_id, std::string &dst_ip, int dst_port, int slot_id, - const std::shared_ptr &blocking_ctx) { + 
SyncMigrateContext *blocking_ctx) { // Only one slot migration job at the same time int16_t no_slot = -1; if (!migrating_slot_.compare_exchange_strong(no_slot, static_cast(slot_id))) { @@ -1110,6 +1110,6 @@ void SlotMigrator::wakeupBlocking(const Status &migrate_result) { if (blocking_context_) { blocking_context_->Wakeup(migrate_result); - blocking_context_.reset(); + blocking_context_ = nullptr; } } diff --git a/src/cluster/slot_migrate.h b/src/cluster/slot_migrate.h index 07796598e89..a2d0c3a9c41 100644 --- a/src/cluster/slot_migrate.h +++ b/src/cluster/slot_migrate.h @@ -83,7 +83,7 @@ class SlotMigrator : public redis::Database { Status CreateMigrationThread(); Status PerformSlotMigration(const std::string &node_id, std::string &dst_ip, int dst_port, int slot_id, - const std::shared_ptr &blocking_ctx = nullptr); + SyncMigrateContext *blocking_ctx = nullptr); void ReleaseForbiddenSlot(); void SetMaxMigrationSpeed(int value) { if (value >= 0) max_migration_speed_ = value; @@ -178,5 +178,5 @@ class SlotMigrator : public redis::Database { uint64_t wal_begin_seq_ = 0; std::mutex blocking_mutex_; - std::shared_ptr blocking_context_; + SyncMigrateContext *blocking_context_ = nullptr; /* non-owning; must start null because wakeupBlocking() tests it even when no blocking context was supplied */ }; diff --git a/src/commands/cmd_cluster.cc b/src/commands/cmd_cluster.cc index d0484799f7f..5f175f9dca5 100644 --- a/src/commands/cmd_cluster.cc +++ b/src/commands/cmd_cluster.cc @@ -249,10 +249,10 @@ class CommandClusterX : public Commander { *output = redis::BulkString(std::to_string(v)); } else if (subcommand_ == "migrate") { if (sync_migrate_) { - sync_migrate_ctx_ = std::make_shared(svr, conn, sync_migrate_timeout_); + sync_migrate_ctx_ = std::make_unique(svr, conn, sync_migrate_timeout_); } - Status s = svr->cluster->MigrateSlot(static_cast(slot_), dst_node_id_, sync_migrate_ctx_); + Status s = svr->cluster->MigrateSlot(static_cast(slot_), dst_node_id_, sync_migrate_ctx_.get()); if (s.IsOK()) { if (sync_migrate_) { return {Status::BlockingCmd}; @@ -281,7 +281,7 @@ class CommandClusterX : 
public Commander { bool sync_migrate_ = false; int sync_migrate_timeout_ = 0; - std::shared_ptr sync_migrate_ctx_ = nullptr; + std::unique_ptr sync_migrate_ctx_ = nullptr; }; REDIS_REGISTER_COMMANDS(MakeCmdAttr("cluster", -2, "cluster no-script", 0, 0, 0), From 09890761e098aa31530784191e58698360175f64 Mon Sep 17 00:00:00 2001 From: ellutionist Date: Mon, 8 May 2023 23:39:48 +0800 Subject: [PATCH 4/8] Improve the command args parsing --- src/commands/cmd_cluster.cc | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/src/commands/cmd_cluster.cc b/src/commands/cmd_cluster.cc index 5f175f9dca5..8103c2199d5 100644 --- a/src/commands/cmd_cluster.cc +++ b/src/commands/cmd_cluster.cc @@ -127,7 +127,7 @@ class CommandClusterX : public Commander { if (subcommand_ == "setnodeid" && args_.size() == 3 && args_[2].size() == kClusterNodeIdLen) return Status::OK(); if (subcommand_ == "migrate") { - if (args.size() < 4) return {Status::RedisParseErr, errWrongNumOfArguments}; + if (args.size() < 4 || args.size() > 6) return {Status::RedisParseErr, errWrongNumOfArguments}; slot_ = GET_OR_RET(ParseInt(args[2], 10)); @@ -137,13 +137,15 @@ class CommandClusterX : public Commander { auto sync_flag = util::ToLower(args[4]); if (sync_flag == "async") { sync_migrate_ = false; + + if (args.size() == 6) { + return {Status::RedisParseErr, "Async migration does not support timeout"}; + } } else if (sync_flag == "sync") { sync_migrate_ = true; if (args.size() == 6) { sync_migrate_timeout_ = GET_OR_RET(ParseInt(args[5], 10)); - } else if (args.size() > 6) { - return {Status::RedisParseErr, "Wrong number of arguments for MIGRATE SYNC option"}; } } else { return {Status::RedisParseErr, "Invalid sync flag"}; From 2c74650fa527989da1d906dd2dd26bca4b0570e0 Mon Sep 17 00:00:00 2001 From: ellutionist Date: Wed, 10 May 2023 15:25:53 +0800 Subject: [PATCH 5/8] Format Code --- src/cluster/cluster.cc | 3 +-- src/cluster/cluster.h | 3 +-- 2 files changed, 2 insertions(+), 4 deletions(-) 
diff --git a/src/cluster/cluster.cc b/src/cluster/cluster.cc index 7f46f99f641..7b7c9461c24 100644 --- a/src/cluster/cluster.cc +++ b/src/cluster/cluster.cc @@ -291,8 +291,7 @@ Status Cluster::SetSlotImported(int slot) { return Status::OK(); } -Status Cluster::MigrateSlot(int slot, const std::string &dst_node_id, - SyncMigrateContext *blocking_ctx) { +Status Cluster::MigrateSlot(int slot, const std::string &dst_node_id, SyncMigrateContext *blocking_ctx) { if (nodes_.find(dst_node_id) == nodes_.end()) { return {Status::NotOK, "Can't find the destination node id"}; } diff --git a/src/cluster/cluster.h b/src/cluster/cluster.h index c3f9c7cfe84..64239645a19 100644 --- a/src/cluster/cluster.h +++ b/src/cluster/cluster.h @@ -92,8 +92,7 @@ class Cluster { Status CanExecByMySelf(const redis::CommandAttributes *attributes, const std::vector &cmd_tokens, redis::Connection *conn); Status SetMasterSlaveRepl(); - Status MigrateSlot(int slot, const std::string &dst_node_id, - SyncMigrateContext *blocking_ctx = nullptr); + Status MigrateSlot(int slot, const std::string &dst_node_id, SyncMigrateContext *blocking_ctx = nullptr); Status ImportSlot(redis::Connection *conn, int slot, int state); std::string GetMyId() const { return myid_; } Status DumpClusterNodes(const std::string &file); From 383dcfeeac3027e5ffe016601504e94ab3af29f3 Mon Sep 17 00:00:00 2001 From: ellutionist Date: Wed, 10 May 2023 22:48:19 +0800 Subject: [PATCH 6/8] Adapt to the recent changes && add more go test cases --- src/cluster/sync_migrate_context.cc | 63 +++++++------------ src/cluster/sync_migrate_context.h | 23 +++---- src/commands/cmd_cluster.cc | 4 +- .../slotmigrate/slotmigrate_test.go | 63 +++++++++++++++++++ 4 files changed, 96 insertions(+), 57 deletions(-) diff --git a/src/cluster/sync_migrate_context.cc b/src/cluster/sync_migrate_context.cc index e7f7cbf9053..d38ae5115af 100644 --- a/src/cluster/sync_migrate_context.cc +++ b/src/cluster/sync_migrate_context.cc @@ -20,21 +20,15 @@ #include 
"cluster/sync_migrate_context.h" -SyncMigrateContext::~SyncMigrateContext() { - if (timer_) { - event_free(timer_); - timer_ = nullptr; - } -} - void SyncMigrateContext::StartBlock() { auto bev = conn_->GetBufferEvent(); - bufferevent_setcb(bev, nullptr, WriteCB, EventCB, this); + SetCB(bev); - if (timeout_) { - timer_ = evtimer_new(bufferevent_get_base(bev), TimerCB, this); - timeval tm = {timeout_, 0}; - evtimer_add(timer_, &tm); + if (timeout_ > 0) { + timer_.reset(NewTimer(bufferevent_get_base(bev))); + int sec = static_cast(timeout_); + timeval tm = {sec, static_cast((timeout_ - static_cast(sec)) * 1000000)}; + evtimer_add(timer_.get(), &tm); } } @@ -47,53 +41,40 @@ void SyncMigrateContext::Wakeup(const Status &migrate_result) { } } -void SyncMigrateContext::EventCB(bufferevent *bev, int16_t events, void *ctx) { - auto self = reinterpret_cast(ctx); - auto &&slot_migrator = self->svr_->slot_migrator; +void SyncMigrateContext::OnEvent(bufferevent *bev, int16_t events) { + auto &&slot_migrator = svr_->slot_migrator; if (events & (BEV_EVENT_EOF | BEV_EVENT_ERROR)) { - if (self->timer_ != nullptr) { - event_free(self->timer_); - self->timer_ = nullptr; - } + timer_.reset(); slot_migrator->CancelBlocking(); } - redis::Connection::OnEvent(bev, events, self->conn_); + conn_->OnEvent(bev, events); } -void SyncMigrateContext::TimerCB(int, int16_t events, void *ctx) { - auto self = reinterpret_cast(ctx); - auto &&slot_migrator = self->svr_->slot_migrator; +void SyncMigrateContext::TimerCB(int, int16_t events) { + auto &&slot_migrator = svr_->slot_migrator; - self->conn_->Reply(redis::NilString()); - event_free(self->timer_); - self->timer_ = nullptr; + conn_->Reply(redis::NilString()); + timer_.reset(); slot_migrator->CancelBlocking(); - auto bev = self->conn_->GetBufferEvent(); - bufferevent_setcb(bev, redis::Connection::OnRead, redis::Connection::OnWrite, redis::Connection::OnEvent, - self->conn_); + auto bev = conn_->GetBufferEvent(); + conn_->SetCB(bev); 
bufferevent_enable(bev, EV_READ); } -void SyncMigrateContext::WriteCB(bufferevent *bev, void *ctx) { - auto self = reinterpret_cast(ctx); - - if (self->migrate_result_) { - self->conn_->Reply(redis::SimpleString("OK")); +void SyncMigrateContext::OnWrite(bufferevent *bev) { + if (migrate_result_) { + conn_->Reply(redis::SimpleString("OK")); } else { - self->conn_->Reply(redis::Error("ERR " + self->migrate_result_.Msg())); + conn_->Reply(redis::Error("ERR " + migrate_result_.Msg())); } - if (self->timer_) { - event_free(self->timer_); - self->timer_ = nullptr; - } + timer_.reset(); - bufferevent_setcb(bev, redis::Connection::OnRead, redis::Connection::OnWrite, redis::Connection::OnEvent, - self->conn_); + conn_->SetCB(bev); bufferevent_enable(bev, EV_READ); bufferevent_trigger(bev, EV_READ, BEV_TRIG_IGNORE_WATERMARKS); diff --git a/src/cluster/sync_migrate_context.h b/src/cluster/sync_migrate_context.h index c4ec3aa52e2..befe8cd9f0c 100644 --- a/src/cluster/sync_migrate_context.h +++ b/src/cluster/sync_migrate_context.h @@ -19,30 +19,25 @@ */ #pragma once +#include "event_util.h" #include "server/server.h" -class SyncMigrateContext { +class SyncMigrateContext : private EvbufCallbackBase, + private EventCallbackBase { public: - SyncMigrateContext(Server *svr, redis::Connection *conn, int timeout) : svr_(svr), conn_(conn), timeout_(timeout){}; - SyncMigrateContext(SyncMigrateContext &&) = delete; - SyncMigrateContext(const SyncMigrateContext &) = delete; - - SyncMigrateContext &operator=(SyncMigrateContext &&) = delete; - SyncMigrateContext &operator=(const SyncMigrateContext &) = delete; - - ~SyncMigrateContext(); + SyncMigrateContext(Server *svr, redis::Connection *conn, float timeout) : svr_(svr), conn_(conn), timeout_(timeout){}; void StartBlock(); void Wakeup(const Status &migrate_result); - static void WriteCB(bufferevent *bev, void *ctx); - static void EventCB(bufferevent *bev, int16_t events, void *ctx); - static void TimerCB(int, int16_t events, void *ctx); + 
void OnWrite(bufferevent *bev); + void OnEvent(bufferevent *bev, int16_t events); + void TimerCB(int, int16_t events); private: Server *svr_; redis::Connection *conn_; - int timeout_ = 0; - event *timer_ = nullptr; + float timeout_ = 0; + UniqueEvent timer_; Status migrate_result_; }; diff --git a/src/commands/cmd_cluster.cc b/src/commands/cmd_cluster.cc index 8103c2199d5..0299f37edc3 100644 --- a/src/commands/cmd_cluster.cc +++ b/src/commands/cmd_cluster.cc @@ -145,7 +145,7 @@ class CommandClusterX : public Commander { sync_migrate_ = true; if (args.size() == 6) { - sync_migrate_timeout_ = GET_OR_RET(ParseInt(args[5], 10)); + sync_migrate_timeout_ = GET_OR_RET(ParseFloat(args[5])); } } else { return {Status::RedisParseErr, "Invalid sync flag"}; @@ -282,7 +282,7 @@ class CommandClusterX : public Commander { bool force_ = false; bool sync_migrate_ = false; - int sync_migrate_timeout_ = 0; + float sync_migrate_timeout_ = 0; std::unique_ptr sync_migrate_ctx_ = nullptr; }; diff --git a/tests/gocase/integration/slotmigrate/slotmigrate_test.go b/tests/gocase/integration/slotmigrate/slotmigrate_test.go index fb9a8fbef93..2c24b1999a8 100644 --- a/tests/gocase/integration/slotmigrate/slotmigrate_test.go +++ b/tests/gocase/integration/slotmigrate/slotmigrate_test.go @@ -383,6 +383,69 @@ func TestSlotMigrateThreeNodes(t *testing.T) { }) } +func TestSlotMigrateSync(t *testing.T) { + ctx := context.Background() + + srv0 := util.StartServer(t, map[string]string{"cluster-enabled": "yes"}) + defer func() { srv0.Close() }() + rdb0 := srv0.NewClientWithOption(&redis.Options{PoolSize: 1}) + defer func() { require.NoError(t, rdb0.Close()) }() + id0 := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx00" + require.NoError(t, rdb0.Do(ctx, "clusterx", "SETNODEID", id0).Err()) + + srv1 := util.StartServer(t, map[string]string{"cluster-enabled": "yes"}) + defer func() { srv1.Close() }() + rdb1 := srv1.NewClientWithOption(&redis.Options{PoolSize: 1}) + defer func() { require.NoError(t, rdb1.Close()) 
}() + id1 := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx01" + require.NoError(t, rdb1.Do(ctx, "clusterx", "SETNODEID", id1).Err()) + + clusterNodes := fmt.Sprintf("%s %s %d master - 0-8191\n", id0, srv0.Host(), srv0.Port()) + clusterNodes += fmt.Sprintf("%s %s %d master - 8192-16383", id1, srv1.Host(), srv1.Port()) + require.NoError(t, rdb0.Do(ctx, "clusterx", "SETNODES", clusterNodes, "1").Err()) + require.NoError(t, rdb1.Do(ctx, "clusterx", "SETNODES", clusterNodes, "1").Err()) + + slot := -1 + t.Run("MIGRATE - Cannot migrate async with timeout", func(t *testing.T) { + slot++ + require.Error(t, rdb0.Do(ctx, "clusterx", "migrate", slot, id1, "async", 1).Err()) + }) + + t.Run("MIGRATE - Migrate sync with (or without) all kinds of timeouts", func(t *testing.T) { + slot++ + require.Equal(t, "OK", rdb0.Do(ctx, "clusterx", "migrate", slot, id1, "sync").Val()) + + slot++ + require.Equal(t, "OK", rdb0.Do(ctx, "clusterx", "migrate", slot, id1, "sync", -1).Val()) + + slot++ + require.Equal(t, "OK", rdb0.Do(ctx, "clusterx", "migrate", slot, id1, "sync", 0).Val()) + + slot++ + require.Equal(t, "OK", rdb0.Do(ctx, "clusterx", "migrate", slot, id1, "sync", 10).Val()) + + slot++ + require.Equal(t, "OK", rdb0.Do(ctx, "clusterx", "migrate", slot, id1, "sync", 0.5).Val()) + + slot++ + require.Equal(t, "OK", rdb0.Do(ctx, "clusterx", "migrate", slot, id1, "sync", -3.14).Val()) + }) + + t.Run("MIGRATE - Migrate sync timeout", func(t *testing.T) { + cnt := 200000 + slot++ + for i := 0; i < cnt; i++ { + require.NoError(t, rdb0.LPush(ctx, util.SlotTable[slot], i).Err()) + } + timeout := 0.001 + + require.Nil(t, rdb0.Do(ctx, "clusterx", "migrate", slot, id1, "sync", timeout).Val()) + + // check the following command on the same connection + require.Equal(t, "PONG", rdb0.Ping(ctx).Val()) + }) +} + func TestSlotMigrateDataType(t *testing.T) { ctx := context.Background() From a564974bb1c914bf36fb5a26f283fc228ffde8ee Mon Sep 17 00:00:00 2001 From: ellutionist Date: Thu, 11 May 2023 21:18:00 +0800 
Subject: [PATCH 7/8] Simplify code && rename --- src/cluster/slot_migrate.cc | 24 +++++++++---------- src/cluster/slot_migrate.h | 4 ++-- src/cluster/sync_migrate_context.cc | 8 +++---- src/cluster/sync_migrate_context.h | 4 ++-- .../slotmigrate/slotmigrate_test.go | 2 +- 5 files changed, 21 insertions(+), 21 deletions(-) diff --git a/src/cluster/slot_migrate.cc b/src/cluster/slot_migrate.cc index 49c30c23a78..4b4f0f4f0ed 100644 --- a/src/cluster/slot_migrate.cc +++ b/src/cluster/slot_migrate.cc @@ -109,9 +109,9 @@ Status SlotMigrator::PerformSlotMigration(const std::string &node_id, std::strin } if (blocking_ctx) { - auto lock = blockingLock(); + std::unique_lock lock(blocking_mutex_); blocking_context_ = blocking_ctx; - blocking_context_->StartBlock(); + blocking_context_->Suspend(); } dst_node_ = node_id; @@ -195,7 +195,7 @@ void SlotMigrator::runMigrationProcess() { } else { LOG(ERROR) << "[migrate] Failed to start migrating slot " << migrating_slot_ << ". Error: " << s.Msg(); current_stage_ = SlotMigrationStage::kFailed; - wakeupBlocking(s); + resumeSyncCtx(s); } break; } @@ -206,7 +206,7 @@ void SlotMigrator::runMigrationProcess() { } else { LOG(ERROR) << "[migrate] Failed to send snapshot of slot " << migrating_slot_ << ". Error: " << s.Msg(); current_stage_ = SlotMigrationStage::kFailed; - wakeupBlocking(s); + resumeSyncCtx(s); } break; } @@ -218,7 +218,7 @@ void SlotMigrator::runMigrationProcess() { } else { LOG(ERROR) << "[migrate] Failed to sync from WAL for a slot " << migrating_slot_ << ". 
Error: " << s.Msg(); current_stage_ = SlotMigrationStage::kFailed; - wakeupBlocking(s); + resumeSyncCtx(s); } break; } @@ -228,12 +228,12 @@ void SlotMigrator::runMigrationProcess() { LOG(INFO) << "[migrate] Succeed to migrate slot " << migrating_slot_; current_stage_ = SlotMigrationStage::kClean; migration_state_ = MigrationState::kSuccess; - wakeupBlocking(s); + resumeSyncCtx(s); } else { LOG(ERROR) << "[migrate] Failed to finish a successful migration of slot " << migrating_slot_ << ". Error: " << s.Msg(); current_stage_ = SlotMigrationStage::kFailed; - wakeupBlocking(s); + resumeSyncCtx(s); } break; } @@ -1100,15 +1100,15 @@ void SlotMigrator::GetMigrationInfo(std::string *info) const { fmt::format("migrating_slot: {}\r\ndestination_node: {}\r\nmigrating_state: {}\r\n", slot, dst_node_, task_state); } -void SlotMigrator::CancelBlocking() { - auto lock = blockingLock(); +void SlotMigrator::CancelSyncCtx() { + std::unique_lock lock(blocking_mutex_); blocking_context_ = nullptr; } -void SlotMigrator::wakeupBlocking(const Status &migrate_result) { - auto lock = blockingLock(); +void SlotMigrator::resumeSyncCtx(const Status &migrate_result) { + std::unique_lock lock(blocking_mutex_); if (blocking_context_) { - blocking_context_->Wakeup(migrate_result); + blocking_context_->Resume(migrate_result); blocking_context_ = nullptr; } diff --git a/src/cluster/slot_migrate.h b/src/cluster/slot_migrate.h index cbb3cc09ef7..7da25f454ea 100644 --- a/src/cluster/slot_migrate.h +++ b/src/cluster/slot_migrate.h @@ -100,7 +100,7 @@ class SlotMigrator : public redis::Database { int16_t GetForbiddenSlot() const { return forbidden_slot_; } int16_t GetMigratingSlot() const { return migrating_slot_; } void GetMigrationInfo(std::string *info) const; - void CancelBlocking(); + void CancelSyncCtx(); private: void loop(); @@ -136,7 +136,7 @@ class SlotMigrator : public redis::Database { void setForbiddenSlot(int16_t slot); std::unique_lock blockingLock() { return 
std::unique_lock(blocking_mutex_); } - void wakeupBlocking(const Status &migrate_result); + void resumeSyncCtx(const Status &migrate_result); enum class ParserState { ArrayLen, BulkLen, BulkData, OneRspEnd }; enum class ThreadState { Uninitialized, Running, Terminated }; diff --git a/src/cluster/sync_migrate_context.cc b/src/cluster/sync_migrate_context.cc index d38ae5115af..1bc682dc0c4 100644 --- a/src/cluster/sync_migrate_context.cc +++ b/src/cluster/sync_migrate_context.cc @@ -20,7 +20,7 @@ #include "cluster/sync_migrate_context.h" -void SyncMigrateContext::StartBlock() { +void SyncMigrateContext::Suspend() { auto bev = conn_->GetBufferEvent(); SetCB(bev); @@ -32,7 +32,7 @@ void SyncMigrateContext::StartBlock() { } } -void SyncMigrateContext::Wakeup(const Status &migrate_result) { +void SyncMigrateContext::Resume(const Status &migrate_result) { migrate_result_ = migrate_result; auto s = conn_->Owner()->EnableWriteEvent(conn_->GetFD()); if (!s.IsOK()) { @@ -47,7 +47,7 @@ void SyncMigrateContext::OnEvent(bufferevent *bev, int16_t events) { if (events & (BEV_EVENT_EOF | BEV_EVENT_ERROR)) { timer_.reset(); - slot_migrator->CancelBlocking(); + slot_migrator->CancelSyncCtx(); } conn_->OnEvent(bev, events); } @@ -58,7 +58,7 @@ void SyncMigrateContext::TimerCB(int, int16_t events) { conn_->Reply(redis::NilString()); timer_.reset(); - slot_migrator->CancelBlocking(); + slot_migrator->CancelSyncCtx(); auto bev = conn_->GetBufferEvent(); conn_->SetCB(bev); diff --git a/src/cluster/sync_migrate_context.h b/src/cluster/sync_migrate_context.h index befe8cd9f0c..4d3b125a4cb 100644 --- a/src/cluster/sync_migrate_context.h +++ b/src/cluster/sync_migrate_context.h @@ -27,8 +27,8 @@ class SyncMigrateContext : private EvbufCallbackBase, public: SyncMigrateContext(Server *svr, redis::Connection *conn, float timeout) : svr_(svr), conn_(conn), timeout_(timeout){}; - void StartBlock(); - void Wakeup(const Status &migrate_result); + void Suspend(); + void Resume(const Status 
&migrate_result); void OnWrite(bufferevent *bev); void OnEvent(bufferevent *bev, int16_t events); void TimerCB(int, int16_t events); diff --git a/tests/gocase/integration/slotmigrate/slotmigrate_test.go b/tests/gocase/integration/slotmigrate/slotmigrate_test.go index 2c24b1999a8..0abb8143ab5 100644 --- a/tests/gocase/integration/slotmigrate/slotmigrate_test.go +++ b/tests/gocase/integration/slotmigrate/slotmigrate_test.go @@ -441,7 +441,7 @@ func TestSlotMigrateSync(t *testing.T) { require.Nil(t, rdb0.Do(ctx, "clusterx", "migrate", slot, id1, "sync", timeout).Val()) - // check the following command on the same connection + // check the following command on the same connection require.Equal(t, "PONG", rdb0.Ping(ctx).Val()) }) } From 5eab45e8e82ad540e2f63afac9c449a90db66130 Mon Sep 17 00:00:00 2001 From: ellutionist Date: Fri, 12 May 2023 10:24:16 +0800 Subject: [PATCH 8/8] initialize pointer of ctx with `nullptr` --- src/cluster/slot_migrate.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/cluster/slot_migrate.h b/src/cluster/slot_migrate.h index 7da25f454ea..5dc1559ccf0 100644 --- a/src/cluster/slot_migrate.h +++ b/src/cluster/slot_migrate.h @@ -178,5 +178,5 @@ class SlotMigrator : public redis::Database { uint64_t wal_begin_seq_ = 0; std::mutex blocking_mutex_; - SyncMigrateContext *blocking_context_; + SyncMigrateContext *blocking_context_ = nullptr; };