From b59c58e1867f02bc9084fd3f0f038c31b5983406 Mon Sep 17 00:00:00 2001 From: Yaroslav Date: Mon, 26 Dec 2022 13:52:20 +0200 Subject: [PATCH] Refactor return value of functions that related to a slot migration (#1205) --- src/cluster/slot_migrate.cc | 366 +++++++++++++++++------------------- src/cluster/slot_migrate.h | 56 +++--- 2 files changed, 202 insertions(+), 220 deletions(-) diff --git a/src/cluster/slot_migrate.cc b/src/cluster/slot_migrate.cc index d68128c6140..32549e82c65 100644 --- a/src/cluster/slot_migrate.cc +++ b/src/cluster/slot_migrate.cc @@ -33,6 +33,10 @@ #include "types/redis_stream_base.h" #include "types/redis_string.h" +const char *errFailedToSendCommands = "failed to send commands to restore a key"; +const char *errMigrationTaskCanceled = "key migration stopped due to a task cancellation"; +const char *errFailedToSetImportStatus = "failed to set import status on destination node"; + static std::map type_to_cmd = { {kRedisString, "set"}, {kRedisList, "rpush"}, {kRedisHash, "hmset"}, {kRedisSet, "sadd"}, {kRedisZSet, "zadd"}, {kRedisBitmap, "setbit"}, {kRedisSortedint, "siadd"}, {kRedisStream, "xadd"}, @@ -68,14 +72,6 @@ SlotMigrate::SlotMigrate(Server *svr, int migration_speed, int pipeline_size_lim seq_gap_limit_ = seq_gap; } - dst_port_ = -1; - forbidden_slot_ = -1; - migrate_slot_ = -1; - migrate_failed_slot_ = -1; - migrate_state_ = kMigrateNone; - stop_migrate_ = false; - slot_snapshot_ = nullptr; - if (svr->IsSlave()) { SetMigrateStopFlag(true); } @@ -163,12 +159,13 @@ void SlotMigrate::Loop() { pipeline_size_limit_ = slot_job_->pipeline_size_; seq_gap_limit_ = slot_job_->seq_gap_; - StateMachine(); + RunStateMachine(); } } -void SlotMigrate::StateMachine() { +void SlotMigrate::RunStateMachine() { state_machine_ = kSlotMigrateStart; + while (true) { if (IsTerminated()) { LOG(WARNING) << "[migrate] Will stop state machine, because the thread was terminated"; @@ -179,10 +176,10 @@ void SlotMigrate::StateMachine() { case kSlotMigrateStart: { auto s = Start(); if (s.IsOK()) { - LOG(INFO) << "[migrate] Succeed to start migrating"; + LOG(INFO) << "[migrate] Succeed to start migrating slot " << migrate_slot_; state_machine_ = kSlotMigrateSnapshot; } else { - LOG(ERROR) << "[migrate] Failed to start migrating"; + LOG(ERROR) << "[migrate] Failed to start migrating slot " << migrate_slot_ << ". Error: " << s.Msg(); state_machine_ = kSlotMigrateFailed; } break; @@ -190,10 +187,9 @@ void SlotMigrate::StateMachine() { case kSlotMigrateSnapshot: { auto s = SendSnapshot(); if (s.IsOK()) { - LOG(INFO) << "[migrate] Succeed to send snapshot"; state_machine_ = kSlotMigrateWal; } else { - LOG(ERROR) << "[migrate] Failed to send snapshot"; + LOG(ERROR) << "[migrate] Failed to send snapshot of slot " << migrate_slot_ << ". Error: " << s.Msg(); state_machine_ = kSlotMigrateFailed; } break; @@ -201,10 +197,10 @@ void SlotMigrate::StateMachine() { case kSlotMigrateWal: { auto s = SyncWal(); if (s.IsOK()) { - LOG(INFO) << "[migrate] Succeed to sync WAL"; + LOG(INFO) << "[migrate] Succeed to sync WAL for a slot " << migrate_slot_; state_machine_ = kSlotMigrateSuccess; } else { - LOG(ERROR) << "[migrate] Failed to sync Wal"; + LOG(ERROR) << "[migrate] Failed to sync WAL for a slot " << migrate_slot_ << ". Error: " << s.Msg(); state_machine_ = kSlotMigrateFailed; } break; @@ -216,21 +212,29 @@ void SlotMigrate::StateMachine() { state_machine_ = kSlotMigrateClean; migrate_state_ = kMigrateSuccess; } else { + LOG(ERROR) << "[migrate] Failed to finish a successful migration of slot " << migrate_slot_ + << ". Error: " << s.Msg(); state_machine_ = kSlotMigrateFailed; } break; } - case kSlotMigrateFailed: - Fail(); - LOG(INFO) << "[migrate] Failed to migrate slot" << migrate_slot_; + case kSlotMigrateFailed: { + auto s = Fail(); + if (!s.IsOK()) { + LOG(ERROR) << "[migrate] Failed to finish a failed migration of slot " << migrate_slot_ + << ". Error: " << s.Msg(); + } + LOG(INFO) << "[migrate] Failed to migrate a slot" << migrate_slot_; migrate_state_ = kMigrateFailed; state_machine_ = kSlotMigrateClean; break; - case kSlotMigrateClean: + } + case kSlotMigrateClean: { Clean(); return; + } default: - LOG(ERROR) << "[migrate] Wrong state for state machine"; + LOG(ERROR) << "[migrate] Unexpected state for the state machine: " << state_machine_; Clean(); return; } @@ -241,8 +245,7 @@ Status SlotMigrate::Start() { // Get snapshot and sequence slot_snapshot_ = storage_->GetDB()->GetSnapshot(); if (!slot_snapshot_) { - LOG(INFO) << "[migrate] Failed to create snapshot"; - return {Status::NotOK}; + return {Status::NotOK, "failed to create snapshot"}; } wal_begin_seq_ = slot_snapshot_->GetSequenceNumber(); @@ -250,27 +253,27 @@ Status SlotMigrate::Start() { current_pipeline_size_ = 0; last_send_time_ = 0; - // Connect to dst node - auto s = Util::SockConnect(dst_ip_, dst_port_); - if (!s.IsOK()) { - LOG(ERROR) << "[migrate] Failed to connect to destination server: " << s.Msg(); - return {Status::NotOK}; + // Connect to the destination node + auto result = Util::SockConnect(dst_ip_, dst_port_); + if (!result.IsOK()) { + return {Status::NotOK, fmt::format("failed to connect to the destination node: {}", result.Msg())}; } - slot_job_->slot_fd_ = *s; + + slot_job_->slot_fd_ = *result; // Auth first std::string pass = svr_->GetConfig()->requirepass; if (!pass.empty()) { - bool ok = AuthDstServer(slot_job_->slot_fd_, pass); - if (!ok) { - return {Status::NotOK, "Failed to authenticate on destination server"}; + auto s = AuthDstServer(slot_job_->slot_fd_, pass); + if (!s.IsOK()) { + return s.Prefixed("failed to authenticate on destination node"); } } - // Set dst node importing START - if (!SetDstImportStatus(slot_job_->slot_fd_, kImportStart)) { - LOG(ERROR) << "[migrate] Failed to notify the destination to prepare to import data"; - return {Status::NotOK}; + // Set destination node import status to START + auto s = SetDstImportStatus(slot_job_->slot_fd_, kImportStart); + if (!s.IsOK()) { + return s.Prefixed(errFailedToSetImportStatus); } LOG(INFO) << "[migrate] Start migrating slot " << migrate_slot_ << ", connect destination fd " << slot_job_->slot_fd_; @@ -278,7 +281,6 @@ Status SlotMigrate::Start() { } Status SlotMigrate::SendSnapshot() { - // Create DB iter of snapshot uint64_t migrated_key_cnt = 0; uint64_t expired_key_cnt = 0; uint64_t empty_key_cnt = 0; @@ -302,8 +304,7 @@ Status SlotMigrate::SendSnapshot() { // The migrating task has to be stopped, if server role is changed from master to slave // or flush command (flushdb or flushall) is executed if (stop_migrate_) { - LOG(ERROR) << "[migrate] Stop migrating snapshot due to the thread stopped"; - return {Status::NotOK}; + return {Status::NotOK, errMigrationTaskCanceled}; } // Iteration is out of range @@ -316,47 +317,51 @@ Status SlotMigrate::SendSnapshot() { ExtractNamespaceKey(iter->key(), &ns, &user_key, true); current_migrate_key_ = user_key; - // Add key's constructed cmd to restore_cmds, send pipeline - // or not according to current_pipeline_size_ + // Add key's constructed cmd to restore_cmds, send pipeline or not according to current_pipeline_size_ auto result = MigrateOneKey(user_key, iter->value(), &restore_cmds); - if (result == KeyMigrationResult::kMigrated) { + if (!result.IsOK()) { + return {Status::NotOK, fmt::format("failed to migrate a key {}: {}", user_key, result.Msg())}; + } + + if (*result == KeyMigrationResult::kMigrated) { + LOG(INFO) << "[migrate] The key " << user_key << " successfully migrated"; migrated_key_cnt++; - } else if (result == KeyMigrationResult::kExpired) { + } else if (*result == KeyMigrationResult::kExpired) { + LOG(INFO) << "[migrate] The key " << user_key << " is expired"; expired_key_cnt++; - } else if (result == KeyMigrationResult::kUnderlyingStructEmpty) { + } else if (*result == KeyMigrationResult::kUnderlyingStructEmpty) { + LOG(INFO) << "[migrate] The key " << user_key << " has no elements"; empty_key_cnt++; } else { - LOG(ERROR) << "[migrate] Failed to migrate key: " << user_key; + LOG(ERROR) << "[migrate] Migrated a key " << user_key << " with unexpected result: " << static_cast(*result); return {Status::NotOK}; } } - // Send the rest data in pipeline. This operation is necessary, - // the final pipeline may not be sent while iterating keys, - // because its size may less than pipeline_size_limit_. - if (!SendCmdsPipelineIfNeed(&restore_cmds, true)) { - LOG(ERROR) << "[migrate] Failed to send left data in pipeline"; - return {Status::NotOK}; + // It's necessary to send commands that are still in the pipeline since the final pipeline may not be sent + // while iterating keys because its size could be less than pipeline_size_limit_ + auto s = SendCmdsPipelineIfNeed(&restore_cmds, true); + if (!s.IsOK()) { + return s.Prefixed(errFailedToSendCommands); } LOG(INFO) << "[migrate] Succeed to migrate slot snapshot, slot: " << slot << ", Migrated keys: " << migrated_key_cnt << ", Expired keys: " << expired_key_cnt << ", Emtpy keys: " << empty_key_cnt; + return Status::OK(); } Status SlotMigrate::SyncWal() { - // Send incremental data in wal circularly until new increment less than a certain amount + // Send incremental data in WAL circularly until new increment less than a certain amount auto s = SyncWalBeforeForbidSlot(); if (!s.IsOK()) { - LOG(ERROR) << "[migrate] Failed to sync WAL before forbidding slot"; - return {Status::NotOK}; + return s.Prefixed("failed to sync WAL before forbidding a slot"); } // Set forbidden slot, and send last incremental data s = SyncWalAfterForbidSlot(); if (!s.IsOK()) { - LOG(ERROR) << "[migrate] Failed to sync WAL after forbidding slot"; - return {Status::NotOK}; + return s.Prefixed("failed to sync WAL after forbidding a slot"); } return Status::OK(); @@ -364,21 +369,19 @@ Status SlotMigrate::SyncWal() { Status SlotMigrate::Success() { if (stop_migrate_) { - LOG(ERROR) << "[migrate] Stop migrating slot " << migrate_slot_; - return {Status::NotOK}; + return {Status::NotOK, errMigrationTaskCanceled}; } - // Set destination status SUCCESS - if (!SetDstImportStatus(slot_job_->slot_fd_, kImportSuccess)) { - LOG(ERROR) << "[migrate] Failed to notify the destination that data migration succeeded"; - return {Status::NotOK}; + // Set import status on the destination node to SUCCESS + auto s = SetDstImportStatus(slot_job_->slot_fd_, kImportSuccess); + if (!s.IsOK()) { + return s.Prefixed(errFailedToSetImportStatus); } std::string dst_ip_port = dst_ip_ + ":" + std::to_string(dst_port_); - auto s = svr_->cluster_->SetSlotMigrated(migrate_slot_, dst_ip_port); + s = svr_->cluster_->SetSlotMigrated(migrate_slot_, dst_ip_port); if (!s.IsOK()) { - LOG(ERROR) << "[migrate] Failed to set slot, Err:" << s.Msg(); - return {Status::NotOK}; + return s.Prefixed(fmt::format("failed to set slot {} as migrated to {}", migrate_slot_, dst_ip_port)); } migrate_failed_slot_ = -1; @@ -387,18 +390,20 @@ Status SlotMigrate::Success() { } Status SlotMigrate::Fail() { - // Set destination status - if (!SetDstImportStatus(slot_job_->slot_fd_, kImportFailed)) { - LOG(INFO) << "[migrate] Failed to notify the destination that data migration failed"; - } // Stop slot will forbid writing migrate_failed_slot_ = migrate_slot_; forbidden_slot_ = -1; + // Set import status on the destination node to FAILED + auto s = SetDstImportStatus(slot_job_->slot_fd_, kImportFailed); + if (!s.IsOK()) { + return s.Prefixed(errFailedToSetImportStatus); + } + return Status::OK(); } -Status SlotMigrate::Clean() { +void SlotMigrate::Clean() { LOG(INFO) << "[migrate] Clean resources of migrating slot " << migrate_slot_; if (slot_snapshot_) { storage_->GetDB()->ReleaseSnapshot(slot_snapshot_); @@ -413,43 +418,42 @@ Status SlotMigrate::Clean() { slot_job_.reset(); migrate_slot_ = -1; SetMigrateStopFlag(false); - - return Status::OK(); } -bool SlotMigrate::AuthDstServer(int sock_fd, const std::string &password) { +Status SlotMigrate::AuthDstServer(int sock_fd, const std::string &password) { std::string cmd = Redis::MultiBulkString({"auth", password}, false); auto s = Util::SockSend(sock_fd, cmd); if (!s.IsOK()) { - LOG(ERROR) << "[migrate] Failed to send auth command to destination, slot: " << migrate_slot_ - << ", error: " << s.Msg(); - return false; + return s.Prefixed("failed to send AUTH command"); } - if (!CheckResponseOnce(sock_fd)) { - LOG(ERROR) << "[migrate] Failed to auth destination server with '" << password << "', stop migrating slot " - << migrate_slot_; - return false; + s = CheckResponseOnce(sock_fd); + if (!s.IsOK()) { + return s.Prefixed("failed to check the response of AUTH command"); } - return true; + return Status::OK(); } -bool SlotMigrate::SetDstImportStatus(int sock_fd, int status) { - if (sock_fd <= 0) return false; +Status SlotMigrate::SetDstImportStatus(int sock_fd, int status) { + if (sock_fd <= 0) return {Status::NotOK, "invalid socket descriptor"}; - int slot = migrate_slot_; - std::string cmd = Redis::MultiBulkString({"cluster", "import", std::to_string(slot), std::to_string(status)}); + std::string cmd = + Redis::MultiBulkString({"cluster", "import", std::to_string(migrate_slot_), std::to_string(status)}); auto s = Util::SockSend(sock_fd, cmd); if (!s.IsOK()) { - LOG(ERROR) << "[migrate] Failed to send import command to destination, slot: " << slot << ", error: " << s.Msg(); - return false; + return s.Prefixed("failed to send command to the destination node"); } - return CheckResponseOnce(sock_fd); + s = CheckResponseOnce(sock_fd); + if (!s.IsOK()) { + return s.Prefixed("failed to check the response from the destination node"); + } + + return Status::OK(); } -bool SlotMigrate::CheckResponseOnce(int sock_fd) { return CheckResponseWithCounts(sock_fd, 1); } +Status SlotMigrate::CheckResponseOnce(int sock_fd) { return CheckResponseWithCounts(sock_fd, 1); } // Commands | Response | Instance // ++++++++++++++++++++++++++++++++++++++++ @@ -476,10 +480,9 @@ bool SlotMigrate::CheckResponseOnce(int sock_fd) { return CheckResponseWithCount // sirem Redis::Integer // del Redis::Integer // xadd Redis::BulkString -bool SlotMigrate::CheckResponseWithCounts(int sock_fd, int total) { +Status SlotMigrate::CheckResponseWithCounts(int sock_fd, int total) { if (sock_fd < 0 || total <= 0) { - LOG(INFO) << "[migrate] Invalid args, sock_fd: " << sock_fd << ", count: " << total; - return false; + return {Status::NotOK, fmt::format("invalid arguments: sock_fd={}, count={}", sock_fd, total)}; } // Set socket receive timeout first @@ -496,8 +499,7 @@ bool SlotMigrate::CheckResponseWithCounts(int sock_fd, int total) { while (true) { // Read response data from socket buffer to the event buffer if (evbuffer_read(evbuf.get(), sock_fd, -1) <= 0) { - LOG(ERROR) << "[migrate] Failed to read response, Err: " << strerror(errno); - return false; + return {Status::NotOK, fmt::format("failed to read response: {}", strerror(errno))}; } // Parse response data in event buffer @@ -514,22 +516,19 @@ bool SlotMigrate::CheckResponseWithCounts(int sock_fd, int total) { } if (line[0] == '-') { - LOG(ERROR) << "[migrate] Got invalid response: " << line.get() << ", line length: " << line.length; - parser_state_ = ParserState::Error; + return {Status::NotOK, fmt::format("got invalid response of length {}: {}", line.length, line.get())}; } else if (line[0] == '$') { auto parse_result = ParseInt(std::string(line.get() + 1, line.length - 1), 10); if (!parse_result) { - LOG(ERROR) << "[migrate] Protocol Err: expect integer"; - parser_state_ = ParserState::Error; - } else { - bulk_len = *parse_result; - parser_state_ = bulk_len > 0 ? ParserState::BulkData : ParserState::OneRspEnd; + return {Status::NotOK, "protocol error: expected integer value"}; } + + bulk_len = *parse_result; + parser_state_ = bulk_len > 0 ? ParserState::BulkData : ParserState::OneRspEnd; } else if (line[0] == '+' || line[0] == ':') { parser_state_ = ParserState::OneRspEnd; } else { - LOG(ERROR) << "[migrate] Unexpected response: " << line.get(); - parser_state_ = ParserState::Error; + return {Status::NotOK, fmt::format("got unexpected response of length {}: {}", line.length, line.get())}; } break; @@ -550,14 +549,12 @@ bool SlotMigrate::CheckResponseWithCounts(int sock_fd, int total) { case ParserState::OneRspEnd: { cnt++; if (cnt >= total) { - return true; + return Status::OK(); } + parser_state_ = ParserState::ArrayLen; break; } - case ParserState::Error: { - return false; - } default: break; } @@ -565,15 +562,14 @@ bool SlotMigrate::CheckResponseWithCounts(int sock_fd, int total) { } } -KeyMigrationResult SlotMigrate::MigrateOneKey(const rocksdb::Slice &key, const rocksdb::Slice &encoded_metadata, - std::string *restore_cmds) { - std::string prefix_key; - AppendNamespacePrefix(key, &prefix_key); +StatusOr SlotMigrate::MigrateOneKey(const rocksdb::Slice &key, + const rocksdb::Slice &encoded_metadata, + std::string *restore_cmds) { std::string bytes = encoded_metadata.ToString(); Metadata metadata(kRedisNone, false); metadata.Decode(bytes); + if (metadata.Type() != kRedisString && metadata.Type() != kRedisStream && metadata.size == 0) { - LOG(INFO) << "[migrate] No elements of key: " << prefix_key; return KeyMigrationResult::kUnderlyingStructEmpty; } @@ -584,10 +580,9 @@ KeyMigrationResult SlotMigrate::MigrateOneKey(const rocksdb::Slice &key, const r // Construct command according to type of the key switch (metadata.Type()) { case kRedisString: { - bool ok = MigrateSimpleKey(key, metadata, bytes, restore_cmds); - if (!ok) { - LOG(ERROR) << "[migrate] Failed to migrate simple key: " << key.ToString(); - return KeyMigrationResult::kError; + auto s = MigrateSimpleKey(key, metadata, bytes, restore_cmds); + if (!s.IsOK()) { + return s.Prefixed("failed to migrate simple key"); } break; } @@ -597,10 +592,9 @@ KeyMigrationResult SlotMigrate::MigrateOneKey(const rocksdb::Slice &key, const r case kRedisHash: case kRedisSet: case kRedisSortedint: { - bool ok = MigrateComplexKey(key, metadata, restore_cmds); - if (!ok) { - LOG(ERROR) << "[migrate] Failed to migrate complex key: " << key.ToString(); - return KeyMigrationResult::kError; + auto s = MigrateComplexKey(key, metadata, restore_cmds); + if (!s.IsOK()) { + return s.Prefixed("failed to migrate complex key"); } break; } @@ -608,10 +602,9 @@ KeyMigrationResult SlotMigrate::MigrateOneKey(const rocksdb::Slice &key, const r StreamMetadata stream_md(false); stream_md.Decode(bytes); - bool ok = MigrateStream(key, stream_md, restore_cmds); - if (!ok) { - LOG(ERROR) << "[migrate] Failed to migrate stream: " << key.ToString(); - return KeyMigrationResult::kError; + auto s = MigrateStream(key, stream_md, restore_cmds); + if (!s.IsOK()) { + return s.Prefixed("failed to migrate stream key"); } break; } @@ -622,8 +615,8 @@ KeyMigrationResult SlotMigrate::MigrateOneKey(const rocksdb::Slice &key, const r return KeyMigrationResult::kMigrated; } -bool SlotMigrate::MigrateSimpleKey(const rocksdb::Slice &key, const Metadata &metadata, const std::string &bytes, - std::string *restore_cmds) { +Status SlotMigrate::MigrateSimpleKey(const rocksdb::Slice &key, const Metadata &metadata, const std::string &bytes, + std::string *restore_cmds) { std::vector command = {"set", key.ToString(), bytes.substr(Redis::STRING_HDR_SIZE, bytes.size() - Redis::STRING_HDR_SIZE)}; if (metadata.expire > 0) { @@ -635,15 +628,15 @@ bool SlotMigrate::MigrateSimpleKey(const rocksdb::Slice &key, const Metadata &me // Check whether pipeline needs to be sent // TODO(chrisZMF): Resend data if failed to send data - if (!SendCmdsPipelineIfNeed(restore_cmds, false)) { - LOG(ERROR) << "[migrate] Failed to send simple key"; - return false; + auto s = SendCmdsPipelineIfNeed(restore_cmds, false); + if (!s.IsOK()) { + return s.Prefixed(errFailedToSendCommands); } - return true; + return Status::OK(); } -bool SlotMigrate::MigrateComplexKey(const rocksdb::Slice &key, const Metadata &metadata, std::string *restore_cmds) { +Status SlotMigrate::MigrateComplexKey(const rocksdb::Slice &key, const Metadata &metadata, std::string *restore_cmds) { std::string cmd; cmd = type_to_cmd[metadata.Type()]; @@ -661,8 +654,7 @@ bool SlotMigrate::MigrateComplexKey(const rocksdb::Slice &key, const Metadata &m for (iter->Seek(prefix_subkey); iter->Valid(); iter->Next()) { if (stop_migrate_) { - LOG(ERROR) << "[migrate] Stop migrating complex key due to task stopped"; - return false; + return {Status::NotOK, errMigrationTaskCanceled}; } if (!iter->key().starts_with(prefix_subkey)) { @@ -670,8 +662,7 @@ bool SlotMigrate::MigrateComplexKey(const rocksdb::Slice &key, const Metadata &m } // Parse values of the complex key - // InternalKey is adopted to get complex key's value - // from the formatted key return by iterator of rocksdb + // InternalKey is adopted to get complex key's value from the formatted key return by iterator of rocksdb InternalKey inkey(iter->key(), true); switch (metadata.Type()) { case kRedisSet: { @@ -690,7 +681,10 @@ bool SlotMigrate::MigrateComplexKey(const rocksdb::Slice &key, const Metadata &m break; } case kRedisBitmap: { - if (!MigrateBitmapKey(inkey, &iter, &user_cmd, restore_cmds)) return false; + auto s = MigrateBitmapKey(inkey, &iter, &user_cmd, restore_cmds); + if (!s.IsOK()) { + return s.Prefixed("failed to migrate bitmap key"); + } break; } case kRedisHash: { @@ -717,10 +711,10 @@ bool SlotMigrate::MigrateComplexKey(const rocksdb::Slice &key, const Metadata &m // Have to clear saved items user_cmd.erase(user_cmd.begin() + 2, user_cmd.end()); - // Maybe key has amounted of elements, have to check pipeline here - if (!SendCmdsPipelineIfNeed(restore_cmds, false)) { - LOG(INFO) << "[migrate] Failed to send complex key part"; - return false; + // Send commands if the pipeline contains enough of them + auto s = SendCmdsPipelineIfNeed(restore_cmds, false); + if (!s.IsOK()) { + return s.Prefixed(errFailedToSendCommands); } } } @@ -738,16 +732,16 @@ bool SlotMigrate::MigrateComplexKey(const rocksdb::Slice &key, const Metadata &m current_pipeline_size_++; } - // Check whether pipeline needs to send - if (!SendCmdsPipelineIfNeed(restore_cmds, false)) { - LOG(INFO) << "[migrate] Failed to send complex key"; - return false; + // Send commands if the pipeline contains enough of them + auto s = SendCmdsPipelineIfNeed(restore_cmds, false); + if (!s.IsOK()) { + return s.Prefixed(errFailedToSendCommands); } - return true; + return Status::OK(); } -bool SlotMigrate::MigrateStream(const Slice &key, const StreamMetadata &metadata, std::string *restore_cmds) { +Status SlotMigrate::MigrateStream(const Slice &key, const StreamMetadata &metadata, std::string *restore_cmds) { rocksdb::ReadOptions read_options; read_options.snapshot = slot_snapshot_; read_options.fill_cache = false; @@ -764,8 +758,7 @@ bool SlotMigrate::MigrateStream(const Slice &key, const StreamMetadata &metadata for (iter->Seek(prefix_key); iter->Valid(); iter->Next()) { if (stop_migrate_) { - LOG(ERROR) << "[migrate] Stop migrating stream due to task cancellation"; - return false; + return {Status::NotOK, errMigrationTaskCanceled}; } if (!iter->key().starts_with(prefix_key)) { @@ -778,8 +771,7 @@ bool SlotMigrate::MigrateStream(const Slice &key, const StreamMetadata &metadata std::vector values; auto s = Redis::DecodeRawStreamEntryValue(iter->value().ToString(), &values); if (!s.IsOK()) { - LOG(ERROR) << "[migrate] Failed to decode stream values: " << s.Msg(); - return false; + return {Status::NotOK, fmt::format("failed to decode stream values: {}", s.Msg())}; } Slice encoded_id = inkey.GetSubKey(); @@ -795,9 +787,9 @@ bool SlotMigrate::MigrateStream(const Slice &key, const StreamMetadata &metadata user_cmd.erase(user_cmd.begin() + 2, user_cmd.end()); - if (!SendCmdsPipelineIfNeed(restore_cmds, false)) { - LOG(INFO) << "[migrate] Failed to send the part of stream restoring commands"; - return false; + s = SendCmdsPipelineIfNeed(restore_cmds, false); + if (!s.IsOK()) { + return s.Prefixed(errFailedToSendCommands); } } @@ -815,28 +807,24 @@ bool SlotMigrate::MigrateStream(const Slice &key, const StreamMetadata &metadata current_pipeline_size_++; } - // Check whether commands from the pipeline need to be sent - if (!SendCmdsPipelineIfNeed(restore_cmds, false)) { - LOG(INFO) << "[migrate] Failed to send stream commands"; - return false; + auto s = SendCmdsPipelineIfNeed(restore_cmds, false); + if (!s.IsOK()) { + return s.Prefixed(errFailedToSendCommands); } - return true; + return Status::OK(); } -bool SlotMigrate::MigrateBitmapKey(const InternalKey &inkey, std::unique_ptr *iter, - std::vector *user_cmd, std::string *restore_cmds) { - uint32_t index = 0; - uint32_t offset = 0; +Status SlotMigrate::MigrateBitmapKey(const InternalKey &inkey, std::unique_ptr *iter, + std::vector *user_cmd, std::string *restore_cmds) { std::string index_str = inkey.GetSubKey().ToString(); std::string fragment = (*iter)->value().ToString(); auto parse_result = ParseInt(index_str, 10); if (!parse_result) { - LOG(ERROR) << "[migrate] Parse bitmap index error, Err: " << strerror(errno); - return false; + return {Status::RedisParseErr, "index is not a valid integer"}; } - index = *parse_result; + uint32_t index = *parse_result; // Bitmap does not have hmset-like command // TODO(chrisZMF): Use hmset-like command for efficiency @@ -844,7 +832,7 @@ bool SlotMigrate::MigrateBitmapKey(const InternalKey &inkey, std::unique_ptremplace_back(std::to_string(offset)); user_cmd->emplace_back("1"); *restore_cmds += Redis::MultiBulkString(*user_cmd, false); @@ -853,58 +841,50 @@ bool SlotMigrate::MigrateBitmapKey(const InternalKey &inkey, std::unique_ptrslot_fd_, *commands); if (!s.IsOK()) { - LOG(ERROR) << "[migrate] Failed to send commands, Err: " << s.Msg(); - return false; + return s.Prefixed("failed to write data to a socket"); } last_send_time_ = Util::GetTimeStampUS(); - // Check response - bool ok = CheckResponseWithCounts(slot_job_->slot_fd_, current_pipeline_size_); - if (!ok) { - LOG(ERROR) << "[migrate] Wrong response"; - return false; + s = CheckResponseWithCounts(slot_job_->slot_fd_, current_pipeline_size_); + if (!s.IsOK()) { + return s.Prefixed("wrong response from the destination node"); } // Clear commands and running pipeline commands->clear(); current_pipeline_size_ = 0; - return true; + return Status::OK(); } void SlotMigrate::SetForbiddenSlot(int16_t slot) { @@ -917,7 +897,7 @@ void SlotMigrate::ReleaseForbiddenSlot() { forbidden_slot_ = -1; } -void SlotMigrate::MigrateSpeedLimit() { +void SlotMigrate::ApplyMigrationSpeedLimit() { if (migration_speed_ > 0) { uint64_t current_time = Util::GetTimeStampUS(); uint64_t per_request_time = 1000000 * pipeline_size_limit_ / migration_speed_; @@ -983,7 +963,8 @@ Status SlotMigrate::MigrateIncrementData(std::unique_ptr *iter, - std::vector *user_cmd, std::string *restore_cmds); - bool SendCmdsPipelineIfNeed(std::string *commands, bool need); - void MigrateSpeedLimit(); + void Clean(); + + Status AuthDstServer(int sock_fd, const std::string &password); + Status SetDstImportStatus(int sock_fd, int status); + Status CheckResponseOnce(int sock_fd); + Status CheckResponseWithCounts(int sock_fd, int total); + + StatusOr MigrateOneKey(const rocksdb::Slice &key, const rocksdb::Slice &encoded_metadata, + std::string *restore_cmds); + Status MigrateSimpleKey(const rocksdb::Slice &key, const Metadata &metadata, const std::string &bytes, + std::string *restore_cmds); + Status MigrateComplexKey(const rocksdb::Slice &key, const Metadata &metadata, std::string *restore_cmds); + Status MigrateStream(const rocksdb::Slice &key, const StreamMetadata &metadata, std::string *restore_cmds); + Status MigrateBitmapKey(const InternalKey &inkey, std::unique_ptr *iter, + std::vector *user_cmd, std::string *restore_cmds); + Status SendCmdsPipelineIfNeed(std::string *commands, bool need); + void ApplyMigrationSpeedLimit(); Status GenerateCmdsFromBatch(rocksdb::BatchResult *batch, std::string *commands); Status MigrateIncrementData(std::unique_ptr *iter, uint64_t end_seq); Status SyncWalBeforeForbidSlot(); @@ -142,7 +142,7 @@ class SlotMigrate : public Redis::Database { void SetForbiddenSlot(int16_t slot); private: - enum class ParserState { ArrayLen, BulkLen, BulkData, Error, OneRspEnd }; + enum class ParserState { ArrayLen, BulkLen, BulkData, OneRspEnd }; enum class ThreadState { Uninitialized, Running, Terminated }; static const size_t kProtoInlineMaxSize = 16 * 1024L; @@ -150,7 +150,7 @@ class SlotMigrate : public Redis::Database { static const int kMaxNotifyRetryTimes = 3; static const int kDefaultPipelineSizeLimit = 16; static const int kDefaultMigrationSpeed = 4096; - static const int kMaxItemsInCommand = 16; // Items in every write command of complex keys + static const int kMaxItemsInCommand = 16; // number of items in every write command of complex keys static const int kDefaultSeqGapLimit = 10000; static const int kMaxLoopTimes = 10; @@ -171,15 +171,15 @@ class SlotMigrate : public Redis::Database { std::unique_ptr slot_job_; std::string dst_node_; std::string dst_ip_; - int dst_port_; - std::atomic forbidden_slot_; - std::atomic migrate_slot_; - int16_t migrate_failed_slot_; - std::atomic migrate_state_; - std::atomic stop_migrate_; // stop_migrate_ is true will stop migrate but the migration thread won't destroy. + int dst_port_ = -1; + std::atomic forbidden_slot_ = -1; + std::atomic migrate_slot_ = -1; + int16_t migrate_failed_slot_ = -1; + std::atomic migrate_state_ = kMigrateNone; + std::atomic stop_migrate_ = false; // if is true migration will be stopped but the thread won't be destroyed std::string current_migrate_key_; uint64_t slot_snapshot_time_ = 0; - const rocksdb::Snapshot *slot_snapshot_; + const rocksdb::Snapshot *slot_snapshot_ = nullptr; uint64_t wal_begin_seq_ = 0; uint64_t wal_increment_seq_ = 0; };