From 0121cb2a5a2bae882d074f71689f1ce448ea58ea Mon Sep 17 00:00:00 2001 From: dangleptr <37216992+dangleptr@users.noreply.github.com> Date: Tue, 3 Sep 2019 17:44:13 +0800 Subject: [PATCH] Implement snapshot logic (#795) * Implement snapshot logic * Address critical27's and wadeliuyi's comments * Address darion's and wadeliuyi's comments, rebase on master and fix failed UTs * Address wadeliuyi's comments * Fix failed UT leader_transfer_test * Open disabled UTs in #859 * Add some checks when receiving snapshot and adjust some log levels --- src/common/base/NebulaKeyUtils.cpp | 8 + src/common/base/NebulaKeyUtils.h | 2 + src/interface/raftex.thrift | 26 ++- src/kvstore/CMakeLists.txt | 1 + src/kvstore/LogEncoder.cpp | 20 ++ src/kvstore/LogEncoder.h | 4 + src/kvstore/NebulaStore.cpp | 7 +- src/kvstore/NebulaStore.h | 2 + src/kvstore/Part.cpp | 68 ++++-- src/kvstore/Part.h | 16 +- src/kvstore/PartManager.h | 2 +- src/kvstore/RocksEngine.cpp | 4 +- src/kvstore/SnapshotManagerImpl.cpp | 48 ++++ src/kvstore/SnapshotManagerImpl.h | 33 +++ src/kvstore/raftex/CMakeLists.txt | 1 + src/kvstore/raftex/Host.cpp | 133 +++++++---- src/kvstore/raftex/Host.h | 9 +- src/kvstore/raftex/RaftPart.cpp | 209 ++++++++++++++---- src/kvstore/raftex/RaftPart.h | 35 ++- src/kvstore/raftex/RaftexService.cpp | 13 ++ src/kvstore/raftex/RaftexService.h | 10 +- src/kvstore/raftex/SnapshotManager.cpp | 118 ++++++++++ src/kvstore/raftex/SnapshotManager.h | 65 ++++++ src/kvstore/raftex/test/CMakeLists.txt | 6 + .../raftex/test/LeaderElectionTest.cpp | 1 + .../raftex/test/LeaderTransferTest.cpp | 6 +- src/kvstore/raftex/test/LogCommandTest.cpp | 2 +- src/kvstore/raftex/test/RaftexTestBase.cpp | 12 + src/kvstore/raftex/test/RaftexTestBase.h | 6 +- src/kvstore/raftex/test/SnapshotTest.cpp | 104 +++++++++ src/kvstore/raftex/test/TestShard.cpp | 46 +++- src/kvstore/raftex/test/TestShard.h | 62 +++++- src/kvstore/test/LogEncoderTest.cpp | 7 + src/kvstore/test/NebulaStoreTest.cpp | 15 +- src/kvstore/wal/FileBasedWal.cpp | 32 +-- src/kvstore/wal/FileBasedWal.h | 10 +- src/kvstore/wal/FileBasedWalIterator.cpp | 6 +- src/kvstore/wal/Wal.h | 2 +- src/kvstore/wal/test/FileBasedWalTest.cpp | 13 +- src/meta/processors/admin/Balancer.h | 2 +- src/meta/test/BalanceIntegrationTest.cpp | 16 +- src/storage/AdminProcessor.h | 15 +- src/storage/BaseProcessor.h | 7 + 43 files changed, 1037 insertions(+), 167 deletions(-) create mode 100644 src/kvstore/SnapshotManagerImpl.cpp create mode 100644 src/kvstore/SnapshotManagerImpl.h create mode 100644 src/kvstore/raftex/SnapshotManager.cpp create mode 100644 src/kvstore/raftex/SnapshotManager.h create mode 100644 src/kvstore/raftex/test/SnapshotTest.cpp diff --git a/src/common/base/NebulaKeyUtils.cpp b/src/common/base/NebulaKeyUtils.cpp index 47846787c12..8e54c3d483e 100644 --- a/src/common/base/NebulaKeyUtils.cpp +++ b/src/common/base/NebulaKeyUtils.cpp @@ -38,6 +38,14 @@ std::string NebulaKeyUtils::edgeKey(PartitionID partId, return key; } +// static +std::string NebulaKeyUtils::prefix(PartitionID partId) { + std::string key; + key.reserve(sizeof(PartitionID)); + key.append(reinterpret_cast(&partId), sizeof(PartitionID)); + return key; +} + // static std::string NebulaKeyUtils::prefix(PartitionID partId, VertexID srcId, EdgeType type) { std::string key; diff --git a/src/common/base/NebulaKeyUtils.h b/src/common/base/NebulaKeyUtils.h index 0e8d6ec486f..5489a689fc9 100644 --- a/src/common/base/NebulaKeyUtils.h +++ b/src/common/base/NebulaKeyUtils.h @@ -52,6 +52,8 @@ class NebulaKeyUtils final { static 
std::string prefix(PartitionID partId, VertexID src, EdgeType type, EdgeRanking ranking, VertexID dst); + static std::string prefix(PartitionID partId); + static bool isVertex(const folly::StringPiece& rawKey) { return rawKey.size() == kVertexLen; } diff --git a/src/interface/raftex.thrift b/src/interface/raftex.thrift index 987fdd30b27..be20f364250 100644 --- a/src/interface/raftex.thrift +++ b/src/interface/raftex.thrift @@ -16,7 +16,7 @@ enum ErrorCode { E_LOG_GAP = -1; E_LOG_STALE = -2; E_MISSING_COMMIT = -3; - E_PULLING_SNAPSHOT = -4; // The follower is pulling a snapshot + E_WAITING_SNAPSHOT = -4; // The follower is waiting a snapshot E_UNKNOWN_PART = -5; E_TERM_OUT_OF_DATE = -6; @@ -31,6 +31,7 @@ enum ErrorCode { E_NOT_A_LEADER = -13; E_HOST_DISCONNECTED = -14; E_TOO_MANY_REQUESTS = -15; + E_PERSIST_SNAPSHOT_FAILED = -16; E_EXCEPTION = -20; // An thrift internal exception was thrown } @@ -94,8 +95,6 @@ struct AppendLogRequest { // // Fields 10 to 11 are used for LogAppend. // - // In the case of heartbeat, the log_str_list will be empty, - // and log_term == 0 // // In the case of LogAppend, the id of the first log is the // last_log_id_sent + 1 @@ -106,7 +105,7 @@ struct AppendLogRequest { 10: TermID log_term; 11: list log_str_list; - 12: optional binary snapshot_uri; // Snapshot URL + 12: bool sending_snapshot; } @@ -118,13 +117,30 @@ struct AppendLogResponse { 5: LogID committed_log_id; 6: LogID last_log_id; 7: TermID last_log_term; - 8: bool pulling_snapshot; } +struct SendSnapshotRequest { + 1: GraphSpaceID space; + 2: PartitionID part; + 3: TermID term; + 4: LogID committed_log_id; + 5: TermID committed_log_term; + 6: IPv4 leader_ip; + 7: Port leader_port; + 8: list rows; + 9: i64 total_size; + 10: i64 total_count; + 11: bool done; +} + +struct SendSnapshotResponse { + 1: ErrorCode error_code; +} service RaftexService { AskForVoteResponse askForVote(1: AskForVoteRequest req); AppendLogResponse appendLog(1: AppendLogRequest req); + SendSnapshotResponse sendSnapshot(1: SendSnapshotRequest req); } diff --git a/src/kvstore/CMakeLists.txt b/src/kvstore/CMakeLists.txt index bc6453dd77d..f8fdd8e73bc 100644 --- a/src/kvstore/CMakeLists.txt +++ b/src/kvstore/CMakeLists.txt @@ -6,6 +6,7 @@ nebula_add_library( NebulaStore.cpp RocksEngineConfig.cpp LogEncoder.cpp + SnapshotManagerImpl.cpp ) add_subdirectory(raftex) diff --git a/src/kvstore/LogEncoder.cpp b/src/kvstore/LogEncoder.cpp index c4324feaec1..63003e8cba7 100644 --- a/src/kvstore/LogEncoder.cpp +++ b/src/kvstore/LogEncoder.cpp @@ -13,6 +13,26 @@ namespace kvstore { constexpr auto kHeadLen = sizeof(int64_t) + 1 + sizeof(uint32_t); +std::string encodeKV(const folly::StringPiece& key, + const folly::StringPiece& val) { + uint32_t ksize = key.size(); + uint32_t vsize = val.size(); + std::string str; + str.reserve(sizeof(uint32_t) * 2 + ksize + vsize); + str.append(reinterpret_cast(&ksize), sizeof(ksize)); + str.append(reinterpret_cast(&vsize), sizeof(vsize)); + str.append(key.data(), ksize); + str.append(val.data(), vsize); + return str; +} + +std::pair decodeKV(const std::string& data) { + auto ksize = *reinterpret_cast(data.data()); + auto vsize = *reinterpret_cast(data.data() + sizeof(ksize)); + auto key = folly::StringPiece(data.data() + sizeof(ksize) + sizeof(vsize), ksize); + auto val = folly::StringPiece(data.data() + sizeof(ksize) + sizeof(vsize) + ksize, vsize); + return std::make_pair(key, val); +} std::string encodeSingleValue(LogType type, folly::StringPiece val) { std::string encoded; diff --git a/src/kvstore/LogEncoder.h 
b/src/kvstore/LogEncoder.h index e3232761f45..0589acfbaf3 100644 --- a/src/kvstore/LogEncoder.h +++ b/src/kvstore/LogEncoder.h @@ -23,6 +23,10 @@ enum LogType : char { OP_TRANS_LEADER = 0x08, }; +std::string encodeKV(const folly::StringPiece& key, + const folly::StringPiece& val); + +std::pair decodeKV(const std::string& data); std::string encodeSingleValue(LogType type, folly::StringPiece val); folly::StringPiece decodeSingleValue(folly::StringPiece encoded); diff --git a/src/kvstore/NebulaStore.cpp b/src/kvstore/NebulaStore.cpp index d3fcd58235e..f2ff5993bd7 100644 --- a/src/kvstore/NebulaStore.cpp +++ b/src/kvstore/NebulaStore.cpp @@ -12,6 +12,7 @@ #include "network/NetworkUtils.h" #include "fs/FileUtils.h" #include "kvstore/RocksEngine.h" +#include "kvstore/SnapshotManagerImpl.h" DEFINE_string(engine_type, "rocksdb", "rocksdb, memory..."); DEFINE_int32(custom_filter_interval_secs, 24 * 3600, "interval to trigger custom compaction"); @@ -36,7 +37,8 @@ NebulaStore::~NebulaStore() { bool NebulaStore::init() { LOG(INFO) << "Start the raft service..."; bgWorkers_ = std::make_shared(); - bgWorkers_->start(FLAGS_num_workers); + bgWorkers_->start(FLAGS_num_workers, "nebula-bgworkers"); + snapshot_.reset(new SnapshotManagerImpl(this)); raftService_ = raftex::RaftexService::createService(ioPool_, workers_, raftAddr_.second); @@ -206,7 +208,8 @@ std::shared_ptr NebulaStore::newPart(GraphSpaceID spaceId, engine, ioPool_, bgWorkers_, - workers_); + workers_, + snapshot_); auto partMeta = options_.partMan_->partMeta(spaceId, partId); std::vector peers; for (auto& h : partMeta.peers_) { diff --git a/src/kvstore/NebulaStore.h b/src/kvstore/NebulaStore.h index 346bacb350d..e9add0380bf 100644 --- a/src/kvstore/NebulaStore.h +++ b/src/kvstore/NebulaStore.h @@ -15,6 +15,7 @@ #include "kvstore/PartManager.h" #include "kvstore/Part.h" #include "kvstore/KVEngine.h" +#include "kvstore/raftex/SnapshotManager.h" namespace nebula { namespace kvstore { @@ -200,6 +201,7 @@ class NebulaStore : public KVStore, public Handler { KVOptions options_; std::shared_ptr raftService_; + std::shared_ptr snapshot_; }; } // namespace kvstore diff --git a/src/kvstore/Part.cpp b/src/kvstore/Part.cpp index cd77387d596..ea3d6c5eca3 100644 --- a/src/kvstore/Part.cpp +++ b/src/kvstore/Part.cpp @@ -39,7 +39,8 @@ Part::Part(GraphSpaceID spaceId, KVEngine* engine, std::shared_ptr ioPool, std::shared_ptr workers, - std::shared_ptr handlers) + std::shared_ptr handlers, + std::shared_ptr snapshotMan) : RaftPart(FLAGS_cluster_id, spaceId, partId, @@ -47,7 +48,8 @@ Part::Part(GraphSpaceID spaceId, walPath, ioPool, workers, - handlers) + handlers, + snapshotMan) , spaceId_(spaceId) , partId_(partId) , walPath_(walPath) @@ -192,7 +194,7 @@ bool Part::commitLogs(std::unique_ptr iter) { auto pieces = decodeMultiValues(log); DCHECK_EQ(2, pieces.size()); if (batch->put(pieces[0], pieces[1]) != ResultCode::SUCCEEDED) { - LOG(ERROR) << "Failed to call WriteBatch::put()"; + LOG(ERROR) << idStr_ << "Failed to call WriteBatch::put()"; return false; } break; @@ -203,7 +205,7 @@ bool Part::commitLogs(std::unique_ptr iter) { DCHECK_EQ((kvs.size() + 1) / 2, kvs.size() / 2); for (size_t i = 0; i < kvs.size(); i += 2) { if (batch->put(kvs[i], kvs[i + 1]) != ResultCode::SUCCEEDED) { - LOG(ERROR) << "Failed to call WriteBatch::put()"; + LOG(ERROR) << idStr_ << "Failed to call WriteBatch::put()"; return false; } } @@ -212,7 +214,7 @@ bool Part::commitLogs(std::unique_ptr iter) { case OP_REMOVE: { auto key = decodeSingleValue(log); if (batch->remove(key) != 
ResultCode::SUCCEEDED) { - LOG(ERROR) << "Failed to call WriteBatch::remove()"; + LOG(ERROR) << idStr_ << "Failed to call WriteBatch::remove()"; return false; } break; @@ -221,7 +223,7 @@ bool Part::commitLogs(std::unique_ptr iter) { auto keys = decodeMultiValues(log); for (auto k : keys) { if (batch->remove(k) != ResultCode::SUCCEEDED) { - LOG(ERROR) << "Failed to call WriteBatch::remove()"; + LOG(ERROR) << idStr_ << "Failed to call WriteBatch::remove()"; return false; } } @@ -230,7 +232,7 @@ bool Part::commitLogs(std::unique_ptr iter) { case OP_REMOVE_PREFIX: { auto prefix = decodeSingleValue(log); if (batch->removePrefix(prefix) != ResultCode::SUCCEEDED) { - LOG(ERROR) << "Failed to call WriteBatch::removePrefix()"; + LOG(ERROR) << idStr_ << "Failed to call WriteBatch::removePrefix()"; return false; } break; @@ -239,7 +241,7 @@ bool Part::commitLogs(std::unique_ptr iter) { auto range = decodeMultiValues(log); DCHECK_EQ(2, range.size()); if (batch->removeRange(range[0], range[1]) != ResultCode::SUCCEEDED) { - LOG(ERROR) << "Failed to call WriteBatch::removeRange()"; + LOG(ERROR) << idStr_ << "Failed to call WriteBatch::removeRange()"; return false; } break; @@ -254,7 +256,7 @@ bool Part::commitLogs(std::unique_ptr iter) { break; } default: { - LOG(FATAL) << "Unknown operation: " << static_cast(log[0]); + LOG(FATAL) << idStr_ << "Unknown operation: " << static_cast(log[0]); } } @@ -262,16 +264,52 @@ bool Part::commitLogs(std::unique_ptr iter) { } if (lastId >= 0) { - std::string commitMsg; - commitMsg.reserve(sizeof(LogID) + sizeof(TermID)); - commitMsg.append(reinterpret_cast(&lastId), sizeof(LogID)); - commitMsg.append(reinterpret_cast(&lastTerm), sizeof(TermID)); - batch->put(folly::stringPrintf("%s%d", kCommitKeyPrefix, partId_), commitMsg); + if (putCommitMsg(batch.get(), lastId, lastTerm) != ResultCode::SUCCEEDED) { + LOG(ERROR) << idStr_ << "Commit msg failed"; + return false; + } } - return engine_->commitBatchWrite(std::move(batch)) == ResultCode::SUCCEEDED; } +std::pair Part::commitSnapshot(const std::vector& rows, + LogID committedLogId, + TermID committedLogTerm, + bool finished) { + auto batch = engine_->startBatchWrite(); + int64_t count = 0; + int64_t size = 0; + for (auto& row : rows) { + count++; + size += row.size(); + auto kv = decodeKV(row); + if (ResultCode::SUCCEEDED != batch->put(kv.first, kv.second)) { + LOG(ERROR) << idStr_ << "Put failed in commit"; + return std::make_pair(0, 0); + } + } + if (finished) { + if (ResultCode::SUCCEEDED != putCommitMsg(batch.get(), committedLogId, committedLogTerm)) { + LOG(ERROR) << idStr_ << "Put failed in commit"; + return std::make_pair(0, 0); + } + } + if (ResultCode::SUCCEEDED != engine_->commitBatchWrite(std::move(batch))) { + LOG(ERROR) << idStr_ << "Put failed in commit"; + return std::make_pair(0, 0); + } + return std::make_pair(count, size); +} + +ResultCode Part::putCommitMsg(WriteBatch* batch, LogID committedLogId, TermID committedLogTerm) { + std::string commitMsg; + commitMsg.reserve(sizeof(LogID) + sizeof(TermID)); + commitMsg.append(reinterpret_cast(&committedLogId), sizeof(LogID)); + commitMsg.append(reinterpret_cast(&committedLogTerm), sizeof(TermID)); + return batch->put(folly::stringPrintf("%s%d", kCommitKeyPrefix, partId_), + commitMsg); +} + bool Part::preProcessLog(LogID logId, TermID termId, ClusterID clusterId, diff --git a/src/kvstore/Part.h b/src/kvstore/Part.h index f9dd5526734..e41501f4918 100644 --- a/src/kvstore/Part.h +++ b/src/kvstore/Part.h @@ -11,12 +11,14 @@ #include "raftex/RaftPart.h" #include 
"kvstore/Common.h" #include "kvstore/KVEngine.h" +#include "kvstore/raftex/SnapshotManager.h" namespace nebula { namespace kvstore { class Part : public raftex::RaftPart { + friend class SnapshotManager; public: Part(GraphSpaceID spaceId, PartitionID partId, @@ -25,7 +27,8 @@ class Part : public raftex::RaftPart { KVEngine* engine, std::shared_ptr pool, std::shared_ptr workers, - std::shared_ptr handlers); + std::shared_ptr handlers, + std::shared_ptr snapshotMan); virtual ~Part() { LOG(INFO) << idStr_ << "~Part()"; @@ -78,6 +81,17 @@ class Part : public raftex::RaftPart { ClusterID clusterId, const std::string& log) override; + std::pair commitSnapshot(const std::vector& data, + LogID committedLogId, + TermID committedLogTerm, + bool finished) override; + + ResultCode putCommitMsg(WriteBatch* batch, LogID committedLogId, TermID committedLogTerm); + + void cleanup() override { + LOG(INFO) << idStr_ << "Clean up all data, not implement!"; + } + protected: GraphSpaceID spaceId_; PartitionID partId_; diff --git a/src/kvstore/PartManager.h b/src/kvstore/PartManager.h index 9bc05c2a75e..0f0d2715e09 100644 --- a/src/kvstore/PartManager.h +++ b/src/kvstore/PartManager.h @@ -72,7 +72,7 @@ class MemPartManager final : public PartManager { FRIEND_TEST(NebulaStoreTest, SimpleTest); FRIEND_TEST(NebulaStoreTest, PartsTest); FRIEND_TEST(NebulaStoreTest, ThreeCopiesTest); - FRIEND_TEST(NebulaStoreTest, DISABLED_TransLeaderTest); + FRIEND_TEST(NebulaStoreTest, TransLeaderTest); public: MemPartManager() = default; diff --git a/src/kvstore/RocksEngine.cpp b/src/kvstore/RocksEngine.cpp index a843d21ded9..4cc233f1551 100644 --- a/src/kvstore/RocksEngine.cpp +++ b/src/kvstore/RocksEngine.cpp @@ -32,7 +32,7 @@ class RocksWriteBatch : public WriteBatch { rocksdb::DB* db_{nullptr}; public: - explicit RocksWriteBatch(rocksdb::DB* db) : db_(db) {} + explicit RocksWriteBatch(rocksdb::DB* db) : batch_(FLAGS_rocksdb_batch_size), db_(db) {} virtual ~RocksWriteBatch() = default; @@ -116,7 +116,7 @@ RocksEngine::RocksEngine(GraphSpaceID spaceId, options.compaction_filter_factory = cfFactory; } status = rocksdb::DB::Open(options, path, &db); - CHECK(status.ok()); + CHECK(status.ok()) << status.ToString(); db_.reset(db); partsNum_ = allParts().size(); } diff --git a/src/kvstore/SnapshotManagerImpl.cpp b/src/kvstore/SnapshotManagerImpl.cpp new file mode 100644 index 00000000000..33d80a1cc22 --- /dev/null +++ b/src/kvstore/SnapshotManagerImpl.cpp @@ -0,0 +1,48 @@ +/* Copyright (c) 2019 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. 
+ */ +#include "kvstore/SnapshotManagerImpl.h" +#include "base/NebulaKeyUtils.h" +#include "kvstore/LogEncoder.h" + +DEFINE_int32(snapshot_batch_size, 1024 * 1024 * 10, "batch size for snapshot"); + +namespace nebula { +namespace kvstore { + +void SnapshotManagerImpl::accessAllRowsInSnapshot(GraphSpaceID spaceId, + PartitionID partId, + raftex::SnapshotCallback cb) { + CHECK_NOTNULL(store_); + std::unique_ptr iter; + auto prefix = NebulaKeyUtils::prefix(partId); + store_->prefix(spaceId, partId, prefix, &iter); + std::vector data; + data.reserve(1024); + int32_t batchSize = 0; + int64_t totalSize = 0; + int64_t totalCount = 0; + while (iter->valid()) { + if (batchSize >= FLAGS_snapshot_batch_size) { + cb(std::move(data), totalCount, totalSize, false); + data.clear(); + batchSize = 0; + } + auto key = iter->key(); + auto val = iter->val(); + data.emplace_back(encodeKV(key, val)); + batchSize += data.back().size(); + totalSize += data.back().size(); + totalCount++; + iter->next(); + } + if (data.size() > 0) { + cb(std::move(data), totalCount, totalSize, true); + } +} +} // namespace kvstore +} // namespace nebula + + diff --git a/src/kvstore/SnapshotManagerImpl.h b/src/kvstore/SnapshotManagerImpl.h new file mode 100644 index 00000000000..177c1c70baa --- /dev/null +++ b/src/kvstore/SnapshotManagerImpl.h @@ -0,0 +1,33 @@ +/* Copyright (c) 2019 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. + */ + +#ifndef KVSTORE_SNAPSHOTMANAGERIMPL_H_ +#define KVSTORE_SNAPSHOTMANAGERIMPL_H_ + +#include "base/Base.h" +#include "kvstore/raftex/SnapshotManager.h" +#include "kvstore/KVStore.h" + +namespace nebula { +namespace kvstore { + +class SnapshotManagerImpl : public raftex::SnapshotManager { +public: + explicit SnapshotManagerImpl(KVStore* kv) : store_(kv) {} + + void accessAllRowsInSnapshot(GraphSpaceID spaceId, + PartitionID partId, + raftex::SnapshotCallback cb) override; + +private: + KVStore* store_; +}; + +} // namespace kvstore +} // namespace nebula + +#endif // KVSTORE_SNAPSHOTMANAGERIMPL_H_ + diff --git a/src/kvstore/raftex/CMakeLists.txt b/src/kvstore/raftex/CMakeLists.txt index 7437ba000ad..5ecd5bfec11 100644 --- a/src/kvstore/raftex/CMakeLists.txt +++ b/src/kvstore/raftex/CMakeLists.txt @@ -4,6 +4,7 @@ nebula_add_library( RaftPart.cpp RaftexService.cpp Host.cpp + SnapshotManager.cpp ) add_subdirectory(test) diff --git a/src/kvstore/raftex/Host.cpp b/src/kvstore/raftex/Host.cpp index 249c85021d6..98857a80a3d 100644 --- a/src/kvstore/raftex/Host.cpp +++ b/src/kvstore/raftex/Host.cpp @@ -106,37 +106,22 @@ folly::Future Host::appendLogs( std::lock_guard g(lock_); auto res = checkStatus(); - - if (logId == logIdToSend_) { - // This is a re-send or a heartbeat. 
If there is an - // ongoing request, we will just return SUCCEEDED - if (requestOnGoing_) { - LOG(INFO) << idStr_ << "Another request is onging," - "ignore the re-send request"; - cpp2::AppendLogResponse r; - r.set_error_code(cpp2::ErrorCode::SUCCEEDED); - return r; - } - } else { - // Otherwise, logId has to be greater - if (logId < logIdToSend_) { - LOG(INFO) << idStr_ << "The log has been sended"; - cpp2::AppendLogResponse r; - r.set_error_code(cpp2::ErrorCode::SUCCEEDED); - return r; - } + if (logId <= lastLogIdSent_) { + LOG(INFO) << idStr_ << "The log " << logId << " has been sended" + << ", lastLogIdSent " << lastLogIdSent_; + cpp2::AppendLogResponse r; + r.set_error_code(cpp2::ErrorCode::SUCCEEDED); + return r; } if (requestOnGoing_ && res == cpp2::ErrorCode::SUCCEEDED) { if (cachingPromise_.size() <= FLAGS_max_outstanding_requests) { pendingReq_ = std::make_tuple(term, logId, - committedLogId, - prevLogTerm, - prevLogId); + committedLogId); return cachingPromise_.getFuture(); } else { - LOG(INFO) << idStr_ + PLOG_EVERY_N(INFO, 200) << idStr_ << "Too many requests are waiting, return error"; cpp2::AppendLogResponse r; r.set_error_code(cpp2::ErrorCode::E_TOO_MANY_REQUESTS); @@ -155,14 +140,20 @@ folly::Future Host::appendLogs( VLOG(2) << idStr_ << "About to send the AppendLog request"; // No request is ongoing, let's send a new request - CHECK_GE(prevLogTerm, lastLogTermSent_); - CHECK_GE(prevLogId, lastLogIdSent_); + if (UNLIKELY(lastLogIdSent_ == 0 && lastLogTermSent_ == 0)) { + LOG(INFO) << idStr_ << "This is the first time to send the logs to this host"; + lastLogIdSent_ = prevLogId; + lastLogTermSent_ = prevLogTerm; + } + if (prevLogTerm < lastLogTermSent_ || prevLogId < lastLogIdSent_) { + LOG(INFO) << idStr_ << "We have sended this log, so go on from id " << lastLogIdSent_ + << ", term " << lastLogTermSent_ << "; current prev log id " << prevLogId + << ", current prev log term " << prevLogTerm; + } logTermToSend_ = term; logIdToSend_ = logId; - lastLogTermSent_ = prevLogTerm; - lastLogIdSent_ = prevLogId; committedLogId_ = committedLogId; - pendingReq_ = std::make_tuple(0, 0, 0, 0, 0); + pendingReq_ = std::make_tuple(0, 0, 0); promise_ = std::move(cachingPromise_); cachingPromise_ = folly::SharedPromise(); ret = promise_.getFuture(); @@ -183,7 +174,7 @@ void Host::setResponse(const cpp2::AppendLogResponse& r) { promise_.setValue(r); cachingPromise_.setValue(r); cachingPromise_ = folly::SharedPromise(); - pendingReq_ = std::make_tuple(0, 0, 0, 0, 0); + pendingReq_ = std::make_tuple(0, 0, 0); requestOnGoing_ = false; } @@ -193,7 +184,7 @@ void Host::appendLogsInternal(folly::EventBase* eb, [eb, self = shared_from_this()] (folly::Try&& t) { VLOG(3) << self->idStr_ << "appendLogs() call got response"; if (t.hasException()) { - LOG(ERROR) << self->idStr_ << t.exception().what(); + VLOG(2) << self->idStr_ << t.exception().what(); cpp2::AppendLogResponse r; r.set_error_code(cpp2::ErrorCode::E_EXCEPTION); { @@ -201,7 +192,7 @@ void Host::appendLogsInternal(folly::EventBase* eb, self->setResponse(r); } self->noMoreRequestCV_.notify_all(); - return r; + return; } cpp2::AppendLogResponse resp = std::move(t).value(); @@ -217,7 +208,6 @@ void Host::appendLogsInternal(folly::EventBase* eb, << "AppendLog request sent successfully"; std::shared_ptr newReq; - cpp2::AppendLogResponse r; { std::lock_guard g(self->lock_); auto res = self->checkStatus(); @@ -225,6 +215,7 @@ void Host::appendLogsInternal(folly::EventBase* eb, VLOG(2) << self->idStr_ << "The host is not in a proper status," " just 
return"; + cpp2::AppendLogResponse r; r.set_error_code(res); self->setResponse(r); } else { @@ -257,11 +248,9 @@ void Host::appendLogsInternal(folly::EventBase* eb, self->promise_ = std::move(self->cachingPromise_); self->cachingPromise_ = folly::SharedPromise(); - self->pendingReq_ = std::make_tuple(0, 0, 0, 0, 0); + self->pendingReq_ = std::make_tuple(0, 0, 0); } } - - r = std::move(resp); } } @@ -270,13 +259,12 @@ void Host::appendLogsInternal(folly::EventBase* eb, } else { self->noMoreRequestCV_.notify_all(); } - return r; + return; } case cpp2::ErrorCode::E_LOG_GAP: { VLOG(2) << self->idStr_ << "The host's log is behind, need to catch up"; std::shared_ptr newReq; - cpp2::AppendLogResponse r; { std::lock_guard g(self->lock_); auto res = self->checkStatus(); @@ -284,13 +272,13 @@ void Host::appendLogsInternal(folly::EventBase* eb, VLOG(2) << self->idStr_ << "The host is not in a proper status," " skip catching up the gap"; + cpp2::AppendLogResponse r; r.set_error_code(res); self->setResponse(r); } else { self->lastLogIdSent_ = resp.get_last_log_id(); self->lastLogTermSent_ = resp.get_last_log_term(); newReq = self->prepareAppendLogRequest(); - r = std::move(resp); } } if (newReq) { @@ -298,7 +286,47 @@ void Host::appendLogsInternal(folly::EventBase* eb, } else { self->noMoreRequestCV_.notify_all(); } - return r; + return; + } + case cpp2::ErrorCode::E_WAITING_SNAPSHOT: { + VLOG(2) << self->idStr_ + << "The host is waiting for the snapshot, so we need to send log from " + << " current committedLogId " << self->committedLogId_; + std::shared_ptr newReq; + { + std::lock_guard g(self->lock_); + auto res = self->checkStatus(); + if (res != cpp2::ErrorCode::SUCCEEDED) { + VLOG(2) << self->idStr_ + << "The host is not in a proper status," + " skip waiting the snapshot"; + cpp2::AppendLogResponse r; + r.set_error_code(res); + self->setResponse(r); + } else { + self->lastLogIdSent_ = self->committedLogId_; + self->lastLogTermSent_ = self->logTermToSend_; + newReq = self->prepareAppendLogRequest(); + } + } + if (newReq) { + self->appendLogsInternal(eb, newReq); + } else { + self->noMoreRequestCV_.notify_all(); + } + return; + } + case cpp2::ErrorCode::E_LOG_STALE: { + VLOG(2) << self->idStr_ << "Log stale, reset lastLogIdSent " << self->lastLogIdSent_ + << " to the followers lastLodId " << resp.get_last_log_id(); + { + std::lock_guard g(self->lock_); + self->lastLogIdSent_ = resp.get_last_log_id(); + self->lastLogTermSent_ = resp.get_last_log_term(); + self->setResponse(resp); + } + self->noMoreRequestCV_.notify_all(); + return; } default: { PLOG_EVERY_N(ERROR, 100) @@ -311,7 +339,7 @@ void Host::appendLogsInternal(folly::EventBase* eb, self->setResponse(resp); } self->noMoreRequestCV_.notify_all(); - return resp; + return; } } }); @@ -319,7 +347,7 @@ void Host::appendLogsInternal(folly::EventBase* eb, std::shared_ptr -Host::prepareAppendLogRequest() const { +Host::prepareAppendLogRequest() { CHECK(!lock_.try_lock()); auto req = std::make_shared(); req->set_space(part_->spaceId()); @@ -353,8 +381,25 @@ Host::prepareAppendLogRequest() const { logs.emplace_back(std::move(le)); } req->set_log_str_list(std::move(logs)); + req->set_sending_snapshot(false); } else { - LOG(FATAL) << idStr_ << "We have not support snapshot yet"; + req->set_sending_snapshot(true); + if (!sendingSnapshot_) { + LOG(INFO) << idStr_ << "Can't find log " << lastLogIdSent_ + 1 + << " in wal, send the snapshot"; + sendingSnapshot_ = true; + part_->snapshot_->sendSnapshot(part_, addr_).then([this] (Status&& status) { + if 
(status.ok()) { + LOG(INFO) << idStr_ << "Send snapshot succeeded!"; + } else { + LOG(INFO) << idStr_ << "Send snapshot failed!"; + // TODO(heng): we should tell the follower i am failed. + } + sendingSnapshot_ = false; + }); + } else { + LOG(INFO) << idStr_ << "The snapshot req is in queue, please wait for a moment"; + } } return req; @@ -381,7 +426,7 @@ folly::Future Host::sendAppendLogRequest( VLOG(1) << idStr_ << "Sending request space " << req->get_space() << ", part " << req->get_part() << ", current term " << req->get_current_term() - << ", last_log_id" << req->get_last_log_id() + << ", last_log_id " << req->get_last_log_id() << ", committed_id " << req->get_committed_log_id() << ", last_log_term_sent" << req->get_last_log_term_sent() << ", last_log_id_sent " << req->get_last_log_id_sent(); @@ -392,7 +437,7 @@ folly::Future Host::sendAppendLogRequest( bool Host::noRequest() const { CHECK(!lock_.try_lock()); - static auto emptyTup = std::make_tuple(0, 0, 0, 0, 0); + static auto emptyTup = std::make_tuple(0, 0, 0); return pendingReq_ == emptyTup; } diff --git a/src/kvstore/raftex/Host.h b/src/kvstore/raftex/Host.h index 5c77031f284..7a76867db42 100644 --- a/src/kvstore/raftex/Host.h +++ b/src/kvstore/raftex/Host.h @@ -85,7 +85,7 @@ class Host final : public std::enable_shared_from_this { folly::EventBase* eb, std::shared_ptr req); - std::shared_ptr prepareAppendLogRequest() const; + std::shared_ptr prepareAppendLogRequest(); bool noRequest() const; @@ -97,8 +97,8 @@ class Host final : public std::enable_shared_from_this { } private: - // - using Request = std::tuple; + // + using Request = std::tuple; std::shared_ptr part_; const HostAddr addr_; @@ -115,7 +115,7 @@ class Host final : public std::enable_shared_from_this { folly::SharedPromise promise_; folly::SharedPromise cachingPromise_; - Request pendingReq_{0, 0, 0, 0, 0}; + Request pendingReq_{0, 0, 0}; // These logId and term pointing to the latest log we need to send LogID logIdToSend_{0}; @@ -126,6 +126,7 @@ class Host final : public std::enable_shared_from_this { TermID lastLogTermSent_{0}; LogID committedLogId_{0}; + std::atomic_bool sendingSnapshot_{false}; }; } // namespace raftex diff --git a/src/kvstore/raftex/RaftPart.cpp b/src/kvstore/raftex/RaftPart.cpp index e2d3e88ce43..3a03d04b684 100644 --- a/src/kvstore/raftex/RaftPart.cpp +++ b/src/kvstore/raftex/RaftPart.cpp @@ -23,6 +23,9 @@ DEFINE_bool(accept_log_append_during_pulling, false, "Whether to accept new logs during pulling the snapshot"); DEFINE_uint32(raft_heartbeat_interval_secs, 5, "Seconds between each heartbeat"); + +DEFINE_uint64(raft_snapshot_timeout, 60 * 5, "Max seconds between two snapshot requests"); + DEFINE_uint32(max_batch_size, 256, "The max number of logs in a batch"); DEFINE_int32(wal_ttl, 86400, "Default wal ttl"); @@ -198,7 +201,8 @@ RaftPart::RaftPart(ClusterID clusterId, const folly::StringPiece walRoot, std::shared_ptr pool, std::shared_ptr workers, - std::shared_ptr executor) + std::shared_ptr executor, + std::shared_ptr snapshotMan) : idStr_{folly::stringPrintf("[Port: %d, Space: %d, Part: %d] ", localAddr.second, spaceId, partId)} , clusterId_{clusterId} @@ -210,13 +214,15 @@ RaftPart::RaftPart(ClusterID clusterId, , leader_{0, 0} , ioThreadPool_{pool} , bgWorkers_{workers} - , executor_(executor) { + , executor_(executor) + , snapshot_(snapshotMan) { FileBasedWalPolicy policy; policy.ttl = FLAGS_wal_ttl; policy.fileSize = FLAGS_wal_file_size; policy.bufferSize = FLAGS_wal_buffer_size; policy.numBuffers = FLAGS_wal_buffer_num; wal_ = 
FileBasedWal::getWal(walRoot, + idStr_, policy, [this] (LogID logId, TermID logTermId, @@ -832,6 +838,9 @@ bool RaftPart::needToStartElection() { role_ == Role::FOLLOWER && (lastMsgRecvDur_.elapsedInSec() >= FLAGS_raft_heartbeat_interval_secs || term_ == 0)) { + LOG(INFO) << idStr_ << "Start leader election, reason: lastMsgDur " + << lastMsgRecvDur_.elapsedInSec() + << ", term " << term_; role_ = Role::CANDIDATE; LOG(INFO) << idStr_ << "needToStartElection: lastMsgRecvDur " << lastMsgRecvDur_.elapsedInSec() @@ -1029,10 +1038,15 @@ void RaftPart::statusPolling() { VLOG(2) << idStr_ << "Need to send heartbeat"; sendHeartbeat(); } - wal_->cleanWAL(); + if (needToCleanupSnapshot()) { + LOG(INFO) << idStr_ << "Clean up the snapshot"; + cleanupSnapshot(); + } + wal_->cleanWAL(FLAGS_wal_ttl); { std::lock_guard g(raftLock_); - if (status_ == Status::RUNNING) { + if (status_ == Status::RUNNING || status_ == Status::WAITING_SNAPSHOT) { + VLOG(3) << idStr_ << "Schedule new task"; bgWorkers_->addDelayTask( delay, [self = shared_from_this()] { @@ -1042,6 +1056,19 @@ void RaftPart::statusPolling() { } } +bool RaftPart::needToCleanupSnapshot() { + std::lock_guard g(raftLock_); + return status_ == Status::WAITING_SNAPSHOT && + role_ != Role::LEADER && + lastSnapshotRecvDur_.elapsedInSec() >= FLAGS_raft_snapshot_timeout; +} + +void RaftPart::cleanupSnapshot() { + LOG(INFO) << idStr_ << "Clean up the snapshot"; + std::lock_guard g(raftLock_); + reset(); + status_ = Status::RUNNING; +} void RaftPart::processAskForVoteRequest( const cpp2::AskForVoteRequest& req, @@ -1122,8 +1149,13 @@ void RaftPart::processAskForVoteRequest( // If the partition used to be a leader, need to fire the callback if (oldRole == Role::LEADER) { - // Need to invoke the onLostLeadership callback LOG(INFO) << idStr_ << "Was a leader, need to do some clean-up"; + if (wal_->lastLogId() > lastLogId_) { + LOG(INFO) << idStr_ << "There is one log " << wal_->lastLogId() + << " i did not commit when i was leader, rollback to " << lastLogId_; + wal_->rollbackToLog(lastLogId_); + } + // Need to invoke the onLostLeadership callback bgWorkers_->addTask( [self = shared_from_this(), oldTerm] { self->onLostLeadership(oldTerm); @@ -1142,8 +1174,6 @@ void RaftPart::processAskForVoteRequest( void RaftPart::processAppendLogRequest( const cpp2::AppendLogRequest& req, cpp2::AppendLogResponse& resp) { - bool hasSnapshot = req.get_snapshot_uri() != nullptr; - VLOG(2) << idStr_ << "Received logAppend " << ": GraphSpaceId = " << req.get_space() @@ -1159,9 +1189,9 @@ void RaftPart::processAppendLogRequest( ", num_logs = %ld, logTerm = %ld", req.get_log_str_list().size(), req.get_log_term()) - << (hasSnapshot - ? 
", SnapshotURI = " + *(req.get_snapshot_uri()) - : ""); + << ", sendingSnapshot = " << req.get_sending_snapshot() + << ", local lastLogId = " << lastLogId_ + << ", local committedLogId = " << committedLogId_; std::lock_guard g(raftLock_); @@ -1171,7 +1201,6 @@ void RaftPart::processAppendLogRequest( resp.set_committed_log_id(committedLogId_); resp.set_last_log_id(lastLogId_); resp.set_last_log_term(lastLogTerm_); - resp.set_pulling_snapshot(false); // Check status if (UNLIKELY(status_ == Status::STOPPED)) { @@ -1197,32 +1226,61 @@ void RaftPart::processAppendLogRequest( // Reset the timeout timer lastMsgRecvDur_.reset(); - // TODO Check snapshot pulling status -// if (hasSnapshot && !isPullingSnapshot()) { -// // We need to pull the snapshot -// startSnapshotPullingThread(std::move(req.get_snapshot_uri())); -// } -// if (isPullingSnapshot()) { -// CHECK_NE(oldRole, Role::LEADER); -// resp.set_pulling_snapshot(true); -// if (!FLAGS_accept_log_append_during_pulling) { -// VLOG(2) << idStr_ -// << "Pulling the snapshot and not allowed to accept" -// " the LogAppend Requests"; -// resp.set_error_code(cpp2::ErrorCode::E_PULLING_SNAPSHOT); -// return; -// } -// } + if (req.get_sending_snapshot() && status_ != Status::WAITING_SNAPSHOT) { + LOG(INFO) << idStr_ << "Begin to wait for the snapshot"; + reset(); + status_ = Status::WAITING_SNAPSHOT; + resp.set_error_code(cpp2::ErrorCode::E_WAITING_SNAPSHOT); + return; + } + + if (UNLIKELY(status_ == Status::WAITING_SNAPSHOT)) { + VLOG(2) << idStr_ + << "The part is receiving snapshot," + << "so just accept the new wals, but don't commit them." + << "last_log_id_sent " << req.get_last_log_id_sent() + << ", total log number " << req.get_log_str_list().size(); + if (lastLogId_ > 0 && req.get_last_log_id_sent() > lastLogId_) { + // There is a gap + LOG(INFO) << idStr_ << "Local is missing logs from id " + << lastLogId_ << ". Need to catch up"; + resp.set_error_code(cpp2::ErrorCode::E_LOG_GAP); + return; + } + // TODO(heng): if we have 3 node, one is leader, one is wait snapshot and return success, + // the other is follower, but leader replica log to follow failed, + // How to deal with leader crash? At this time, no leader will be elected. + size_t numLogs = req.get_log_str_list().size(); + LogID firstId = req.get_last_log_id_sent() + 1; + + VLOG(2) << idStr_ << "Writing log [" << firstId + << ", " << firstId + numLogs - 1 << "] to WAL"; + LogStrListIterator iter(firstId, + req.get_log_term(), + req.get_log_str_list()); + if (wal_->appendLogs(iter)) { + CHECK_EQ(firstId + numLogs - 1, wal_->lastLogId()); + lastLogId_ = wal_->lastLogId(); + lastLogTerm_ = wal_->lastLogTerm(); + resp.set_last_log_id(lastLogId_); + resp.set_last_log_term(lastLogTerm_); + resp.set_error_code(cpp2::ErrorCode::SUCCEEDED); + } else { + LOG(ERROR) << idStr_ << "Failed to append logs to WAL"; + resp.set_error_code(cpp2::ErrorCode::E_WAL_FAIL); + } + return; + } if (req.get_last_log_id_sent() < committedLogId_) { - LOG(INFO) << idStr_ << "The log " << req.get_last_log_id_sent() + LOG(INFO) << idStr_ << "Stale log! The log " << req.get_last_log_id_sent() << " i had committed yet. My committedLogId is " << committedLogId_; resp.set_error_code(cpp2::ErrorCode::E_LOG_STALE); return; } if (lastLogTerm_ > 0 && req.get_last_log_term_sent() != lastLogTerm_) { - VLOG(2) << idStr_ << "The local last log term is " << lastLogTerm_ + LOG(INFO) << idStr_ << "The local last log term is " << lastLogTerm_ << ", which is different from the leader's prevLogTerm " << req.get_last_log_term_sent() << ". 
So need to rollback to last committedLogId_ " << committedLogId_; @@ -1236,17 +1294,17 @@ void RaftPart::processAppendLogRequest( return; } else if (req.get_last_log_id_sent() > lastLogId_) { // There is a gap - VLOG(2) << idStr_ << "Local is missing logs from id " + LOG(INFO) << idStr_ << "Local is missing logs from id " << lastLogId_ << ". Need to catch up"; resp.set_error_code(cpp2::ErrorCode::E_LOG_GAP); return; } else if (req.get_last_log_id_sent() < lastLogId_) { - // Local has some extra logs, which need to be rolled back - wal_->rollbackToLog(req.get_last_log_id_sent()); - lastLogId_ = wal_->lastLogId(); - lastLogTerm_ = wal_->lastLogTerm(); - resp.set_last_log_id(lastLogId_); - resp.set_last_log_term(lastLogTerm_); + LOG(INFO) << idStr_ << "Stale log! Local lastLogId " << lastLogId_ + << ", lastLogTerm " << lastLogTerm_ + << ", lastLogIdSent " << req.get_last_log_id_sent() + << ", lastLogTermSent " << req.get_last_log_term_sent(); + resp.set_error_code(cpp2::ErrorCode::E_LOG_STALE); + return; } // Append new logs @@ -1276,7 +1334,7 @@ void RaftPart::processAppendLogRequest( LogID lastLogIdCanCommit = std::min(lastLogId_, req.get_committed_log_id()); CHECK(committedLogId_ + 1 <= lastLogIdCanCommit); if (commitLogs(wal_->iterator(committedLogId_ + 1, lastLogIdCanCommit))) { - VLOG(2) << idStr_ << "Follower succeeded committing log " + VLOG(1) << idStr_ << "Follower succeeded committing log " << committedLogId_ + 1 << " to " << lastLogIdCanCommit; committedLogId_ = lastLogIdCanCommit; @@ -1355,8 +1413,13 @@ cpp2::ErrorCode RaftPart::verifyLeader( req.get_leader_port()); term_ = proposedTerm_ = req.get_current_term(); if (oldRole == Role::LEADER) { - // Need to invoke onLostLeadership callback VLOG(2) << idStr_ << "Was a leader, need to do some clean-up"; + if (wal_->lastLogId() > lastLogId_) { + LOG(INFO) << idStr_ << "There is one log " << wal_->lastLogId() + << " i did not commit when i was leader, rollback to " << lastLogId_; + wal_->rollbackToLog(lastLogId_); + } + // Need to invoke onLostLeadership callback bgWorkers_->addTask([self = shared_from_this(), oldTerm] { self->onLostLeadership(oldTerm); }); @@ -1368,6 +1431,68 @@ cpp2::ErrorCode RaftPart::verifyLeader( return cpp2::ErrorCode::SUCCEEDED; } +void RaftPart::processSendSnapshotRequest(const cpp2::SendSnapshotRequest& req, + cpp2::SendSnapshotResponse& resp) { + VLOG(1) << idStr_ << "Receive snapshot, total rows " << req.get_rows().size() + << ", total count received " << req.get_total_count() + << ", total size received " << req.get_total_size() + << ", finished " << req.get_done(); + std::lock_guard g(raftLock_); + // Check status + if (UNLIKELY(status_ == Status::STOPPED)) { + LOG(ERROR) << idStr_ + << "The part has been stopped, skip the request"; + resp.set_error_code(cpp2::ErrorCode::E_BAD_STATE); + return; + } + if (UNLIKELY(status_ == Status::STARTING)) { + LOG(ERROR) << idStr_ << "The partition is still starting"; + resp.set_error_code(cpp2::ErrorCode::E_NOT_READY); + return; + } + if (UNLIKELY(role_ != Role::FOLLOWER && role_ != Role::LEARNER)) { + LOG(ERROR) << idStr_ << "Bad role " << roleStr(role_); + resp.set_error_code(cpp2::ErrorCode::E_BAD_STATE); + return; + } + if (UNLIKELY(leader_ != HostAddr(req.get_leader_ip(), req.get_leader_port()) + || term_ != req.get_term())) { + LOG(ERROR) << idStr_ << "Term out of date, current term " << term_ + << ", received term " << req.get_term(); + resp.set_error_code(cpp2::ErrorCode::E_TERM_OUT_OF_DATE); + return; + } + if (status_ != Status::WAITING_SNAPSHOT) { + 
LOG(INFO) << idStr_ << "Begin to receive the snapshot"; + reset(); + status_ = Status::WAITING_SNAPSHOT; + } + lastSnapshotRecvDur_.reset(); + // TODO(heng): Maybe we should save them into one sst firstly? + auto ret = commitSnapshot(req.get_rows(), + req.get_committed_log_id(), + req.get_committed_log_term(), + req.get_done()); + lastTotalCount_ += ret.first; + lastTotalSize_ += ret.second; + if (lastTotalCount_ != req.get_total_count() + || lastTotalSize_ != req.get_total_size()) { + LOG(ERROR) << idStr_ << "Bad snapshot, total rows received " << lastTotalCount_ + << ", total rows sended " << req.get_total_count() + << ", total size received " << lastTotalSize_ + << ", total size sended " << req.get_total_size(); + resp.set_error_code(cpp2::ErrorCode::E_PERSIST_SNAPSHOT_FAILED); + return; + } + if (req.get_done()) { + committedLogId_ = req.get_committed_log_id(); + status_ = Status::RUNNING; + LOG(INFO) << idStr_ << "Receive all snapshot, committedLogId_ = " << committedLogId_ + << ", lastLodId " << lastLogId_; + } + resp.set_error_code(cpp2::ErrorCode::SUCCEEDED); + return; +} folly::Future RaftPart::sendHeartbeat() { VLOG(2) << idStr_ << "Send heartbeat"; @@ -1402,6 +1527,14 @@ bool RaftPart::checkAppendLogResult(AppendLogResult res) { return true; } +void RaftPart::reset() { + CHECK(!raftLock_.try_lock()); + wal_->reset(); + cleanup(); + lastLogId_ = committedLogId_ = 0; + lastTotalCount_ = 0; + lastTotalSize_ = 0; +} } // namespace raftex } // namespace nebula diff --git a/src/kvstore/raftex/RaftPart.h b/src/kvstore/raftex/RaftPart.h index 59faf970acf..99d5c01f696 100644 --- a/src/kvstore/raftex/RaftPart.h +++ b/src/kvstore/raftex/RaftPart.h @@ -14,6 +14,7 @@ #include "time/Duration.h" #include "thread/GenericThreadPool.h" #include "base/LogIterator.h" +#include "kvstore/raftex/SnapshotManager.h" namespace folly { class IOThreadPoolExecutor; @@ -67,6 +68,7 @@ using AtomicOp = folly::Function; class RaftPart : public std::enable_shared_from_this { friend class AppendLogsIterator; friend class Host; + friend class SnapshotManager; public: virtual ~RaftPart(); @@ -178,6 +180,10 @@ class RaftPart : public std::enable_shared_from_this { const cpp2::AppendLogRequest& req, cpp2::AppendLogResponse& resp); + // Process sendSnapshot request + void processSendSnapshotRequest( + const cpp2::SendSnapshotRequest& req, + cpp2::SendSnapshotResponse& resp); protected: // Protected constructor to prevent from instantiating directly @@ -188,7 +194,8 @@ class RaftPart : public std::enable_shared_from_this { const folly::StringPiece walRoot, std::shared_ptr pool, std::shared_ptr workers, - std::shared_ptr executor); + std::shared_ptr executor, + std::shared_ptr snapshotMan); const char* idStr() const { return idStr_.c_str(); @@ -220,11 +227,24 @@ class RaftPart : public std::enable_shared_from_this { ClusterID clusterId, const std::string& log) = 0; + // Return committed; + virtual std::pair commitSnapshot(const std::vector& data, + LogID committedLogId, + TermID committedLogTerm, + bool finished) = 0; + + // Clean up all data about current part in storage. + virtual void cleanup() = 0; + + // Reset the part, clean up all data and WALs. + void reset(); + private: enum class Status { STARTING = 0, // The part is starting, not ready for service RUNNING, // The part is running - STOPPED // The part has been stopped + STOPPED, // The part has been stopped + WAITING_SNAPSHOT // Waiting for the snapshot. 
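
// A minimal, self-contained sketch of the follower-side bookkeeping used in
// processSendSnapshotRequest() above: every SendSnapshotRequest carries the
// leader's running totals, the receiver accumulates what it actually persisted,
// and any mismatch fails the stream (the E_PERSIST_SNAPSHOT_FAILED path).
// The names below (SnapshotChunk, SnapshotReceiver) are illustrative only,
// not types from this patch.
#include <cstdint>
#include <string>
#include <vector>

struct SnapshotChunk {
    std::vector<std::string> rows;   // encoded KV rows in this request
    int64_t totalCount{0};           // rows sent so far, including this chunk
    int64_t totalSize{0};            // bytes sent so far, including this chunk
    bool done{false};                // true on the last chunk
};

class SnapshotReceiver {
public:
    // Returns false when the local totals disagree with the leader's totals.
    bool apply(const SnapshotChunk& chunk) {
        for (const auto& row : chunk.rows) {
            // A real implementation would decode and persist the row here.
            ++recvCount_;
            recvSize_ += static_cast<int64_t>(row.size());
        }
        if (recvCount_ != chunk.totalCount || recvSize_ != chunk.totalSize) {
            return false;            // bad snapshot stream; caller resets and waits again
        }
        if (chunk.done) {
            finished_ = true;        // caller may switch back to a running state
        }
        return true;
    }

    bool finished() const { return finished_; }

private:
    int64_t recvCount_{0};
    int64_t recvSize_{0};
    bool finished_{false};
};
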
}; enum class Role { @@ -279,6 +299,10 @@ class RaftPart : public std::enable_shared_from_this { void statusPolling(); + bool needToCleanupSnapshot(); + + void cleanupSnapshot(); + // The method sends out AskForVote request // It return true if a leader is elected, otherwise returns false bool leaderElection(); @@ -465,6 +489,13 @@ class RaftPart : public std::enable_shared_from_this { std::shared_ptr bgWorkers_; // Workers pool std::shared_ptr executor_; + + std::shared_ptr snapshot_; + + // Used in snapshot, record the last total count and total size received from request + int64_t lastTotalCount_ = 0; + int64_t lastTotalSize_ = 0; + time::Duration lastSnapshotRecvDur_; }; } // namespace raftex diff --git a/src/kvstore/raftex/RaftexService.cpp b/src/kvstore/raftex/RaftexService.cpp index cb2ebdd1730..a7d67f62e45 100644 --- a/src/kvstore/raftex/RaftexService.cpp +++ b/src/kvstore/raftex/RaftexService.cpp @@ -65,6 +65,7 @@ void RaftexService::initThriftServer(std::shared_ptrsetPort(port); server_->setIdleTimeout(std::chrono::seconds(0)); + server_->setReusePort(true); if (pool != nullptr) { server_->setIOThreadPool(pool); } @@ -213,6 +214,18 @@ void RaftexService::appendLog( part->processAppendLogRequest(req, resp); } +void RaftexService::sendSnapshot( + cpp2::SendSnapshotResponse& resp, + const cpp2::SendSnapshotRequest& req) { + auto part = findPart(req.get_space(), req.get_part()); + if (!part) { + // Not found + resp.set_error_code(cpp2::ErrorCode::E_UNKNOWN_PART); + return; + } + + part->processSendSnapshotRequest(req, resp); +} } // namespace raftex } // namespace nebula diff --git a/src/kvstore/raftex/RaftexService.h b/src/kvstore/raftex/RaftexService.h index 684ed2357c1..ee4466048c2 100644 --- a/src/kvstore/raftex/RaftexService.h +++ b/src/kvstore/raftex/RaftexService.h @@ -46,9 +46,16 @@ class RaftexService : public cpp2::RaftexServiceSvIf { void appendLog(cpp2::AppendLogResponse& resp, const cpp2::AppendLogRequest& req) override; + void sendSnapshot( + cpp2::SendSnapshotResponse& resp, + const cpp2::SendSnapshotRequest& req) override; + void addPartition(std::shared_ptr part); void removePartition(std::shared_ptr part); + std::shared_ptr findPart(GraphSpaceID spaceId, + PartitionID partId); + private: void initThriftServer(std::shared_ptr pool, std::shared_ptr workers, @@ -61,9 +68,6 @@ class RaftexService : public cpp2::RaftexServiceSvIf { RaftexService() = default; - std::shared_ptr findPart(GraphSpaceID spaceId, - PartitionID partId); - private: std::unique_ptr server_; std::unique_ptr serverThread_; diff --git a/src/kvstore/raftex/SnapshotManager.cpp b/src/kvstore/raftex/SnapshotManager.cpp new file mode 100644 index 00000000000..61f8949b1e9 --- /dev/null +++ b/src/kvstore/raftex/SnapshotManager.cpp @@ -0,0 +1,118 @@ +/* Copyright (c) 2019 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. 
+ */ +#include "kvstore/raftex/SnapshotManager.h" +#include "base/NebulaKeyUtils.h" +#include "kvstore/raftex/RaftPart.h" + +DEFINE_int32(snapshot_worker_threads, 4, "Threads number for snapshot"); +DEFINE_int32(snapshot_io_threads, 4, "Threads number for snapshot"); +DEFINE_int32(snapshot_send_retry_times, 3, "Retry times if send failed"); +DEFINE_int32(snapshot_send_timeout_ms, 60000, "Rpc timeout for sending snapshot"); + +namespace nebula { +namespace raftex { + +SnapshotManager::SnapshotManager() { + executor_.reset(new folly::IOThreadPoolExecutor(FLAGS_snapshot_worker_threads)); + ioThreadPool_.reset(new folly::IOThreadPoolExecutor(FLAGS_snapshot_io_threads)); +} + +folly::Future SnapshotManager::sendSnapshot(std::shared_ptr part, + const HostAddr& dst) { + folly::Promise p; + auto fut = p.getFuture(); + executor_->add([this, p = std::move(p), part, dst] () mutable { + auto spaceId = part->spaceId_; + auto partId = part->partId_; + auto termId = part->term_; + // TODO(heng): maybe the committedLogId is less than the real one in the snapshot. + // It will not loss the data, but maybe some record will be committed twice. + auto commitLogIdAndTerm = part->lastCommittedLogId(); + const auto& localhost = part->address(); + std::vector> results; + LOG(INFO) << part->idStr_ << "Begin to send the snapshot"; + accessAllRowsInSnapshot(spaceId, + partId, + [&, this, p = std::move(p)] ( + std::vector&& data, + int64_t totalCount, + int64_t totalSize, + bool finished) mutable { + int retry = FLAGS_snapshot_send_retry_times; + while (retry-- > 0) { + auto f = send(spaceId, + partId, + termId, + commitLogIdAndTerm.first, + commitLogIdAndTerm.second, + localhost, + std::move(data), + totalSize, + totalCount, + dst, + finished); + // TODO(heng): we send request one by one to avoid too large memory occupied. 
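
// Stripped of the Thrift plumbing, the per-batch control flow of
// SnapshotManager::sendSnapshot() is a bounded retry loop: a failed error
// code is retried up to FLAGS_snapshot_send_retry_times, while an RPC
// exception aborts the whole snapshot. trySend below is a hypothetical
// stand-in for the future_sendSnapshot() round trip, not the real client call.
#include <functional>

enum class SendResult { Ok, RetriableError, FatalError };

// Returns true if some attempt succeeded, false if retries were exhausted
// or a fatal error occurred.
inline bool sendBatchWithRetry(const std::function<SendResult()>& trySend,
                               int maxRetries) {
    for (int attempt = 0; attempt < maxRetries; ++attempt) {
        switch (trySend()) {
            case SendResult::Ok:
                return true;              // move on to the next batch
            case SendResult::RetriableError:
                break;                    // try again
            case SendResult::FatalError:
                return false;             // e.g. an exception from the RPC
        }
    }
    return false;
}
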
+ try { + auto resp = std::move(f).get(); + if (resp.get_error_code() == cpp2::ErrorCode::SUCCEEDED) { + VLOG(1) << part->idStr_ << "has sended count " << totalCount; + if (finished) { + LOG(INFO) << part->idStr_ << "Finished, totalCount " << totalCount + << ", totalSize " << totalSize; + p.setValue(Status::OK()); + } + return; + } + } catch (const std::exception& e) { + LOG(ERROR) << part->idStr_ << "Send snapshot failed, exception " << e.what(); + p.setValue(Status::Error("Send snapshot failed!")); + return; + } + } + LOG(WARNING) << part->idStr_ << "Send snapshot failed!"; + p.setValue(Status::Error("Send snapshot failed")); + return; + }); + }); + return fut; +} + +folly::Future SnapshotManager::send( + GraphSpaceID spaceId, + PartitionID partId, + TermID termId, + LogID committedLogId, + TermID committedLogTerm, + const HostAddr& localhost, + std::vector&& data, + int64_t totalSize, + int64_t totalCount, + const HostAddr& addr, + bool finished) { + VLOG(2) << "Send snapshot request to " << addr; + raftex::cpp2::SendSnapshotRequest req; + req.set_space(spaceId); + req.set_part(partId); + req.set_term(termId); + req.set_committed_log_id(committedLogId); + req.set_committed_log_term(committedLogTerm); + req.set_leader_ip(localhost.first); + req.set_leader_port(localhost.second); + req.set_rows(std::move(data)); + req.set_total_size(totalSize); + req.set_total_count(totalCount); + req.set_done(finished); + auto* evb = ioThreadPool_->getEventBase(); + return folly::via(evb, [this, addr, evb, req = std::move(req)] () mutable { + auto client = connManager_.client(addr, evb, false, FLAGS_snapshot_send_timeout_ms); + return client->future_sendSnapshot(req); + }); +} + +} // namespace raftex +} // namespace nebula + + diff --git a/src/kvstore/raftex/SnapshotManager.h b/src/kvstore/raftex/SnapshotManager.h new file mode 100644 index 00000000000..8f52da7d59c --- /dev/null +++ b/src/kvstore/raftex/SnapshotManager.h @@ -0,0 +1,65 @@ +/* Copyright (c) 2019 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. + */ + +#ifndef RAFTEX_SNAPSHOTMANAGER_H_ +#define RAFTEX_SNAPSHOTMANAGER_H_ + +#include "base/Base.h" +#include "base/StatusOr.h" +#include "gen-cpp2/raftex_types.h" +#include "gen-cpp2/RaftexServiceAsyncClient.h" +#include "thrift/ThriftClientManager.h" +#include +#include +#include + +namespace nebula { +namespace raftex { + +using SnapshotCallback = folly::Function&& rows, + int64_t totalCount, + int64_t totalSize, + bool finished)>; +class RaftPart; + +class SnapshotManager { +public: + SnapshotManager(); + virtual ~SnapshotManager() = default; + + // Send snapshot for spaceId, partId to host dst. 
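
// A framework-free sketch of the contract accessAllRowsInSnapshot() is expected
// to honor (and which SnapshotManagerImpl implements over a RocksDB prefix scan):
// feed rows to the callback in size-bounded batches, passing finished = false for
// intermediate batches and finished = true exactly once at the end. The in-memory
// row source and the 10 MB threshold here are assumptions for illustration.
#include <cstdint>
#include <functional>
#include <string>
#include <utility>
#include <vector>

using Rows = std::vector<std::string>;
using Callback = std::function<void(Rows&& rows,
                                    int64_t totalCount,
                                    int64_t totalSize,
                                    bool finished)>;

inline void accessAllRows(const std::vector<std::pair<std::string, std::string>>& kvs,
                          const Callback& cb,
                          int64_t batchLimit = 10 * 1024 * 1024) {
    Rows batch;
    int64_t batchSize = 0;
    int64_t totalCount = 0;
    int64_t totalSize = 0;
    for (const auto& kv : kvs) {
        if (batchSize >= batchLimit) {
            cb(std::move(batch), totalCount, totalSize, false);   // intermediate batch
            batch.clear();
            batchSize = 0;
        }
        // The real implementation packs key and value with encodeKV(); plain
        // concatenation is used here only to keep the sketch self-contained.
        std::string row = kv.first + kv.second;
        batchSize += static_cast<int64_t>(row.size());
        totalSize += static_cast<int64_t>(row.size());
        ++totalCount;
        batch.emplace_back(std::move(row));
    }
    if (!batch.empty()) {
        cb(std::move(batch), totalCount, totalSize, true);        // final batch
    }
}
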
+ folly::Future sendSnapshot(std::shared_ptr part, + const HostAddr& dst); + +private: + folly::Future send( + GraphSpaceID spaceId, + PartitionID partId, + TermID termId, + LogID committedLogId, + TermID committedLogTerm, + const HostAddr& localhost, + std::vector&& data, + int64_t totalSize, + int64_t totalCount, + const HostAddr& addr, + bool finished); + + virtual void accessAllRowsInSnapshot(GraphSpaceID spaceId, + PartitionID partId, + SnapshotCallback cb) = 0; + +private: + std::unique_ptr executor_; + std::unique_ptr ioThreadPool_; + thrift::ThriftClientManager connManager_; +}; + +} // namespace raftex +} // namespace nebula + +#endif // RAFTEX_SNAPSHOTMANAGER_H_ + diff --git a/src/kvstore/raftex/test/CMakeLists.txt b/src/kvstore/raftex/test/CMakeLists.txt index 269e3447b16..d8e2f94c3d2 100644 --- a/src/kvstore/raftex/test/CMakeLists.txt +++ b/src/kvstore/raftex/test/CMakeLists.txt @@ -64,3 +64,9 @@ nebula_add_test( LIBRARIES ${THRIFT_LIBRARIES} wangle gtest ) +nebula_add_test( + NAME snapshot_test + SOURCES SnapshotTest.cpp RaftexTestBase.cpp TestShard.cpp + OBJECTS ${RAFTEX_TEST_LIBS} + LIBRARIES ${THRIFT_LIBRARIES} wangle gtest +) diff --git a/src/kvstore/raftex/test/LeaderElectionTest.cpp b/src/kvstore/raftex/test/LeaderElectionTest.cpp index 870f94a093d..5a6184dc525 100644 --- a/src/kvstore/raftex/test/LeaderElectionTest.cpp +++ b/src/kvstore/raftex/test/LeaderElectionTest.cpp @@ -97,6 +97,7 @@ TEST(LeaderElection, LeaderCrash) { services[idx]->getIOThreadPool(), workers, services[idx]->getThreadManager(), + nullptr, std::bind(&onLeadershipLost, std::ref(copies), std::ref(leader), diff --git a/src/kvstore/raftex/test/LeaderTransferTest.cpp b/src/kvstore/raftex/test/LeaderTransferTest.cpp index 0b966a0707f..b0e0d86dbfb 100644 --- a/src/kvstore/raftex/test/LeaderTransferTest.cpp +++ b/src/kvstore/raftex/test/LeaderTransferTest.cpp @@ -70,11 +70,13 @@ TEST(LeaderTransferTest, ChangeLeaderServalTimesTest) { auto nLeaderIndex = checkLeadership(copies, leader); int32_t times = 0; while (++times <= 10) { + auto leaderIndex = nLeaderIndex; nLeaderIndex = (nLeaderIndex + 1) % 3; LOG(INFO) << times << " ===== Let's transfer the leader to " << allHosts[nLeaderIndex]; - auto f = leader->sendCommandAsync(test::encodeTransferLeader(allHosts[nLeaderIndex])); - f.wait(); leader.reset(); + auto f = copies[leaderIndex]->sendCommandAsync( + test::encodeTransferLeader(allHosts[nLeaderIndex])); + f.wait(); waitUntilLeaderElected(copies, leader); checkLeadership(copies, nLeaderIndex, leader); } diff --git a/src/kvstore/raftex/test/LogCommandTest.cpp b/src/kvstore/raftex/test/LogCommandTest.cpp index 30bbf20b0af..ebf830e7dca 100644 --- a/src/kvstore/raftex/test/LogCommandTest.cpp +++ b/src/kvstore/raftex/test/LogCommandTest.cpp @@ -54,7 +54,7 @@ TEST_F(LogCommandTest, CommandInMiddle) { ASSERT_EQ(3, leader_->commitTimes_); // need to sleep a bit more - sleep(1); + sleep(FLAGS_raft_heartbeat_interval_secs + 1); checkConsensus(copies_, 0, 9, msgs); } diff --git a/src/kvstore/raftex/test/RaftexTestBase.cpp b/src/kvstore/raftex/test/RaftexTestBase.cpp index aae7077626c..92b84420d62 100644 --- a/src/kvstore/raftex/test/RaftexTestBase.cpp +++ b/src/kvstore/raftex/test/RaftexTestBase.cpp @@ -179,6 +179,7 @@ void setupRaft( if (isLearner.empty()) { isLearner.resize(allHosts.size(), false); } + auto sps = snapshots(services); // Create one copy of the shard for each service for (size_t i = 0; i < services.size(); i++) { copies.emplace_back(std::make_shared( @@ -190,6 +191,7 @@ void setupRaft( 
services[i]->getIOThreadPool(), workers, services[i]->getThreadManager(), + sps[i], std::bind(&onLeadershipLost, std::ref(copies), std::ref(leader), @@ -334,6 +336,16 @@ void rebootOneCopy(std::vector>& services, LOG(INFO) << "copies " << index << " reboot"; } +std::vector> snapshots( + const std::vector>& services) { + std::vector> snapshots; + for (auto& service : services) { + std::shared_ptr snapshot(new test::SnapshotManagerImpl(service.get())); + snapshots.emplace_back(std::move(snapshot)); + } + return snapshots; +} + } // namespace raftex } // namespace nebula diff --git a/src/kvstore/raftex/test/RaftexTestBase.h b/src/kvstore/raftex/test/RaftexTestBase.h index 12c6c4a94b4..7e3ea74d89a 100644 --- a/src/kvstore/raftex/test/RaftexTestBase.h +++ b/src/kvstore/raftex/test/RaftexTestBase.h @@ -14,7 +14,7 @@ #include "fs/FileUtils.h" #include "thread/GenericThreadPool.h" #include "network/NetworkUtils.h" - +#include "kvstore/raftex/SnapshotManager.h" namespace nebula { @@ -99,6 +99,9 @@ void rebootOneCopy(std::vector>& services, std::vector allHosts, size_t index); +std::vector> snapshots( + const std::vector>& services); + class RaftexTestFixture : public ::testing::Test { public: explicit RaftexTestFixture(const std::string& testName, int32_t size = 3) @@ -130,6 +133,7 @@ class RaftexTestFixture : public ::testing::Test { std::vector> services_; std::vector> copies_; std::shared_ptr leader_; + std::vector> snapshots_; }; } // namespace raftex diff --git a/src/kvstore/raftex/test/SnapshotTest.cpp b/src/kvstore/raftex/test/SnapshotTest.cpp new file mode 100644 index 00000000000..8d50c265c52 --- /dev/null +++ b/src/kvstore/raftex/test/SnapshotTest.cpp @@ -0,0 +1,104 @@ +/* Copyright (c) 2019 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. 
+ */ + +#include "base/Base.h" +#include +#include +#include "fs/TempDir.h" +#include "fs/FileUtils.h" +#include "thread/GenericThreadPool.h" +#include "network/NetworkUtils.h" +#include "kvstore/raftex/RaftexService.h" +#include "kvstore/raftex/test/RaftexTestBase.h" +#include "kvstore/raftex/test/TestShard.h" + +DECLARE_uint32(raft_heartbeat_interval_secs); +DECLARE_int32(wal_ttl); +DECLARE_int64(wal_file_size); +DECLARE_int32(wal_buffer_size); +DECLARE_int32(wal_buffer_num); +DECLARE_int32(raft_rpc_timeout_ms); + +namespace nebula { +namespace raftex { + +TEST(SnapshotTest, LearnerCatchUpDataTest) { + fs::TempDir walRoot("/tmp/catch_up_data.XXXXXX"); + FLAGS_wal_file_size = 128; + FLAGS_wal_buffer_size = 64; + FLAGS_wal_buffer_num = 1000; + FLAGS_raft_rpc_timeout_ms = 2000; + std::shared_ptr workers; + std::vector wals; + std::vector allHosts; + std::vector> services; + std::vector> copies; + + std::shared_ptr leader; + std::vector isLearner = {false, false, false, true}; + setupRaft(4, walRoot, workers, wals, allHosts, services, copies, leader, isLearner); + + // Check all hosts agree on the same leader + checkLeadership(copies, leader); + + std::vector msgs; + for (int i = 0; i < 10; i++) { + appendLogs(i * 100, i * 100 + 99, leader, msgs, true); + } + // Sleep a while to make sure the last log has been committed on followers + sleep(FLAGS_raft_heartbeat_interval_secs); + + // Check every copy + for (int i = 0; i < 3; i++) { + ASSERT_EQ(1000, copies[i]->getNumLogs()); + } + + for (int i = 0; i < 1000; ++i) { + for (int j = 0; j < 3; j++) { + folly::StringPiece msg; + ASSERT_TRUE(copies[j]->getLogMsg(i, msg)); + ASSERT_EQ(msgs[i], msg.toString()); + } + } + // wait for the wal to be cleaned + FLAGS_wal_ttl = 1; + sleep(FLAGS_wal_ttl + 3); + FLAGS_wal_ttl = 60; + LOG(INFO) << "Add learner, we need to catch up data!"; + auto f = leader->sendCommandAsync(test::encodeLearner(allHosts[3])); + f.wait(); + + LOG(INFO) << "Let's continue to write some logs"; + for (int i = 10; i < 20; i++) { + appendLogs(i * 100, i * 100 + 99, leader, msgs, true); + } + sleep(FLAGS_raft_heartbeat_interval_secs); + + auto& learner = copies[3]; + ASSERT_EQ(2000, learner->getNumLogs()); + for (int i = 0; i < 2000; ++i) { + folly::StringPiece msg; + ASSERT_TRUE(learner->getLogMsg(i, msg)); + ASSERT_EQ(msgs[i], msg.toString()); + } + + LOG(INFO) << "Finished UT"; + finishRaft(services, copies, workers, leader); +} + +} // namespace raftex +} // namespace nebula + + +int main(int argc, char** argv) { + testing::InitGoogleTest(&argc, argv); + folly::init(&argc, &argv, true); + google::SetStderrLogging(google::INFO); + + return RUN_ALL_TESTS(); +} + + diff --git a/src/kvstore/raftex/test/TestShard.cpp b/src/kvstore/raftex/test/TestShard.cpp index c5a8b20d476..4911e8ea393 100644 --- a/src/kvstore/raftex/test/TestShard.cpp +++ b/src/kvstore/raftex/test/TestShard.cpp @@ -53,6 +53,20 @@ HostAddr decodeTransferLeader(const folly::StringPiece& log) { return leader; } +std::string encodeSnapshotRow(LogID logId, const std::string& row) { + std::string rawData; + rawData.reserve(sizeof(LogID) + row.size()); + rawData.append(reinterpret_cast(&logId), sizeof(logId)); + rawData.append(row.data(), row.size()); + return rawData; +} + +std::pair decodeSnapshotRow(const std::string& rawData) { + LogID id = *reinterpret_cast(rawData.data()); + auto str = rawData.substr(sizeof(LogID)); + return std::make_pair(id, std::move(str)); +} + TestShard::TestShard(size_t idx, std::shared_ptr svc, PartitionID partId, @@ -61,6 +75,7 @@ 
TestShard::TestShard(size_t idx, std::shared_ptr ioPool, std::shared_ptr workers, std::shared_ptr handlersPool, + std::shared_ptr snapshotMan, std::function leadershipLostCB, std::function @@ -72,7 +87,8 @@ TestShard::TestShard(size_t idx, walRoot, ioPool, workers, - handlersPool) + handlersPool, + snapshotMan) , idx_(idx) , service_(svc) , leadershipLostCB_(leadershipLostCB) @@ -135,6 +151,34 @@ bool TestShard::commitLogs(std::unique_ptr iter) { return true; } +std::pair TestShard::commitSnapshot(const std::vector& data, + LogID committedLogId, + TermID committedLogTerm, + bool finished) { + folly::RWSpinLock::WriteHolder wh(&lock_); + int64_t count = 0; + int64_t size = 0; + for (auto& row : data) { + count++; + size += row.size(); + auto idData = decodeSnapshotRow(row); + VLOG(1) << idStr_ << "Commit row logId " << idData.first << ", log " << idData.second; + data_.emplace_back(idData.first, std::move(idData.second)); + } + if (finished) { + lastCommittedLogId_ = committedLogId; + LOG(INFO) << idStr_ << "Commit the snapshot committedLogId " << committedLogId + << ", term " << committedLogTerm; + } + return std::make_pair(count, size); +} + +void TestShard::cleanup() { + folly::RWSpinLock::WriteHolder wh(&lock_); + data_.clear(); + lastCommittedLogId_ = 0; +} + size_t TestShard::getNumLogs() const { return data_.size(); } diff --git a/src/kvstore/raftex/test/TestShard.h index 09a4265ecbb..0e6f075c3b3 100644 --- a/src/kvstore/raftex/test/TestShard.h +++ b/src/kvstore/raftex/test/TestShard.h @@ -9,11 +9,12 @@ #include "base/Base.h" #include "kvstore/raftex/RaftPart.h" +#include "kvstore/raftex/RaftexService.h" namespace nebula { namespace raftex { -class RaftexService; +// class RaftexService; namespace test { @@ -32,7 +33,12 @@ std::string encodeTransferLeader(const HostAddr& addr); HostAddr decodeTransferLeader(const folly::StringPiece& log); +std::string encodeSnapshotRow(LogID logId, const std::string& row); + +std::pair decodeSnapshotRow(const std::string& rawData); + class TestShard : public RaftPart { + friend class SnapshotManagerImpl; public: TestShard( size_t idx, @@ -43,6 +49,7 @@ class TestShard : public RaftPart { std::shared_ptr ioPool, std::shared_ptr workers, std::shared_ptr handlersPool, + std::shared_ptr snapshotMan, std::function leadershipLostCB, std::function @@ -92,6 +99,13 @@ class TestShard : public RaftPart { return true; } + std::pair commitSnapshot(const std::vector& data, + LogID committedLogId, + TermID committedLogTerm, + bool finished) override; + + void cleanup() override; + size_t getNumLogs() const; bool getLogMsg(size_t index, folly::StringPiece& msg); @@ -114,6 +128,52 @@ class TestShard : public RaftPart { becomeLeaderCB_; }; +class SnapshotManagerImpl : public SnapshotManager { +public: + explicit SnapshotManagerImpl(RaftexService* service) + : service_(service) { + CHECK_NOTNULL(service); + } + + ~SnapshotManagerImpl() { + LOG(INFO) << "~SnapshotManagerImpl()"; + } + + void accessAllRowsInSnapshot(GraphSpaceID spaceId, + PartitionID partId, + SnapshotCallback cb) override { + auto part = std::dynamic_pointer_cast(service_->findPart(spaceId, partId)); + CHECK(!!part); + int64_t totalCount = 0; + int64_t totalSize = 0; + std::vector data; + folly::RWSpinLock::ReadHolder rh(&part->lock_); + for (auto& row : part->data_) { + if (data.size() > 100) { + LOG(INFO) << part->idStr_ << "Send snapshot total rows " << data.size() + << ", total count sent " << totalCount + << ", total size sent " << totalSize + << ", 
finished false"; + cb(std::move(data), totalCount, totalSize, false); + data.clear(); + } + auto encoded = encodeSnapshotRow(row.first, row.second); + totalSize += encoded.size(); + totalCount++; + data.emplace_back(std::move(encoded)); + } + if (data.size() > 0) { + LOG(INFO) << part->idStr_ << "Send snapshot total rows " << data.size() + << ", total count sended " << totalCount + << ", total size sended " << totalSize + << ", finished true"; + cb(std::move(data), totalCount, totalSize, true); + } + } + + RaftexService* service_; +}; + } // namespace test } // namespace raftex } // namespace nebula diff --git a/src/kvstore/test/LogEncoderTest.cpp b/src/kvstore/test/LogEncoderTest.cpp index 4469e23e964..ff36f68ddd0 100644 --- a/src/kvstore/test/LogEncoderTest.cpp +++ b/src/kvstore/test/LogEncoderTest.cpp @@ -114,6 +114,13 @@ TEST(LogEncoderTest, MultiValuesTest) { } } +TEST(LogEncoderTest, KVTest) { + auto encoded = encodeKV("KV_key", "KV_val"); + auto decoded = decodeKV(encoded); + ASSERT_EQ("KV_key", decoded.first); + ASSERT_EQ("KV_val", decoded.second); +} + } // namespace kvstore } // namespace nebula diff --git a/src/kvstore/test/NebulaStoreTest.cpp b/src/kvstore/test/NebulaStoreTest.cpp index 1766bd18579..add8faa70a9 100644 --- a/src/kvstore/test/NebulaStoreTest.cpp +++ b/src/kvstore/test/NebulaStoreTest.cpp @@ -384,7 +384,7 @@ TEST(NebulaStoreTest, ThreeCopiesTest) { } } -TEST(NebulaStoreTest, DISABLED_TransLeaderTest) { +TEST(NebulaStoreTest, TransLeaderTest) { fs::TempDir rootPath("/tmp/trans_leader_test.XXXXXX"); auto initNebulaStore = [](const std::vector& peers, int32_t index, @@ -474,7 +474,11 @@ TEST(NebulaStoreTest, DISABLED_TransLeaderTest) { CHECK(ok(partRet)); auto part = value(partRet); part->asyncTransferLeader(targetAddr, [&] (kvstore::ResultCode code) { - EXPECT_EQ(ResultCode::SUCCEEDED, code); + if (code == ResultCode::ERR_LEADER_CHANGED) { + ASSERT_EQ(targetAddr, part->leader()); + } else { + ASSERT_EQ(ResultCode::SUCCEEDED, code); + } baton.post(); }); baton.wait(); @@ -495,8 +499,13 @@ TEST(NebulaStoreTest, DISABLED_TransLeaderTest) { auto ret = stores[0]->part(spaceId, partId); CHECK(ok(ret)); auto part = nebula::value(ret); + LOG(INFO) << "Transfer part " << partId << " leader to " << targetAddr; part->asyncTransferLeader(targetAddr, [&] (kvstore::ResultCode code) { - EXPECT_EQ(ResultCode::SUCCEEDED, code); + if (code == ResultCode::ERR_LEADER_CHANGED) { + ASSERT_EQ(targetAddr, part->leader()); + } else { + ASSERT_EQ(ResultCode::SUCCEEDED, code); + } baton.post(); }); baton.wait(); diff --git a/src/kvstore/wal/FileBasedWal.cpp b/src/kvstore/wal/FileBasedWal.cpp index 12ace04492a..45f31776fcc 100644 --- a/src/kvstore/wal/FileBasedWal.cpp +++ b/src/kvstore/wal/FileBasedWal.cpp @@ -23,17 +23,20 @@ using nebula::fs::FileUtils; // static std::shared_ptr FileBasedWal::getWal( const folly::StringPiece dir, + const std::string& idStr, FileBasedWalPolicy policy, PreProcessor preProcessor) { return std::shared_ptr( - new FileBasedWal(dir, std::move(policy), std::move(preProcessor))); + new FileBasedWal(dir, idStr, std::move(policy), std::move(preProcessor))); } FileBasedWal::FileBasedWal(const folly::StringPiece dir, + const std::string& idStr, FileBasedWalPolicy policy, PreProcessor preProcessor) : dir_(dir.toString()) + , idStr_(idStr) , policy_(std::move(policy)) , maxFileSize_(policy_.fileSize) , maxBufferSize_(policy_.bufferSize) @@ -66,10 +69,9 @@ FileBasedWal::FileBasedWal(const folly::StringPiece dir, FileBasedWal::~FileBasedWal() { // FileBasedWal inherits from 
std::enable_shared_from_this, so at this // moment, there should have no other thread holding this WAL object - // Close the last file closeCurrFile(); - LOG(INFO) << "~FileBasedWal, dir = " << dir_; + LOG(INFO) << idStr_ << "~FileBasedWal, dir = " << dir_; } @@ -295,7 +297,7 @@ void FileBasedWal::prepareNewFile(LogID startLogId) { FileUtils::joinPath(dir_, folly::stringPrintf("%019ld.wal", startLogId)), startLogId); - VLOG(1) << "Write new file " << info->path(); + VLOG(1) << idStr_ << "Write new file " << info->path(); walFiles_.emplace(std::make_pair(startLogId, info)); // Create the file for write @@ -374,7 +376,6 @@ BufferPtr FileBasedWal::getLastBuffer(LogID id, size_t expectedToWrite) { if (buffers_.back()->size() + expectedToWrite <= maxBufferSize_) { return buffers_.back(); } - // Need to rollover to a new buffer if (buffers_.size() == policy_.numBuffers) { // Need to pop the first one @@ -392,19 +393,19 @@ bool FileBasedWal::appendLogInternal(LogID id, ClusterID cluster, std::string msg) { if (stopped_) { - LOG(ERROR) << "WAL has stopped. Do not accept logs any more"; + LOG(ERROR) << idStr_ << "WAL has stopped. Do not accept logs any more"; return false; } if (lastLogId_ != 0 && firstLogId_ != 0 && id != lastLogId_ + 1) { - LOG(ERROR) << "There is a gap in the log id. The last log id is " + LOG(ERROR) << idStr_ << "There is a gap in the log id. The last log id is " << lastLogId_ << ", and the id being appended is " << id; return false; } if (!preProcessor_(id, term, cluster, msg)) { - LOG(ERROR) << "Pre process failed for log " << id; + LOG(ERROR) << idStr_ << "Pre process failed for log " << id; return false; } @@ -473,7 +474,7 @@ bool FileBasedWal::appendLogs(LogIterator& iter) { iter.logTerm(), iter.logSource(), iter.logMsg().toString())) { - LOG(ERROR) << "Failed to append log for logId " + LOG(ERROR) << idStr_ << "Failed to append log for logId " << iter.logId(); return false; } @@ -491,7 +492,7 @@ std::unique_ptr FileBasedWal::iterator(LogID firstLogId, bool FileBasedWal::rollbackToLog(LogID id) { if (id < firstLogId_ - 1 || id > lastLogId_) { - LOG(ERROR) << "Rollback target id " << id + LOG(ERROR) << idStr_ << "Rollback target id " << id << " is not in the range of [" << firstLogId_ << "," << lastLogId_ << "] of WAL"; @@ -587,8 +588,7 @@ bool FileBasedWal::reset() { return true; } - -void FileBasedWal::cleanWAL() { +void FileBasedWal::cleanWAL(int32_t ttl) { std::lock_guard g(walFilesMutex_); if (walFiles_.empty()) { return; @@ -598,15 +598,21 @@ void FileBasedWal::cleanWAL() { size_t index = 0; auto it = walFiles_.begin(); auto size = walFiles_.size(); + int count = 0; + int walTTL = ttl == 0 ? 
policy_.ttl : ttl; while (it != walFiles_.end()) { - if (index++ < size - 1 && (now - it->second->mtime() > policy_.ttl)) { + if (index++ < size - 1 && (now - it->second->mtime() > walTTL)) { VLOG(1) << "Clean wals, Remove " << it->second->path(); unlink(it->second->path()); it = walFiles_.erase(it); + count++; } else { ++it; } } + if (count > 0) { + LOG(INFO) << idStr_ << "Clean wals number " << count; + } firstLogId_ = walFiles_.begin()->second->firstId(); } diff --git a/src/kvstore/wal/FileBasedWal.h b/src/kvstore/wal/FileBasedWal.h index bd6ef1e1c74..e6ba5a66ff4 100644 --- a/src/kvstore/wal/FileBasedWal.h +++ b/src/kvstore/wal/FileBasedWal.h @@ -23,7 +23,6 @@ struct FileBasedWalPolicy { // This is only a hint, the FileBasedWal will try to keep all messages // newer than ttl, but not guarantee to remove old messages right away int32_t ttl = 86400; - // The maximum size of each log message file (in byte). When the existing // log file reaches this size, a new file will be created size_t fileSize = 128 * 1024L * 1024L; @@ -43,10 +42,12 @@ using PreProcessor = folly::Function { FRIEND_TEST(FileBasedWal, TTLTest); + friend class FileBasedWalIterator; public: // A factory method to create a new WAL static std::shared_ptr getWal( const folly::StringPiece dir, + const std::string& idStr, FileBasedWalPolicy policy, PreProcessor preProcessor); @@ -99,7 +100,7 @@ class FileBasedWal final : public Wal // This method is *NOT* thread safe bool reset() override; - void cleanWAL() override; + void cleanWAL(int32_t ttl = 0) override; // Scan [firstLogId, lastLogId] // This method IS thread-safe @@ -133,6 +134,7 @@ class FileBasedWal final : public Wal // Callers **SHOULD NEVER** use this constructor directly // Callers should use static method getWal() instead FileBasedWal(const folly::StringPiece dir, + const std::string& idStr, FileBasedWalPolicy policy, PreProcessor preProcessor); @@ -165,9 +167,11 @@ class FileBasedWal final : public Wal * FileBasedWal Member Fields * **************************************/ + const std::string dir_; + std::string idStr_; + std::atomic stopped_{false}; - const std::string dir_; const FileBasedWalPolicy policy_; const size_t maxFileSize_; const size_t maxBufferSize_; diff --git a/src/kvstore/wal/FileBasedWalIterator.cpp b/src/kvstore/wal/FileBasedWalIterator.cpp index 92ad0f1481e..dd761b4a3f8 100644 --- a/src/kvstore/wal/FileBasedWalIterator.cpp +++ b/src/kvstore/wal/FileBasedWalIterator.cpp @@ -25,12 +25,14 @@ FileBasedWalIterator::FileBasedWalIterator( } if (currId_ > lastId_) { + LOG(ERROR) << wal_->idStr_ << "The log " << currId_ + << " is out of range, the lastLogId is " << lastId_; return; } if (startId < wal_->firstLogId()) { - LOG(ERROR) << "The given log id " << startId - << " is out of the range"; + LOG(ERROR) << wal_->idStr_ << "The given log id " << startId + << " is out of the range, the wal firstLogId is " << wal_->firstLogId(); currId_ = lastId_ + 1; return; } else { diff --git a/src/kvstore/wal/Wal.h b/src/kvstore/wal/Wal.h index 7ba32ac2888..6db882c68ad 100644 --- a/src/kvstore/wal/Wal.h +++ b/src/kvstore/wal/Wal.h @@ -45,7 +45,7 @@ class Wal { // This method is *NOT* thread safe virtual bool reset() = 0; - virtual void cleanWAL() = 0; + virtual void cleanWAL(int32_t ttl = 0) = 0; // Scan [firstLogId, lastLogId] virtual std::unique_ptr iterator(LogID firstLogId, diff --git a/src/kvstore/wal/test/FileBasedWalTest.cpp b/src/kvstore/wal/test/FileBasedWalTest.cpp index f34abb76a16..08113cb4a53 100644 --- a/src/kvstore/wal/test/FileBasedWalTest.cpp +++ 
b/src/kvstore/wal/test/FileBasedWalTest.cpp @@ -39,6 +39,7 @@ TEST(FileBasedWal, AppendLogs) { TempDir walDir("/tmp/testWal.XXXXXX"); auto wal = FileBasedWal::getWal(walDir.path(), + "", policy, [](LogID, TermID, ClusterID, const std::string&) { return true; @@ -56,6 +57,7 @@ TEST(FileBasedWal, AppendLogs) { // Now let's open it to read wal = FileBasedWal::getWal(walDir.path(), + "", policy, [](LogID, TermID, ClusterID, const std::string&) { return true; @@ -85,6 +87,7 @@ TEST(FileBasedWal, CacheOverflow) { TempDir walDir("/tmp/testWal.XXXXXX"); auto wal = FileBasedWal::getWal(walDir.path(), + "", policy, [](LogID, TermID, ClusterID, const std::string&) { return true; @@ -107,6 +110,7 @@ TEST(FileBasedWal, CacheOverflow) { // Now let's open it to read wal = FileBasedWal::getWal(walDir.path(), + "", policy, [](LogID, TermID, ClusterID, const std::string&) { return true; @@ -136,6 +140,7 @@ TEST(FileBasedWal, Rollback) { TempDir walDir("/tmp/testWal.XXXXXX"); auto wal = FileBasedWal::getWal(walDir.path(), + "", policy, [](LogID, TermID, ClusterID, const std::string&) { return true; @@ -210,6 +215,7 @@ TEST(FileBasedWal, RollbackThenReopen) { TempDir walDir("/tmp/testWal.XXXXXX"); auto wal = FileBasedWal::getWal(walDir.path(), + "", policy, [](LogID, TermID, ClusterID, const std::string&) { return true; @@ -235,6 +241,7 @@ TEST(FileBasedWal, RollbackThenReopen) { // Now let's open it to read wal = FileBasedWal::getWal(walDir.path(), + "", policy, [](LogID, TermID, ClusterID, const std::string&) { return true; @@ -253,7 +260,6 @@ TEST(FileBasedWal, RollbackThenReopen) { EXPECT_EQ(801, id); } - TEST(FileBasedWal, RollbackToZero) { // Force to make each file 1MB, each buffer is 1MB, and there are two // buffers at most @@ -264,6 +270,7 @@ TEST(FileBasedWal, RollbackToZero) { TempDir walDir("/tmp/testWal.XXXXXX"); auto wal = FileBasedWal::getWal(walDir.path(), + "", policy, [](LogID, TermID, ClusterID, const std::string&) { return true; @@ -305,6 +312,7 @@ TEST(FileBasedWal, BackAndForth) { TempDir walDir("/tmp/testWal.XXXXXX"); auto wal = FileBasedWal::getWal(walDir.path(), + "", policy, [](LogID, TermID, ClusterID, const std::string&) { return true; @@ -341,6 +349,7 @@ TEST(FileBasedWal, TTLTest) { policy.fileSize = 1024; policy.numBuffers = 30; auto wal = FileBasedWal::getWal(walDir.path(), + "", policy, [](LogID, TermID, ClusterID, const std::string&) { return true; @@ -372,6 +381,7 @@ TEST(FileBasedWal, TTLTest) { { // Now let's open it to read wal = FileBasedWal::getWal(walDir.path(), + "", policy, [](LogID, TermID, ClusterID, const std::string&) { return true; @@ -402,6 +412,7 @@ TEST(FileBasedWal, TTLTest) { { // Now let's open it to read wal = FileBasedWal::getWal(walDir.path(), + "", policy, [](LogID, TermID, ClusterID, const std::string&) { return true; diff --git a/src/meta/processors/admin/Balancer.h b/src/meta/processors/admin/Balancer.h index d809a061b8a..a633db1e2e1 100644 --- a/src/meta/processors/admin/Balancer.h +++ b/src/meta/processors/admin/Balancer.h @@ -49,7 +49,7 @@ class Balancer { FRIEND_TEST(BalanceTest, IntersectHostsLeaderBalancePlanTest); FRIEND_TEST(BalanceTest, LeaderBalanceTest); FRIEND_TEST(BalanceTest, ManyHostsLeaderBalancePlanTest); - FRIEND_TEST(BalanceIntegrationTest, DISABLED_LeaderBalanceTest); + FRIEND_TEST(BalanceIntegrationTest, LeaderBalanceTest); public: static Balancer* instance(kvstore::KVStore* kv) { diff --git a/src/meta/test/BalanceIntegrationTest.cpp b/src/meta/test/BalanceIntegrationTest.cpp index 6b2e1e1c65d..48ab3ca3710 100644 --- 
a/src/meta/test/BalanceIntegrationTest.cpp +++ b/src/meta/test/BalanceIntegrationTest.cpp @@ -26,7 +26,7 @@ TEST(BalanceIntegrationTest, SimpleTest) { LOG(INFO) << "Start storage server on " << sc->port_; } -TEST(BalanceIntegrationTest, DISABLED_LeaderBalanceTest) { +TEST(BalanceIntegrationTest, LeaderBalanceTest) { FLAGS_load_data_interval_secs = 1; FLAGS_heartbeat_interval_secs = 1; FLAGS_raft_heartbeat_interval_secs = 1; @@ -54,11 +54,7 @@ TEST(BalanceIntegrationTest, DISABLED_LeaderBalanceTest) { auto mClient = std::make_unique(threadPool, metaAddr, tempDataAddr, kClusterId, false); - auto ret = mClient->addHosts({tempDataAddr}).get(); - ASSERT_TRUE(ret.ok()); mClient->waitForMetadReady(); - ret = mClient->removeHosts({tempDataAddr}).get(); - ASSERT_TRUE(ret.ok()); int partition = 9; int replica = 3; @@ -73,8 +69,6 @@ TEST(BalanceIntegrationTest, DISABLED_LeaderBalanceTest) { storagePorts.emplace_back(storagePort); peers.emplace_back(storageAddr); - ret = mClient->addHosts({storageAddr}).get(); - ASSERT_TRUE(ret.ok()); VLOG(1) << "The storage server has been added to the meta service"; auto metaClient = std::make_shared(threadPool, metaAddr, storageAddr, @@ -93,15 +87,15 @@ TEST(BalanceIntegrationTest, DISABLED_LeaderBalanceTest) { serverContexts.emplace_back(std::move(sc)); } - ret = mClient->createSpace("storage", partition, replica).get(); + auto ret = mClient->createSpace("storage", partition, replica).get(); ASSERT_TRUE(ret.ok()); - sleep(FLAGS_load_data_interval_secs + 1); - sleep(FLAGS_raft_heartbeat_interval_secs); + sleep(FLAGS_load_data_interval_secs + FLAGS_raft_heartbeat_interval_secs + 3); auto code = balancer.leaderBalance(); ASSERT_EQ(code, cpp2::ErrorCode::SUCCEEDED); - sleep(FLAGS_raft_heartbeat_interval_secs); + LOG(INFO) << "Waiting for the leader balance"; + sleep(FLAGS_raft_heartbeat_interval_secs + 1); for (int i = 0; i < replica; i++) { std::unordered_map> leaderIds; EXPECT_EQ(3, serverContexts[i]->kvStore_->allLeader(leaderIds)); diff --git a/src/storage/AdminProcessor.h index 54fac66269c..24b786f6b0e 100644 --- a/src/storage/AdminProcessor.h +++ b/src/storage/AdminProcessor.h @@ -35,20 +35,17 @@ class TransLeaderProcessor : public BaseProcessor { auto part = nebula::value(ret); auto host = kvstore::NebulaStore::getRaftAddr(HostAddr(req.get_new_leader().get_ip(), req.get_new_leader().get_port())); - part->asyncTransferLeader(std::move(host), - [this, spaceId, partId, host] (kvstore::ResultCode code) { + part->asyncTransferLeader(host, + [this, spaceId, partId, host] (kvstore::ResultCode code) { auto leaderRet = kvstore_->partLeader(spaceId, partId); CHECK(ok(leaderRet)); if (code == kvstore::ResultCode::ERR_LEADER_CHANGED) { - auto addr = value(std::move(leaderRet)); - if (addr == HostAddr(0, 0)) { - // No leader is elected, just return ok + auto leader = value(std::move(leaderRet)); + // Treat the transfer as successful if the target is the random peer (0, 0) or the target has already been elected leader. 
+ if (host == HostAddr(0, 0) || leader == host) { code = kvstore::ResultCode::SUCCEEDED; } else { - nebula::cpp2::HostAddr leader; - leader.set_ip(addr.first); - leader.set_port(addr.second); - resp_.set_leader(std::move(leader)); + resp_.set_leader(toThriftHost(leader)); } } resp_.set_code(to(code)); diff --git a/src/storage/BaseProcessor.h b/src/storage/BaseProcessor.h index 46994179129..4000bbb621b 100644 --- a/src/storage/BaseProcessor.h +++ b/src/storage/BaseProcessor.h @@ -70,6 +70,13 @@ class BaseProcessor { } } + nebula::cpp2::HostAddr toThriftHost(const HostAddr& host) { + nebula::cpp2::HostAddr tHost; + tHost.set_ip(host.first); + tHost.set_port(host.second); + return tHost; + } + protected: kvstore::KVStore* kvstore_ = nullptr; meta::SchemaManager* schemaMan_ = nullptr;
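Note on the snapshot wire format exercised by the tests in this patch: each row the test SnapshotManagerImpl streams is produced by encodeSnapshotRow(), an 8-byte LogID followed by the raw row bytes, and TestShard::commitSnapshot() reverses it with decodeSnapshotRow(). The stand-alone sketch below restates that layout outside the patch so the round trip can be compiled and checked in isolation; the LogID alias is assumed to be int64_t here, and the memcpy-based decode is a deliberate substitute for the reinterpret_cast used in the patch to avoid an unaligned read.

    // Stand-alone sketch of the snapshot-row layout used in TestShard.cpp:
    // an 8-byte LogID prefix followed by the raw row bytes.
    #include <cassert>
    #include <cstdint>
    #include <cstring>
    #include <string>
    #include <utility>

    using LogID = int64_t;   // assumed alias, matching the project's usage

    std::string encodeSnapshotRow(LogID logId, const std::string& row) {
        std::string rawData;
        rawData.reserve(sizeof(LogID) + row.size());
        rawData.append(reinterpret_cast<const char*>(&logId), sizeof(logId));
        rawData.append(row.data(), row.size());
        return rawData;
    }

    std::pair<LogID, std::string> decodeSnapshotRow(const std::string& rawData) {
        LogID id = 0;
        std::memcpy(&id, rawData.data(), sizeof(LogID));   // memcpy instead of reinterpret_cast
        return std::make_pair(id, rawData.substr(sizeof(LogID)));
    }

    int main() {
        auto encoded = encodeSnapshotRow(42, "row-payload");
        auto decoded = decodeSnapshotRow(encoded);
        assert(decoded.first == 42 && decoded.second == "row-payload");
        return 0;
    }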