diff --git a/src/common/base/Status.h b/src/common/base/Status.h
index d54a420d9b1..5b852324a33 100644
--- a/src/common/base/Status.h
+++ b/src/common/base/Status.h
@@ -114,6 +114,7 @@ class Status final {
     STATUS_GENERATOR(CfgErrorType);
     STATUS_GENERATOR(CfgImmutable);
     STATUS_GENERATOR(LeaderChanged);
+    STATUS_GENERATOR(Balanced);
 
 #undef STATUS_GENERATOR
 
@@ -147,6 +148,7 @@ class Status final {
         kCfgErrorType = 411,
         kCfgImmutable = 412,
         kLeaderChanged = 413,
+        kBalanced = 414,
     };
 
     Code code() const {
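The new Balanced status is produced by the existing STATUS_GENERATOR macro, so BalanceProcessor can later compare the balancer's result against Status::Balanced(). A minimal standalone sketch of what the generated members plausibly look like — the real Status also carries a message payload and many more codes; only the names mirror the header, the rest is illustrative:

    #include <cassert>

    // Simplified stand-in for common/base/Status.h. Illustrative only.
    class Status {
    public:
        enum Code { kOk = 0, kLeaderChanged = 413, kBalanced = 414 };

        // Each STATUS_GENERATOR(Name) plausibly expands to a factory plus a predicate:
    #define STATUS_GENERATOR(ERROR)                  \
        static Status ERROR() {                      \
            return Status(k##ERROR);                 \
        }                                            \
        bool is##ERROR() const {                     \
            return code_ == k##ERROR;                \
        }

        STATUS_GENERATOR(LeaderChanged);
        STATUS_GENERATOR(Balanced);
    #undef STATUS_GENERATOR

        bool operator==(const Status& rhs) const { return code_ == rhs.code_; }
        Code code() const { return code_; }

    private:
        explicit Status(Code code) : code_(code) {}
        Code code_;
    };

    int main() {
        // BalanceProcessor compares the balancer's status against Status::Balanced()
        // to decide between E_BALANCED and E_BALANCER_RUNNING.
        Status s = Status::Balanced();
        assert(s.isBalanced());
        assert(s == Status::Balanced());
    }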
diff --git a/src/graph/BalanceExecutor.cpp b/src/graph/BalanceExecutor.cpp
index 4602bbb2a89..894cd8608e0 100644
--- a/src/graph/BalanceExecutor.cpp
+++ b/src/graph/BalanceExecutor.cpp
@@ -24,6 +24,12 @@ void BalanceExecutor::execute() {
         case BalanceSentence::SubType::kLeader:
             balanceLeader();
             break;
+        case BalanceSentence::SubType::kData:
+            balanceData();
+            break;
+        case BalanceSentence::SubType::kShowBalancePlan:
+            showBalancePlan();
+            break;
         case BalanceSentence::SubType::kUnknown:
             onError_(Status::Error("Type unknown"));
             break;
@@ -60,5 +66,105 @@ void BalanceExecutor::balanceLeader() {
     std::move(future).via(runner).thenValue(cb).thenError(error);
 }
 
+void BalanceExecutor::balanceData() {
+    auto future = ectx()->getMetaClient()->balance();
+    auto *runner = ectx()->rctx()->runner();
+
+    auto cb = [this] (auto &&resp) {
+        if (!resp.ok()) {
+            DCHECK(onError_);
+            onError_(std::move(resp).status());
+            return;
+        }
+        auto balanceId = std::move(resp).value();
+        resp_ = std::make_unique<cpp2::ExecutionResponse>();
+        std::vector<std::string> header{"ID"};
+        resp_->set_column_names(std::move(header));
+
+        std::vector<cpp2::RowValue> rows;
+        std::vector<cpp2::ColumnValue> row;
+        row.resize(1);
+        row[0].set_integer(balanceId);
+        rows.emplace_back();
+        rows.back().set_columns(std::move(row));
+        resp_->set_rows(std::move(rows));
+
+        DCHECK(onFinish_);
+        onFinish_();
+    };
+
+    auto error = [this] (auto &&e) {
+        LOG(ERROR) << "Exception caught: " << e.what();
+        DCHECK(onError_);
+        onError_(Status::Error("Internal error"));
+        return;
+    };
+
+    std::move(future).via(runner).thenValue(cb).thenError(error);
+}
+
+void BalanceExecutor::showBalancePlan() {
+    auto id = sentence_->balanceId();
+    auto future = ectx()->getMetaClient()->showBalance(id);
+    auto *runner = ectx()->rctx()->runner();
+    auto cb = [this] (auto &&resp) {
+        if (!resp.ok()) {
+            DCHECK(onError_);
+            onError_(std::move(resp).status());
+            return;
+        }
+        auto tasks = std::move(resp).value();
+        resp_ = std::make_unique<cpp2::ExecutionResponse>();
+        std::vector<std::string> header{"balanceId, spaceId:partId, src->dst", "status"};
+        resp_->set_column_names(std::move(header));
+
+        std::vector<cpp2::RowValue> rows;
+        rows.reserve(tasks.size());
+        for (auto& task : tasks) {
+            std::vector<cpp2::ColumnValue> row;
+            row.resize(2);
+            row[0].set_str(std::move(task.get_id()));
+            switch (task.get_result()) {
+                case meta::cpp2::TaskResult::SUCCEEDED:
+                    row[1].set_str("succeeded");
+                    break;
+                case meta::cpp2::TaskResult::FAILED:
+                    row[1].set_str("failed");
+                    break;
+                case meta::cpp2::TaskResult::IN_PROGRESS:
+                    row[1].set_str("in progress");
+                    break;
+                case meta::cpp2::TaskResult::INVALID:
+                    row[1].set_str("invalid");
+                    break;
+            }
+            rows.emplace_back();
+            rows.back().set_columns(std::move(row));
+        }
+        resp_->set_rows(std::move(rows));
+        DCHECK(onFinish_);
+        onFinish_();
+    };
+
+    auto error = [this] (auto &&e) {
+        LOG(ERROR) << "Exception caught: " << e.what();
+        DCHECK(onError_);
+        onError_(Status::Error("Internal error"));
+        return;
+    };
+
+    std::move(future).via(runner).thenValue(cb).thenError(error);
+}
+
+void BalanceExecutor::setupResponse(cpp2::ExecutionResponse &resp) {
+    if (resp_) {
+        resp = std::move(*resp_);
+    } else {
+        Executor::setupResponse(resp);
+    }
+}
+
 }   // namespace graph
 }   // namespace nebula
diff --git a/src/graph/BalanceExecutor.h b/src/graph/BalanceExecutor.h
index 4b10bc6d9dd..e9080e3b886 100644
--- a/src/graph/BalanceExecutor.h
+++ b/src/graph/BalanceExecutor.h
@@ -27,6 +27,12 @@ class BalanceExecutor final : public Executor {
 
     void balanceLeader();
 
+    void balanceData();
+
+    void showBalancePlan();
+
+    void setupResponse(cpp2::ExecutionResponse &resp) override;
+
 private:
     BalanceSentence                            *sentence_{nullptr};
     std::unique_ptr<cpp2::ExecutionResponse>    resp_;
diff --git a/src/interface/meta.thrift b/src/interface/meta.thrift
index 484d429b0dd..46af22c37b8 100644
--- a/src/interface/meta.thrift
+++ b/src/interface/meta.thrift
@@ -32,9 +32,10 @@ enum ErrorCode {
     E_CONFLICT = -29,
     E_WRONGCLUSTER = -30,
 
-    // KV Failure
     E_STORE_FAILURE = -31,
     E_STORE_SEGMENT_ILLEGAL = -32,
+    E_BAD_BALANCE_PLAN = -33,
+    E_BALANCED = -34,
 
     E_INVALID_PASSWORD = -41,
     E_INPROPER_ROLE = -42,
@@ -432,11 +433,25 @@ struct BalanceReq {
     2: optional i64 id,
 }
 
+enum TaskResult {
+    SUCCEEDED = 0x00,
+    FAILED = 0x01,
+    IN_PROGRESS = 0x02,
+    INVALID = 0x03,
+} (cpp.enum_strict)
+
+
+struct BalanceTask {
+    1: string id,
+    2: TaskResult result,
+}
+
 struct BalanceResp {
     1: ErrorCode        code,
     2: i64              id,
     // Valid if code equals E_LEADER_CHANGED.
     3: common.HostAddr  leader,
+    4: list<BalanceTask> tasks,
 }
 
 struct LeaderBalanceReq {
diff --git a/src/interface/raftex.thrift b/src/interface/raftex.thrift
index be20f364250..12fc6d6c7c4 100644
--- a/src/interface/raftex.thrift
+++ b/src/interface/raftex.thrift
@@ -33,6 +33,8 @@ enum ErrorCode {
     E_TOO_MANY_REQUESTS = -15;
     E_PERSIST_SNAPSHOT_FAILED = -16;
 
+    E_BAD_ROLE = -17,
+
     E_EXCEPTION = -20;      // An thrift internal exception was thrown
 }
diff --git a/src/interface/storage.thrift b/src/interface/storage.thrift
index ffd26e78d40..21ac9254e6f 100644
--- a/src/interface/storage.thrift
+++ b/src/interface/storage.thrift
@@ -23,6 +23,7 @@ enum ErrorCode {
     E_KEY_HAS_EXISTS = -12,
     E_SPACE_NOT_FOUND = -13,
     E_PART_NOT_FOUND = -14,
+    E_CONSENSUS_ERROR = -15,
 
     // meta failures
     E_EDGE_PROP_NOT_FOUND = -21,
@@ -32,6 +33,12 @@ enum ErrorCode {
     // Invalid request
     E_INVALID_FILTER = -31,
     E_INVALID_UPDATER = -32,
+    E_INVALID_STORE = -33,
+    E_INVALID_PEER = -34,
+    E_RETRY_EXHAUSTED = -35,
+
+    // meta client failed
+    E_LOAD_META_FAILED = -41,
 
     E_UNKNOWN = -100,
 } (cpp.enum_strict)
@@ -221,6 +228,9 @@ struct RemovePartReq {
 struct MemberChangeReq {
     1: common.GraphSpaceID space_id,
     2: common.PartitionID part_id,
+    3: common.HostAddr peer,
+    // true means add a peer, false means remove a peer.
+    4: bool add,
 }
 
 struct TransLeaderReq {
diff --git a/src/kvstore/LogEncoder.cpp b/src/kvstore/LogEncoder.cpp
index 63003e8cba7..3ef552baeda 100644
--- a/src/kvstore/LogEncoder.cpp
+++ b/src/kvstore/LogEncoder.cpp
@@ -164,59 +164,29 @@ std::vector<folly::StringPiece> decodeMultiValues(folly::StringPiece encoded) {
     return values;
 }
 
-std::string encodeLearner(const HostAddr& learner) {
+std::string encodeHost(LogType type, const HostAddr& host) {
     std::string encoded;
-    encoded.reserve(kHeadLen + sizeof(HostAddr));
+    encoded.reserve(sizeof(int64_t) + 1 + sizeof(HostAddr));
     // Timestamp (8 bytes)
     int64_t ts = time::WallClock::fastNowInMilliSec();
     encoded.append(reinterpret_cast<char*>(&ts), sizeof(int64_t));
     // Log type
-    auto type = LogType::OP_ADD_LEARNER;
     encoded.append(reinterpret_cast<char*>(&type), 1);
-    // Value length
-    uint32_t len = static_cast<uint32_t>(sizeof(HostAddr));
-    encoded.append(reinterpret_cast<char*>(&len), sizeof(len));
-    // Learner addr
-    encoded.append(reinterpret_cast<const char*>(&learner), sizeof(HostAddr));
+    encoded.append(reinterpret_cast<const char*>(&host), sizeof(HostAddr));
     return encoded;
 }
 
-HostAddr decodeLearner(const std::string& encoded) {
+HostAddr decodeHost(LogType type, const folly::StringPiece& encoded) {
     HostAddr addr;
-    CHECK_EQ(kHeadLen + sizeof(HostAddr), encoded.size());
-    memcpy(&addr.first, encoded.data() + kHeadLen, sizeof(addr.first));
+    CHECK_EQ(sizeof(int64_t) + 1 + sizeof(HostAddr), encoded.size());
+    CHECK(encoded[sizeof(int64_t)] == type);
+    memcpy(&addr.first, encoded.begin() + sizeof(int64_t) + 1, sizeof(addr.first));
     memcpy(&addr.second,
-           encoded.data() + kHeadLen + sizeof(addr.first),
+           encoded.begin() + sizeof(int64_t) + 1 + sizeof(addr.first),
            sizeof(addr.second));
     return addr;
 }
 
-std::string encodeTransLeader(const HostAddr& targetAddr) {
-    std::string encoded;
-    encoded.reserve(kHeadLen + sizeof(HostAddr));
-    // Timestamp (8 bytes)
-    int64_t ts = time::WallClock::fastNowInMilliSec();
-    encoded.append(reinterpret_cast<char*>(&ts), sizeof(int64_t));
-    // Log type
-    auto type = LogType::OP_TRANS_LEADER;
-    encoded.append(reinterpret_cast<char*>(&type), 1);
-    // Value length
-    uint32_t len = static_cast<uint32_t>(sizeof(HostAddr));
-    encoded.append(reinterpret_cast<char*>(&len), sizeof(len));
-    // Target addr
-    encoded.append(reinterpret_cast<const char*>(&targetAddr), sizeof(HostAddr));
-    return encoded;
-}
-
-HostAddr decodeTransLeader(folly::StringPiece encoded) {
-    HostAddr addr;
-    CHECK_EQ(kHeadLen + sizeof(HostAddr), encoded.size());
-    memcpy(&addr.first, encoded.begin() + kHeadLen, sizeof(addr.first));
-    memcpy(&addr.second,
-           encoded.begin() + kHeadLen + sizeof(addr.first),
-           sizeof(addr.second));
-    return addr;
-}
 
 }  // namespace kvstore
 }  // namespace nebula
diff --git a/src/kvstore/LogEncoder.h b/src/kvstore/LogEncoder.h
index 0589acfbaf3..68e14e4c26a 100644
--- a/src/kvstore/LogEncoder.h
+++ b/src/kvstore/LogEncoder.h
@@ -21,6 +21,8 @@ enum LogType : char {
     OP_REMOVE_RANGE     = 0x6,
     OP_ADD_LEARNER      = 0x07,
     OP_TRANS_LEADER     = 0x08,
+    OP_ADD_PEER         = 0x09,
+    OP_REMOVE_PEER      = 0x10,
 };
 
 std::string encodeKV(const folly::StringPiece& key,
@@ -38,11 +40,10 @@ std::string encodeMultiValues(LogType type,
                               folly::StringPiece v2);
 std::vector<folly::StringPiece> decodeMultiValues(folly::StringPiece encoded);
 
-std::string encodeLearner(const HostAddr& learner);
-HostAddr decodeLearner(const std::string& encoded);
 
-std::string encodeTransLeader(const HostAddr& targetAddr);
-HostAddr decodeTransLeader(folly::StringPiece encoded);
+std::string encodeHost(LogType type, const HostAddr& learner);
+HostAddr decodeHost(LogType type, const folly::StringPiece& encoded);
+
 
 }  // namespace kvstore
 }  // namespace nebula
 
 #endif   // KVSTORE_LOGENCODER_H_
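encodeHost/decodeHost collapse the old learner and trans-leader codecs into one wire layout: an 8-byte wall-clock timestamp, a 1-byte log type, then the raw HostAddr bytes; the former 4-byte length field is dropped, which is also why Part::preProcessLog can dispatch on log[sizeof(int64_t)]. A standalone round-trip sketch of that layout, assuming HostAddr is a pair of 32-bit ip and port as in the codebase's base types:

    #include <cassert>
    #include <chrono>
    #include <cstdint>
    #include <cstring>
    #include <string>
    #include <utility>

    using HostAddr = std::pair<uint32_t, int32_t>;   // ip, port (assumed layout)

    enum LogType : char {
        OP_ADD_LEARNER  = 0x07,
        OP_TRANS_LEADER = 0x08,
        OP_ADD_PEER     = 0x09,
        OP_REMOVE_PEER  = 0x10,
    };

    // Layout: [ts: 8 bytes][type: 1 byte][HostAddr: sizeof(HostAddr) bytes]
    std::string encodeHost(LogType type, const HostAddr& host) {
        std::string encoded;
        encoded.reserve(sizeof(int64_t) + 1 + sizeof(HostAddr));
        int64_t ts = std::chrono::duration_cast<std::chrono::milliseconds>(
            std::chrono::system_clock::now().time_since_epoch()).count();
        encoded.append(reinterpret_cast<char*>(&ts), sizeof(int64_t));
        encoded.append(reinterpret_cast<char*>(&type), 1);
        encoded.append(reinterpret_cast<const char*>(&host), sizeof(HostAddr));
        return encoded;
    }

    HostAddr decodeHost(LogType type, const std::string& encoded) {
        HostAddr addr;
        assert(sizeof(int64_t) + 1 + sizeof(HostAddr) == encoded.size());
        assert(encoded[sizeof(int64_t)] == type);   // type byte sits right after the ts
        memcpy(&addr, encoded.data() + sizeof(int64_t) + 1, sizeof(HostAddr));
        return addr;
    }

    int main() {
        HostAddr peer{0x7f000001, 44500};
        auto log = encodeHost(OP_ADD_PEER, peer);
        assert(decodeHost(OP_ADD_PEER, log) == peer);
    }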
diff --git a/src/kvstore/NebulaStore.cpp b/src/kvstore/NebulaStore.cpp
index 1249e623875..ef4bf0875b6 100644
--- a/src/kvstore/NebulaStore.cpp
+++ b/src/kvstore/NebulaStore.cpp
@@ -97,7 +97,8 @@ bool NebulaStore::init() {
                         spaceIt->second->parts_.emplace(partId,
                                                         newPart(spaceId,
                                                                 partId,
-                                                                enginePtr.get()));
+                                                                enginePtr.get(),
+                                                                false));
                     }
                 }
             } catch (std::exception& e) {
@@ -118,7 +119,7 @@ bool NebulaStore::init() {
         }
         std::sort(partIds.begin(), partIds.end());
         for (auto& partId : partIds) {
-            addPart(spaceId, partId);
+            addPart(spaceId, partId, false);
         }
     }
 
@@ -172,7 +173,7 @@ void NebulaStore::addSpace(GraphSpaceID spaceId) {
 }
 
 
-void NebulaStore::addPart(GraphSpaceID spaceId, PartitionID partId) {
+void NebulaStore::addPart(GraphSpaceID spaceId, PartitionID partId, bool asLearner) {
     folly::RWSpinLock::WriteHolder wh(&lock_);
     auto spaceIt = this->spaces_.find(spaceId);
     CHECK(spaceIt != this->spaces_.end()) << "Space should exist!";
@@ -199,13 +200,15 @@ void NebulaStore::addPart(GraphSpaceID spaceId, PartitionID partId) {
     targetEngine->addPart(partId);
     spaceIt->second->parts_.emplace(
         partId,
-        newPart(spaceId, partId, targetEngine.get()));
-    LOG(INFO) << "Space " << spaceId << ", part " << partId << " has been added!";
+        newPart(spaceId, partId, targetEngine.get(), asLearner));
+    LOG(INFO) << "Space " << spaceId << ", part " << partId
+              << " has been added, asLearner " << asLearner;
 }
 
 std::shared_ptr<Part> NebulaStore::newPart(GraphSpaceID spaceId,
                                            PartitionID partId,
-                                           KVEngine* engine) {
+                                           KVEngine* engine,
+                                           bool asLearner) {
     auto part = std::make_shared<Part>(spaceId,
                                        partId,
                                        raftAddr_,
@@ -226,7 +229,7 @@ std::shared_ptr<Part> NebulaStore::newPart(GraphSpaceID spaceId,
         }
     }
     raftService_->addPartition(part);
-    part->start(std::move(peers));
+    part->start(std::move(peers), asLearner);
 
     return part;
 }
@@ -256,6 +259,7 @@ void NebulaStore::removePart(GraphSpaceID spaceId, PartitionID partId) {
             auto* e = partIt->second->engine();
             CHECK_NOTNULL(e);
             raftService_->removePartition(partIt->second);
+            partIt->second->reset();
             spaceIt->second->parts_.erase(partId);
             e->removePart(partId);
         }
diff --git a/src/kvstore/NebulaStore.h b/src/kvstore/NebulaStore.h
index e9add0380bf..e72bc69b741 100644
--- a/src/kvstore/NebulaStore.h
+++ b/src/kvstore/NebulaStore.h
@@ -166,28 +166,29 @@ class NebulaStore : public KVStore, public Handler {
 
     bool isLeader(GraphSpaceID spaceId, PartitionID partId);
 
-private:
     /**
      * Implement four interfaces in Handler.
      * */
     void addSpace(GraphSpaceID spaceId) override;
 
-    void addPart(GraphSpaceID spaceId, PartitionID partId) override;
+    void addPart(GraphSpaceID spaceId, PartitionID partId, bool asLearner) override;
 
     void removeSpace(GraphSpaceID spaceId) override;
 
     void removePart(GraphSpaceID spaceId, PartitionID partId) override;
 
+    ErrorOr<ResultCode, std::shared_ptr<SpacePartInfo>> space(GraphSpaceID spaceId);
+
+private:
     std::unique_ptr<KVEngine> newEngine(GraphSpaceID spaceId, const std::string& path);
 
     std::shared_ptr<Part> newPart(GraphSpaceID spaceId,
                                   PartitionID partId,
-                                  KVEngine* engine);
+                                  KVEngine* engine,
+                                  bool asLearner);
 
     ErrorOr<ResultCode, KVEngine*> engine(GraphSpaceID spaceId, PartitionID partId);
 
-    ErrorOr<ResultCode, std::shared_ptr<SpacePartInfo>> space(GraphSpaceID spaceId);
-
 private:
     // The lock used to protect spaces_
     folly::RWSpinLock lock_;
diff --git a/src/kvstore/Part.cpp b/src/kvstore/Part.cpp
index dbbad899f40..123f8630b0d 100644
--- a/src/kvstore/Part.cpp
+++ b/src/kvstore/Part.cpp
@@ -135,6 +135,13 @@ void Part::asyncRemoveRange(folly::StringPiece start,
     });
 }
 
+void Part::sync(KVCallback cb) {
+    sendCommandAsync("")
+        .then([callback = std::move(cb)] (AppendLogResult res) mutable {
+        callback(toResultCode(res));
+    });
+}
+
 void Part::asyncAtomicOp(raftex::AtomicOp op, KVCallback cb) {
     atomicOpAsync(std::move(op)).then([callback = std::move(cb)] (AppendLogResult res) mutable {
         callback(toResultCode(res));
@@ -142,7 +149,7 @@ void Part::asyncAtomicOp(raftex::AtomicOp op, KVCallback cb) {
 }
 
 void Part::asyncAddLearner(const HostAddr& learner, KVCallback cb) {
-    std::string log = encodeLearner(learner);
+    std::string log = encodeHost(OP_ADD_LEARNER, learner);
     sendCommandAsync(std::move(log))
         .then([callback = std::move(cb)] (AppendLogResult res) mutable {
         callback(toResultCode(res));
@@ -150,7 +157,23 @@ void Part::asyncAddLearner(const HostAddr& learner, KVCallback cb) {
 }
 
 void Part::asyncTransferLeader(const HostAddr& target, KVCallback cb) {
-    std::string log = encodeTransLeader(target);
+    std::string log = encodeHost(OP_TRANS_LEADER, target);
     sendCommandAsync(std::move(log))
         .then([callback = std::move(cb)] (AppendLogResult res) mutable {
         callback(toResultCode(res));
     });
 }
 
+void Part::asyncAddPeer(const HostAddr& peer, KVCallback cb) {
+    std::string log = encodeHost(OP_ADD_PEER, peer);
+    sendCommandAsync(std::move(log))
+        .then([callback = std::move(cb)] (AppendLogResult res) mutable {
+        callback(toResultCode(res));
+    });
+}
+
+void Part::asyncRemovePeer(const HostAddr& peer, KVCallback cb) {
+    std::string log = encodeHost(OP_REMOVE_PEER, peer);
+    sendCommandAsync(std::move(log))
+        .then([callback = std::move(cb)] (AppendLogResult res) mutable {
+        callback(toResultCode(res));
+    });
+}
+
@@ -245,13 +268,18 @@ bool Part::commitLogs(std::unique_ptr<LogIterator> iter) {
                 }
                 break;
             }
+            case OP_ADD_PEER:
             case OP_ADD_LEARNER: {
                 break;
             }
             case OP_TRANS_LEADER: {
-                auto newLeader = decodeTransLeader(log);
+                auto newLeader = decodeHost(OP_TRANS_LEADER, log);
                 commitTransLeader(newLeader);
-                LOG(INFO) << idStr_ << "Transfer leader to " << newLeader;
+                break;
+            }
+            case OP_REMOVE_PEER: {
+                auto peer = decodeHost(OP_REMOVE_PEER, log);
+                commitRemovePeer(peer);
                 break;
             }
             default: {
@@ -318,15 +346,23 @@ bool Part::preProcessLog(LogID logId,
     if (!log.empty()) {
         switch (log[sizeof(int64_t)]) {
             case OP_ADD_LEARNER: {
-                auto learner = decodeLearner(log);
+                auto learner = decodeHost(OP_ADD_LEARNER, log);
                 addLearner(learner);
-                LOG(INFO) << idStr_ << "Preprocess add learner " << learner;
                 break;
             }
             case OP_TRANS_LEADER: {
-                auto newLeader = decodeTransLeader(log);
+                auto newLeader = decodeHost(OP_TRANS_LEADER, log);
                 preProcessTransLeader(newLeader);
-                LOG(INFO) << idStr_ << "Preprocess transfer leader to " << newLeader;
+                break;
+            }
+            case OP_ADD_PEER: {
+                auto peer = decodeHost(OP_ADD_PEER, log);
+                addPeer(peer);
+                break;
+            }
+            case OP_REMOVE_PEER: {
+                auto peer = decodeHost(OP_REMOVE_PEER, log);
+                preProcessRemovePeer(peer);
                 break;
             }
             default: {
diff --git a/src/kvstore/Part.h b/src/kvstore/Part.h
index e41501f4918..e3175838480 100644
--- a/src/kvstore/Part.h
+++ b/src/kvstore/Part.h
@@ -12,6 +12,7 @@
 #include "kvstore/Common.h"
 #include "kvstore/KVEngine.h"
 #include "kvstore/raftex/SnapshotManager.h"
+#include "kvstore/wal/FileBasedWal.h"
 
 namespace nebula {
 namespace kvstore {
@@ -54,6 +55,13 @@ class Part : public raftex::RaftPart {
 
     void asyncTransferLeader(const HostAddr& target, KVCallback cb);
 
+    void asyncAddPeer(const HostAddr& peer, KVCallback cb);
+
+    void asyncRemovePeer(const HostAddr& peer, KVCallback cb);
+
+    // Sync the committed information to the followers.
+    void sync(KVCallback cb);
+
     void registerNewLeaderCb(NewLeaderCallback cb) {
         newLeaderCb_ = std::move(cb);
     }
@@ -62,6 +70,12 @@ class Part : public raftex::RaftPart {
         newLeaderCb_ = nullptr;
     }
 
+    // Clean up all data about this part.
+    void reset() {
+        LOG(INFO) << idStr_ << "Clean up all wals";
+        wal()->reset();
+    }
+
 private:
     /**
      * Methods inherited from RaftPart
diff --git a/src/kvstore/PartManager.cpp b/src/kvstore/PartManager.cpp
index 7e08266586a..945eb9b84c7 100644
--- a/src/kvstore/PartManager.cpp
+++ b/src/kvstore/PartManager.cpp
@@ -85,7 +85,7 @@ void MetaServerBasedPartManager::onSpaceRemoved(GraphSpaceID spaceId) {
 
 void MetaServerBasedPartManager::onPartAdded(const PartMeta& partMeta) {
     if (handler_ != nullptr) {
-        handler_->addPart(partMeta.spaceId_, partMeta.partId_);
+        handler_->addPart(partMeta.spaceId_, partMeta.partId_, false);
     } else {
         VLOG(1) << "handler_ is nullptr!";
     }
diff --git a/src/kvstore/PartManager.h b/src/kvstore/PartManager.h
index 0f0d2715e09..a580d1a2c28 100644
--- a/src/kvstore/PartManager.h
+++ b/src/kvstore/PartManager.h
@@ -18,7 +18,7 @@ namespace kvstore {
 class Handler {
 public:
     virtual void addSpace(GraphSpaceID spaceId) = 0;
-    virtual void addPart(GraphSpaceID spaceId, PartitionID partId) = 0;
+    virtual void addPart(GraphSpaceID spaceId, PartitionID partId, bool asLearner) = 0;
     virtual void removeSpace(GraphSpaceID spaceId) = 0;
     virtual void removePart(GraphSpaceID spaceId, PartitionID partId) = 0;
 };
@@ -96,7 +96,7 @@ class MemPartManager final : public PartManager {
             handler_->addSpace(spaceId);
         }
         if (noPart && handler_) {
-            handler_->addPart(spaceId, partId);
+            handler_->addPart(spaceId, partId, false);
         }
     }
diff --git a/src/kvstore/SnapshotManagerImpl.cpp b/src/kvstore/SnapshotManagerImpl.cpp
index 33d80a1cc22..3d03624ff25 100644
--- a/src/kvstore/SnapshotManagerImpl.cpp
+++ b/src/kvstore/SnapshotManagerImpl.cpp
@@ -38,9 +38,7 @@ void SnapshotManagerImpl::accessAllRowsInSnapshot(GraphSpaceID spaceId,
         totalCount++;
         iter->next();
     }
-    if (data.size() > 0) {
-        cb(std::move(data), totalCount, totalSize, true);
-    }
+    cb(std::move(data), totalCount, totalSize, true);
 }
 }  // namespace kvstore
 }  // namespace nebula
diff --git a/src/kvstore/raftex/Host.cpp b/src/kvstore/raftex/Host.cpp
index 98857a80a3d..719f1679e94 100644
--- a/src/kvstore/raftex/Host.cpp
+++ b/src/kvstore/raftex/Host.cpp
@@ -321,9 +321,22 @@ void Host::appendLogsInternal(folly::EventBase* eb,
                        << " to the followers lastLodId " << resp.get_last_log_id();
                 {
                     std::lock_guard<std::mutex> g(self->lock_);
-                    self->lastLogIdSent_ = resp.get_last_log_id();
-
-                    self->lastLogTermSent_ = resp.get_last_log_term();
-                    self->setResponse(resp);
+                    auto res = self->checkStatus();
+                    if (res != cpp2::ErrorCode::SUCCEEDED) {
+                        VLOG(2) << self->idStr_
+                                << "The host is not in a proper status,"
+                                   " skip waiting for the snapshot";
+                        cpp2::AppendLogResponse r;
+                        r.set_error_code(res);
+                        self->setResponse(r);
+                    } else {
+                        self->lastLogIdSent_ = resp.get_last_log_id();
+                        self->lastLogTermSent_ = resp.get_last_log_term();
+                        // The log is stale, so the request is treated as succeeded
+                        cpp2::AppendLogResponse r;
+                        r.set_error_code(cpp2::ErrorCode::SUCCEEDED);
+                        self->setResponse(r);
+                    }
                 }
                 self->noMoreRequestCV_.notify_all();
                 return;
diff --git a/src/kvstore/raftex/Host.h b/src/kvstore/raftex/Host.h
index 7a76867db42..3ae2fb4f519 100644
--- a/src/kvstore/raftex/Host.h
+++ b/src/kvstore/raftex/Host.h
@@ -24,6 +24,7 @@ namespace raftex {
 class RaftPart;
 
 class Host final : public std::enable_shared_from_this<Host> {
+    friend class RaftPart;
 public:
     Host(const HostAddr& addr, std::shared_ptr<RaftPart> part, bool isLearner = false);
 
@@ -58,6 +59,10 @@ class Host final : public std::enable_shared_from_this<Host> {
         return isLearner_;
     }
 
+    void setLearner(bool isLearner) {
+        isLearner_ = isLearner;
+    }
+
     folly::Future<cpp2::AskForVoteResponse> askForVote(
         const cpp2::AskForVoteRequest& req);
diff --git a/src/kvstore/raftex/RaftPart.cpp b/src/kvstore/raftex/RaftPart.cpp
index 3a03d04b684..5124c482f88 100644
--- a/src/kvstore/raftex/RaftPart.cpp
+++ b/src/kvstore/raftex/RaftPart.cpp
@@ -19,8 +19,6 @@
 
 #include "kvstore/raftex/Host.h"
 
-DEFINE_bool(accept_log_append_during_pulling, false,
-            "Whether to accept new logs during pulling the snapshot");
 DEFINE_uint32(raft_heartbeat_interval_secs, 5,
              "Seconds between each heartbeat");
 
@@ -290,6 +288,7 @@ void RaftPart::start(std::vector<HostAddr>&& peers, bool asLearner) {
 
     // Start all peer hosts
     for (auto& addr : peers) {
+        LOG(INFO) << idStr_ << "Add peer " << addr;
         auto hostPtr = std::make_shared<Host>(addr, shared_from_this());
         hosts_.emplace_back(hostPtr);
     }
@@ -374,15 +373,19 @@ void RaftPart::addLearner(const HostAddr& addr) {
 
 void RaftPart::preProcessTransLeader(const HostAddr& target) {
     CHECK(!raftLock_.try_lock());
-    LOG(INFO) << idStr_ << "Commit transfer leader to " << target;
+    LOG(INFO) << idStr_ << "Pre process transfer leader to " << target;
     switch (role_) {
         case Role::FOLLOWER: {
             if (target != addr_ && target != HostAddr(0, 0)) {
                 LOG(INFO) << idStr_ << "I am follower, just wait for the new leader.";
             } else {
                 LOG(INFO) << idStr_ << "I will be the new leader, trigger leader election now!";
-                role_ = Role::CANDIDATE;
                 bgWorkers_->addTask([self = shared_from_this()] {
+                    {
+                        std::unique_lock<std::mutex> lck(self->raftLock_);
+                        self->role_ = Role::CANDIDATE;
+                        self->leader_ = HostAddr(0, 0);
+                    }
                     self->leaderElection();
                 });
             }
@@ -412,13 +415,101 @@ void RaftPart::commitTransLeader(const HostAddr& target) {
             break;
         }
         case Role::FOLLOWER:
-        case Role::CANDIDATE:
-        case Role::LEARNER: {
-            CHECK(target != addr_);
+        case Role::CANDIDATE: {
             LOG(INFO) << idStr_ << "I am " << roleStr(role_)
                       << ", just wait for the new leader!";
             break;
         }
+        case Role::LEARNER: {
+            LOG(INFO) << idStr_ << "I am learner, not in the raft group, skip the log";
+            break;
+        }
     }
 }
 
+void RaftPart::updateQuorum() {
+    CHECK(!raftLock_.try_lock());
+    int32_t total = 0;
+    for (auto& h : hosts_) {
+        if (!h->isLearner()) {
+            total++;
+        }
+    }
+    quorum_ = (total + 1) / 2;
+}
+
+void RaftPart::addPeer(const HostAddr& peer) {
+    CHECK(!raftLock_.try_lock());
+    if (peer == addr_) {
+        if (role_ == Role::LEARNER) {
+            LOG(INFO) << idStr_ << "I am learner, promote myself to follower";
+            role_ = Role::FOLLOWER;
+            updateQuorum();
+        } else {
+            LOG(INFO) << idStr_ << "I am already in the raft group!";
+        }
+        return;
+    }
+    auto it = std::find_if(hosts_.begin(), hosts_.end(), [&peer] (const auto& h) {
+        return h->address() == peer;
+    });
+    if (it == hosts_.end()) {
+        hosts_.emplace_back(std::make_shared<Host>(peer, shared_from_this()));
+        updateQuorum();
+        LOG(INFO) << idStr_ << "Add peer " << peer;
+    } else {
+        if ((*it)->isLearner()) {
+            LOG(INFO) << idStr_ << "The host " << peer
+                      << " already exists as a learner, promote it!";
+            (*it)->setLearner(false);
+            updateQuorum();
+        } else {
+            LOG(INFO) << idStr_ << "The host " << peer << " already exists as a follower!";
+        }
+    }
+}
+
+void RaftPart::removePeer(const HostAddr& peer) {
+    CHECK(!raftLock_.try_lock());
+    if (peer == addr_) {
+        // status_ = Status::STOPPED;
+        LOG(INFO) << idStr_ << "Remove myself from the raft group.";
+        return;
+    }
+    auto it = std::find_if(hosts_.begin(), hosts_.end(), [&peer] (const auto& h) {
+        return h->address() == peer;
+    });
+    if (it == hosts_.end()) {
+        LOG(INFO) << idStr_ << "The peer " << peer << " does not exist!";
+    } else {
+        if ((*it)->isLearner()) {
+            LOG(INFO) << idStr_ << "The peer is learner, remove it directly!";
+            hosts_.erase(it);
+            return;
+        }
+        hosts_.erase(it);
+        updateQuorum();
+        LOG(INFO) << idStr_ << "Remove peer " << peer;
+    }
+}
+
+void RaftPart::preProcessRemovePeer(const HostAddr& peer) {
+    CHECK(!raftLock_.try_lock());
+    if (role_ == Role::LEADER) {
+        LOG(INFO) << idStr_ << "I am leader, skip remove peer in preProcessLog";
+        return;
+    }
+    removePeer(peer);
+}
+
+void RaftPart::commitRemovePeer(const HostAddr& peer) {
+    CHECK(!raftLock_.try_lock());
+    if (role_ == Role::FOLLOWER || role_ == Role::LEARNER) {
+        LOG(INFO) << idStr_ << "I am " << roleStr(role_)
+                  << ", skip remove peer in commit";
+        return;
+    }
+    CHECK(Role::LEADER == role_);
+    removePeer(peer);
+}
 
 folly::Future<AppendLogResult> RaftPart::appendAsync(ClusterID source,
@@ -842,6 +933,7 @@ bool RaftPart::needToStartElection() {
             << lastMsgRecvDur_.elapsedInSec()
             << ", term " << term_;
         role_ = Role::CANDIDATE;
+        leader_ = HostAddr(0, 0);
         LOG(INFO) << idStr_
                   << "needToStartElection: lastMsgRecvDur "
                   << lastMsgRecvDur_.elapsedInSec()
                   << ", term_ " << term_;
@@ -1093,8 +1185,12 @@ void RaftPart::processAskForVoteRequest(
         return;
     }
 
-    VLOG(2) << idStr_ << "The partition currently is a "
-            << roleStr(role_);
+    LOG(INFO) << idStr_ << "The partition currently is a "
+              << roleStr(role_);
+    if (role_ == Role::LEARNER) {
+        resp.set_error_code(cpp2::ErrorCode::E_BAD_ROLE);
+        return;
+    }
 
     // Check term id
     auto term = role_ == Role::CANDIDATE ? proposedTerm_ : term_;
@@ -1133,6 +1229,16 @@ void RaftPart::processAskForVoteRequest(
         }
     }
 
+    auto candidate = HostAddr(req.get_candidate_ip(), req.get_candidate_port());
+    auto hosts = followers();
+    auto it = std::find_if(hosts.begin(), hosts.end(), [&candidate] (const auto& h){
+        return h->address() == candidate;
+    });
+    if (it == hosts.end()) {
+        LOG(INFO) << idStr_ << "The candidate " << candidate << " is not one of my peers";
+        resp.set_error_code(cpp2::ErrorCode::E_WRONG_LEADER);
+        return;
+    }
     // Ok, no reason to refuse, we will vote for the candidate
     LOG(INFO) << idStr_ << "The partition will vote for the candidate";
     resp.set_error_code(cpp2::ErrorCode::SUCCEEDED);
@@ -1215,7 +1321,7 @@ void RaftPart::processAppendLogRequest(
         return;
     }
     // Check leadership
-    cpp2::ErrorCode err = verifyLeader(req, g);
+    cpp2::ErrorCode err = verifyLeader(req);
     if (err != cpp2::ErrorCode::SUCCEEDED) {
         // Wrong leadership
         VLOG(2) << idStr_ << "Will not follow the leader";
@@ -1353,10 +1459,18 @@ void RaftPart::processAppendLogRequest(
 
 
 cpp2::ErrorCode RaftPart::verifyLeader(
-        const cpp2::AppendLogRequest& req,
-        std::lock_guard<std::mutex>& lck) {
+        const cpp2::AppendLogRequest& req) {
+    CHECK(!raftLock_.try_lock());
+    auto candidate = HostAddr(req.get_leader_ip(), req.get_leader_port());
+    auto hosts = followers();
+    auto it = std::find_if(hosts.begin(), hosts.end(), [&candidate] (const auto& h){
+        return h->address() == candidate;
+    });
+    if (it == hosts.end()) {
+        VLOG(2) << idStr_ << "The candidate leader " << candidate << " is not one of my peers";
+        return cpp2::ErrorCode::E_WRONG_LEADER;
+    }
     VLOG(2) << idStr_ << "The current role is " << roleStr(role_);
-    UNUSED(lck);
     switch (role_) {
         case Role::LEARNER:
         case Role::FOLLOWER: {
@@ -1536,6 +1650,25 @@ void RaftPart::reset() {
     lastTotalSize_ = 0;
 }
 
+AppendLogResult RaftPart::isCatchedUp(const HostAddr& peer) {
+    std::lock_guard<std::mutex> lck(logsLock_);
+    if (role_ != Role::LEADER) {
+        LOG(INFO) << idStr_ << "I am not the leader";
+        return AppendLogResult::E_NOT_A_LEADER;
+    }
+    if (peer == addr_) {
+        LOG(INFO) << idStr_ << "I am the leader";
+        return AppendLogResult::SUCCEEDED;
+    }
+    for (auto& host : hosts_) {
+        if (host->addr_ == peer) {
+            return host->sendingSnapshot_ ? AppendLogResult::E_SENDING_SNAPSHOT
+                                          : AppendLogResult::SUCCEEDED;
+        }
+    }
+    return AppendLogResult::E_INVALID_PEER;
+}
+
 }  // namespace raftex
 }  // namespace nebula
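updateQuorum() is the core of the member-change support above: whenever hosts_ changes, the majority threshold is recomputed as (total + 1) / 2 over the non-learner peers, where hosts_ excludes the local replica. The leader's own implicit vote then turns quorum_ follower acks into a strict majority of the whole group. A standalone check of that arithmetic (toy Host struct, not the real class):

    #include <cassert>
    #include <vector>

    struct Host { bool isLearner; };

    // Mirrors RaftPart::updateQuorum(): hosts excludes the local (leader) replica.
    int quorumOf(const std::vector<Host>& hosts) {
        int total = 0;
        for (const auto& h : hosts) {
            if (!h.isLearner) {
                total++;
            }
        }
        return (total + 1) / 2;
    }

    int main() {
        // 3-member group: leader + 2 followers -> 1 follower ack + the leader = 2 of 3.
        assert(quorumOf({{false}, {false}}) == 1);
        // 4-member group during a member change: 2 acks + the leader = 3 of 4.
        assert(quorumOf({{false}, {false}, {false}}) == 2);
        // Learners never count toward the quorum.
        assert(quorumOf({{false}, {false}, {true}}) == 1);
    }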
diff --git a/src/kvstore/raftex/RaftPart.h b/src/kvstore/raftex/RaftPart.h
index 99d5c01f696..f959f56c8f9 100644
--- a/src/kvstore/raftex/RaftPart.h
+++ b/src/kvstore/raftex/RaftPart.h
@@ -10,6 +10,7 @@
 #include "base/Base.h"
 #include <folly/futures/SharedPromise.h>
 #include <folly/Function.h>
+#include <gtest/gtest_prod.h>
 #include "gen-cpp2/raftex_types.h"
 #include "time/Duration.h"
 #include "thread/GenericThreadPool.h"
@@ -39,6 +40,9 @@ enum class AppendLogResult {
     E_BUFFER_OVERFLOW = -5,
     E_WAL_FAILURE = -6,
     E_TERM_OUT_OF_DATE = -7,
+    E_SENDING_SNAPSHOT = -8,
+    E_INVALID_PEER = -9,
+    E_NOT_ENOUGH_ACKS = -10,
 };
 
 enum class LogType {
@@ -69,6 +73,9 @@ class RaftPart : public std::enable_shared_from_this<RaftPart> {
     friend class AppendLogsIterator;
     friend class Host;
     friend class SnapshotManager;
+    FRIEND_TEST(MemberChangeTest, AddRemovePeerTest);
+    FRIEND_TEST(MemberChangeTest, RemoveLeaderTest);
+
 public:
     virtual ~RaftPart();
 
@@ -128,6 +135,11 @@ class RaftPart : public std::enable_shared_from_this<RaftPart> {
 
     void preProcessTransLeader(const HostAddr& target);
 
+
+    void preProcessRemovePeer(const HostAddr& peer);
+
+    void commitRemovePeer(const HostAddr& peer);
+
     // Change the partition status to RUNNING. This is called
     // by the inherited class, when it's ready to serve
     virtual void start(std::vector<HostAddr>&& peers, bool asLearner = false);
@@ -163,7 +175,11 @@ class RaftPart : public std::enable_shared_from_this<RaftPart> {
      * */
     folly::Future<AppendLogResult> sendCommandAsync(std::string log);
-
+    /**
+     * Check whether the given peer has caught up with the leader's data. If the
+     * leader is still sending a snapshot to the peer, E_SENDING_SNAPSHOT is returned.
+     * */
+    AppendLogResult isCatchedUp(const HostAddr& peer);
 
     /*****************************************************
      *
@@ -239,6 +255,10 @@ class RaftPart : public std::enable_shared_from_this<RaftPart> {
     // Reset the part, clean up all data and WALs.
     void reset();
 
+    void addPeer(const HostAddr& peer);
+
+    void removePeer(const HostAddr& peer);
+
 private:
     enum class Status {
         STARTING = 0,   // The part is starting, not ready for service
@@ -279,8 +299,7 @@ class RaftPart : public std::enable_shared_from_this<RaftPart> {
      ***************************************************/
     const char* roleStr(Role role) const;
 
-    cpp2::ErrorCode verifyLeader(const cpp2::AppendLogRequest& req,
-                                 std::lock_guard<std::mutex>& lck);
+    cpp2::ErrorCode verifyLeader(const cpp2::AppendLogRequest& req);
 
     /*****************************************************************
      * Asynchronously send a heartbeat (An empty log entry)
@@ -352,6 +371,8 @@ class RaftPart : public std::enable_shared_from_this<RaftPart> {
 
     bool checkAppendLogResult(AppendLogResult res);
 
+    void updateQuorum();
+
 protected:
     template<class ValueType>
     class PromiseSet final {
diff --git a/src/kvstore/raftex/test/CMakeLists.txt b/src/kvstore/raftex/test/CMakeLists.txt
index d8e2f94c3d2..6bf34e5dca8 100644
--- a/src/kvstore/raftex/test/CMakeLists.txt
+++ b/src/kvstore/raftex/test/CMakeLists.txt
@@ -70,3 +70,11 @@ nebula_add_test(
     OBJECTS ${RAFTEX_TEST_LIBS}
     LIBRARIES ${THRIFT_LIBRARIES} wangle gtest
 )
+
+nebula_add_test(
+    NAME member_change_test
+    SOURCES MemberChangeTest.cpp RaftexTestBase.cpp TestShard.cpp
+    OBJECTS ${RAFTEX_TEST_LIBS}
+    LIBRARIES ${THRIFT_LIBRARIES} wangle gtest
+)
+
diff --git a/src/kvstore/raftex/test/MemberChangeTest.cpp b/src/kvstore/raftex/test/MemberChangeTest.cpp
new file mode 100644
index 00000000000..99294b2bd06
--- /dev/null
+++ b/src/kvstore/raftex/test/MemberChangeTest.cpp
@@ -0,0 +1,141 @@
+/* Copyright (c) 2019 vesoft inc. All rights reserved.
+ *
+ * This source code is licensed under Apache 2.0 License,
+ * attached with Common Clause Condition 1.0, found in the LICENSES directory.
+ */ + +#include "base/Base.h" +#include +#include +#include "fs/TempDir.h" +#include "fs/FileUtils.h" +#include "thread/GenericThreadPool.h" +#include "network/NetworkUtils.h" +#include "kvstore/raftex/RaftexService.h" +#include "kvstore/raftex/test/RaftexTestBase.h" +#include "kvstore/raftex/test/TestShard.h" + +DECLARE_uint32(raft_heartbeat_interval_secs); + + +namespace nebula { +namespace raftex { + +TEST(MemberChangeTest, AddRemovePeerTest) { + fs::TempDir walRoot("/tmp/member_change.XXXXXX"); + std::shared_ptr workers; + std::vector wals; + std::vector allHosts; + std::vector> services; + std::vector> copies; + + std::shared_ptr leader; + std::vector isLearner = {false, false, false, true}; + setupRaft(4, walRoot, workers, wals, allHosts, services, copies, leader, isLearner); + + // Check all hosts agree on the same leader + checkLeadership(copies, leader); + + CHECK_EQ(2, leader->hosts_.size()); + + { + auto f = leader->sendCommandAsync(test::encodeAddPeer(allHosts[3])); + f.wait(); + + for (auto& c : copies) { + CHECK_EQ(3, c->hosts_.size()); + } + } + std::vector msgs; + LogID id = -1; + appendLogs(0, 99, leader, msgs, id); + // Sleep a while to make sure the last log has been committed on + // followers + sleep(FLAGS_raft_heartbeat_interval_secs); + + checkConsensus(copies, 0, 99, msgs); + + { + LOG(INFO) << "Add the same peer again!"; + auto f = leader->sendCommandAsync(test::encodeAddPeer(allHosts[3])); + f.wait(); + + for (auto& c : copies) { + CHECK_EQ(3, c->hosts_.size()); + } + } + { + LOG(INFO) << "Remove the peer added!"; + auto f = leader->sendCommandAsync(test::encodeRemovePeer(allHosts[3])); + f.wait(); + + for (size_t i = 0; i < copies.size() - 1; i++) { + CHECK_EQ(2, copies[i]->hosts_.size()); + } +// CHECK(copies[3]->isStopped()); + } + finishRaft(services, copies, workers, leader); +} + +TEST(MemberChangeTest, RemoveLeaderTest) { + fs::TempDir walRoot("/tmp/member_change.XXXXXX"); + std::shared_ptr workers; + std::vector wals; + std::vector allHosts; + std::vector> services; + std::vector> copies; + + std::shared_ptr leader; + std::vector isLearner = {false, false, false, false}; + setupRaft(4, walRoot, workers, wals, allHosts, services, copies, leader, isLearner); + + // Check all hosts agree on the same leader + auto leaderIndex = checkLeadership(copies, leader); + + CHECK_EQ(3, leader->hosts_.size()); + + { + LOG(INFO) << "Send remove peer request, remove " << allHosts[leaderIndex]; + leader.reset(); + auto f = copies[leaderIndex]->sendCommandAsync( + test::encodeRemovePeer(allHosts[leaderIndex])); + f.wait(); + copies[leaderIndex]->stop(); +// CHECK(copies[leaderIndex]->isStopped()); + for (size_t i = 0; i < copies.size(); i++) { + if (static_cast(i) != leaderIndex) { + CHECK_EQ(2, copies[i]->hosts_.size()); + } + } + } + + waitUntilLeaderElected(copies, leader); + + auto nLeaderIndex = checkLeadership(copies, leader); + CHECK(leaderIndex != nLeaderIndex); + + std::vector msgs; + LogID id = -1; + appendLogs(0, 99, leader, msgs, id); + // Sleep a while to make sure the last log has been committed on + // followers + sleep(FLAGS_raft_heartbeat_interval_secs); + + checkConsensus(copies, 0, 99, msgs); + + finishRaft(services, copies, workers, leader); +} + +} // namespace raftex +} // namespace nebula + + +int main(int argc, char** argv) { + testing::InitGoogleTest(&argc, argv); + folly::init(&argc, &argv, true); + google::SetStderrLogging(google::INFO); + + return RUN_ALL_TESTS(); +} + + diff --git a/src/kvstore/raftex/test/RaftCase.cpp 
diff --git a/src/kvstore/raftex/test/RaftCase.cpp b/src/kvstore/raftex/test/RaftCase.cpp
index 48a649c412a..2228afaff41 100644
--- a/src/kvstore/raftex/test/RaftCase.cpp
+++ b/src/kvstore/raftex/test/RaftCase.cpp
@@ -205,7 +205,7 @@ TEST_F(FiveRaftTest, DISABLED_Figure8) {
 
         if (alive < quorum) {
             size_t idx = folly::Random::rand32(size_);
-            if (!copies_[idx]->isRunning_) {
+            if (!copies_[idx]->isRunning()) {
                 rebootOneCopy(services_, copies_, allHosts_, idx);
                 ++alive;
             }
@@ -214,7 +214,7 @@ TEST_F(FiveRaftTest, DISABLED_Figure8) {
 
     LOG(INFO) << "=====> Now let's reboot all copy and check consensus";
     for (int32_t i = 0; i < size_; i++) {
-        if (!copies_[i]->isRunning_) {
+        if (!copies_[i]->isRunning()) {
             rebootOneCopy(services_, copies_, allHosts_, i);
         }
    }
diff --git a/src/kvstore/raftex/test/RaftexTestBase.cpp b/src/kvstore/raftex/test/RaftexTestBase.cpp
index 92b84420d62..a923cc6dde9 100644
--- a/src/kvstore/raftex/test/RaftexTestBase.cpp
+++ b/src/kvstore/raftex/test/RaftexTestBase.cpp
@@ -104,7 +104,10 @@ void waitUntilLeaderElected(
         int32_t index = 0;
         for (auto& c : copies) {
             // if a copy in abnormal state, its leader is (0, 0)
-            if (!isLearner[index] && c != nullptr && leader != c && c->isRunning_ == true) {
+            VLOG(3) << c->address() << " , leader address "
+                    << c->leader() << ", elected one "
+                    << leader->address();
+            if (!isLearner[index] && c != nullptr && leader != c && c->isRunning()) {
                 if (leader->address() != c->leader()) {
                     sameLeader = false;
                     break;
@@ -126,7 +129,7 @@ void waitUntilAllHasLeader(const std::vector<std::shared_ptr<test::TestShard>>&
     while (true) {
         bool allHaveLeader = true;
         for (auto& c : copies) {
-            if (c != nullptr && c->isRunning_ == true) {
+            if (c != nullptr && c->isRunning()) {
                 if (c->leader().first == 0 && c->leader().second == 0) {
                     allHaveLeader = false;
                     break;
@@ -207,7 +210,6 @@ void setupRaft(
         services[i]->addPartition(copies.back());
         copies.back()->start(getPeers(allHosts, allHosts[i], isLearner),
                              isLearner[i]);
-        copies.back()->isRunning_ = true;
     }
 
     // Wait untill all copies agree on the same leader
@@ -242,7 +244,7 @@ int32_t checkLeadership(std::vector<std::shared_ptr<test::TestShard>>& copies,
     int32_t leaderIndex = -1;
     int i = 0;
     for (auto& c : copies) {
-        if (c != nullptr && leader != c && !c->isLearner() && c->isRunning_) {
+        if (c != nullptr && leader != c && !c->isLearner() && c->isRunning()) {
             EXPECT_EQ(leader->address(), c->leader());
         } else if (leader == c) {
             leaderIndex = i;
@@ -266,7 +268,7 @@ void checkLeadership(std::vector<std::shared_ptr<test::TestShard>>& copies,
 void checkNoLeader(std::vector<std::shared_ptr<test::TestShard>>& copies) {
     for (auto& c : copies) {
-        if (c != nullptr && c->isRunning_ == true) {
+        if (c != nullptr && c->isRunning()) {
             ASSERT_FALSE(c->isLeader());
         }
     }
 }
@@ -298,13 +300,13 @@ void checkConsensus(std::vector<std::shared_ptr<test::TestShard>>& copies,
 
     // Check every copy
     for (auto& c : copies) {
-        if (c != nullptr && c->isRunning_ == true) {
+        if (c != nullptr && c->isRunning()) {
             ASSERT_EQ(msgs.size(), c->getNumLogs());
         }
     }
 
     for (size_t i = start; i <= end; i++) {
         for (auto& c : copies) {
-            if (c != nullptr && c->isRunning_ == true) {
+            if (c != nullptr && c->isRunning()) {
                 folly::StringPiece msg;
                 ASSERT_TRUE(c->getLogMsg(i, msg));
                 ASSERT_EQ(msgs[i], msg.toString());
@@ -317,7 +319,7 @@ void killOneCopy(std::vector<std::shared_ptr<RaftexService>>& services,
                  std::vector<std::shared_ptr<test::TestShard>>& copies,
                  std::shared_ptr<test::TestShard>& leader,
                  size_t index) {
-    copies[index]->isRunning_ = false;
+    copies[index]->stop();
     services[index]->removePartition(copies[index]);
     if (leader != nullptr && index == leader->index()) {
         std::lock_guard<std::mutex> lock(leaderMutex);
@@ -332,7 +334,6 @@ void rebootOneCopy(std::vector<std::shared_ptr<RaftexService>>& services,
                    size_t index) {
     services[index]->addPartition(copies[index]);
     copies[index]->start(getPeers(allHosts, allHosts[index]));
-    copies[index]->isRunning_ = true;
     LOG(INFO) << "copies " << index << " reboot";
 }
diff --git a/src/kvstore/raftex/test/SnapshotTest.cpp b/src/kvstore/raftex/test/SnapshotTest.cpp
index 8d50c265c52..fcabd87be25 100644
--- a/src/kvstore/raftex/test/SnapshotTest.cpp
+++ b/src/kvstore/raftex/test/SnapshotTest.cpp
@@ -27,9 +27,9 @@ namespace raftex {
 
 TEST(SnapshotTest, LearnerCatchUpDataTest) {
     fs::TempDir walRoot("/tmp/catch_up_data.XXXXXX");
-    FLAGS_wal_file_size = 128;
-    FLAGS_wal_buffer_size = 64;
-    FLAGS_wal_buffer_num = 1000;
+    FLAGS_wal_file_size = 1024;
+    FLAGS_wal_buffer_size = 512;
+    FLAGS_wal_buffer_num = 30;
     FLAGS_raft_rpc_timeout_ms = 2000;
     std::shared_ptr<thread::GenericThreadPool> workers;
     std::vector<std::string> wals;
diff --git a/src/kvstore/raftex/test/TestShard.cpp b/src/kvstore/raftex/test/TestShard.cpp
index 4911e8ea393..b561f887e57 100644
--- a/src/kvstore/raftex/test/TestShard.cpp
+++ b/src/kvstore/raftex/test/TestShard.cpp
@@ -67,6 +67,36 @@ std::pair<LogID, std::string> decodeSnapshotRow(const std::string& rawData) {
     return std::make_pair(id, std::move(str));
 }
 
+std::string encodeAddPeer(const HostAddr& addr) {
+    std::string str;
+    CommandType type = CommandType::ADD_PEER;
+    str.append(reinterpret_cast<const char*>(&type), 1);
+    str.append(reinterpret_cast<const char*>(&addr), sizeof(HostAddr));
+    return str;
+}
+
+HostAddr decodeAddPeer(const folly::StringPiece& log) {
+    HostAddr addr;
+    memcpy(&addr.first, log.begin() + 1, sizeof(addr.first));
+    memcpy(&addr.second, log.begin() + 1 + sizeof(addr.first), sizeof(addr.second));
+    return addr;
+}
+
+std::string encodeRemovePeer(const HostAddr& addr) {
+    std::string str;
+    CommandType type = CommandType::REMOVE_PEER;
+    str.append(reinterpret_cast<const char*>(&type), 1);
+    str.append(reinterpret_cast<const char*>(&addr), sizeof(HostAddr));
+    return str;
+}
+
+HostAddr decodeRemovePeer(const folly::StringPiece& log) {
+    HostAddr addr;
+    memcpy(&addr.first, log.begin() + 1, sizeof(addr.first));
+    memcpy(&addr.second, log.begin() + 1 + sizeof(addr.first), sizeof(addr.second));
+    return addr;
+}
+
 TestShard::TestShard(size_t idx,
                      std::shared_ptr<RaftexService> svc,
                      PartitionID partId,
@@ -125,6 +155,12 @@ bool TestShard::commitLogs(std::unique_ptr<LogIterator> iter) {
                     commitTransLeader(nLeader);
                     break;
                 }
+                case CommandType::REMOVE_PEER: {
+                    auto peer = decodeRemovePeer(log);
+                    commitRemovePeer(peer);
+                    break;
+                }
+                case CommandType::ADD_PEER:
                 case CommandType::ADD_LEARNER: {
                     break;
                 }
diff --git a/src/kvstore/raftex/test/TestShard.h b/src/kvstore/raftex/test/TestShard.h
index 0e6f075c3b3..0ab13cf9d4d 100644
--- a/src/kvstore/raftex/test/TestShard.h
+++ b/src/kvstore/raftex/test/TestShard.h
@@ -21,8 +21,9 @@ namespace test {
 enum class CommandType : int8_t {
     ADD_LEARNER = 0x01,
     TRANSFER_LEADER = 0x02,
+    ADD_PEER = 0x03,
+    REMOVE_PEER = 0x04,
 };
-
 std::string encodeLearner(const HostAddr& addr);
 
 HostAddr decodeLearner(const folly::StringPiece& log);
@@ -37,6 +38,14 @@ std::string encodeSnapshotRow(LogID logId, const std::string& row);
 
 std::pair<LogID, std::string> decodeSnapshotRow(const std::string& rawData);
 
+std::string encodeAddPeer(const HostAddr& addr);
+
+HostAddr decodeAddPeer(const folly::StringPiece& log);
+
+std::string encodeRemovePeer(const HostAddr& addr);
+
+HostAddr decodeRemovePeer(const folly::StringPiece& log);
+
 class TestShard : public RaftPart {
     friend class SnapshotManagerImpl;
 public:
@@ -91,6 +100,18 @@ class TestShard : public RaftPart {
                     LOG(INFO) << idStr_ << "Preprocess transleader " << nLeader;
                     break;
                 }
+                case CommandType::ADD_PEER: {
+                    auto peer = decodeAddPeer(log);
+                    addPeer(peer);
+                    LOG(INFO)
<< idStr_ << "Add peer " << peer; + break; + } + case CommandType::REMOVE_PEER: { + auto peer = decodeRemovePeer(log); + preProcessRemovePeer(peer); + LOG(INFO) << idStr_ << "Remove peer " << peer; + break; + } default: { break; } @@ -112,7 +133,6 @@ class TestShard : public RaftPart { public: int32_t commitTimes_ = 0; int32_t currLogId_ = -1; - bool isRunning_ = false; private: const size_t idx_; diff --git a/src/kvstore/test/LogEncoderTest.cpp b/src/kvstore/test/LogEncoderTest.cpp index ff36f68ddd0..cec2241a71f 100644 --- a/src/kvstore/test/LogEncoderTest.cpp +++ b/src/kvstore/test/LogEncoderTest.cpp @@ -121,6 +121,12 @@ TEST(LogEncoderTest, KVTest) { ASSERT_EQ("KV_val", decoded.second); } +TEST(LogEncoderTest, HostTest) { + auto encoded = encodeHost(OP_ADD_LEARNER, HostAddr(1, 1)); + auto decoded = decodeHost(OP_ADD_LEARNER, encoded); + ASSERT_EQ(HostAddr(1, 1), decoded); +} + } // namespace kvstore } // namespace nebula diff --git a/src/kvstore/wal/FileBasedWal.cpp b/src/kvstore/wal/FileBasedWal.cpp index f27b384560e..fce91b6bd7c 100644 --- a/src/kvstore/wal/FileBasedWal.cpp +++ b/src/kvstore/wal/FileBasedWal.cpp @@ -277,15 +277,15 @@ void FileBasedWal::closeCurrFile() { return; } - CHECK_EQ(fsync(currFd_), 0); + CHECK_EQ(fsync(currFd_), 0) << strerror(errno); // Close the file - CHECK_EQ(close(currFd_), 0); + CHECK_EQ(close(currFd_), 0) << strerror(errno); currFd_ = -1; auto now = time::WallClock::fastNowInSec(); currInfo_->setMTime(now); - DCHECK_EQ(currInfo_->size(), FileUtils::fileSize(currInfo_->path())) - << currInfo_->path() << " size does not match"; +// DCHECK_EQ(currInfo_->size(), FileUtils::fileSize(currInfo_->path())) +// << currInfo_->path() << " size does not match"; struct utimbuf timebuf; timebuf.modtime = currInfo_->mtime(); timebuf.actime = currInfo_->mtime(); @@ -443,7 +443,10 @@ bool FileBasedWal::appendLogInternal(LogID id, } ssize_t bytesWritten = write(currFd_, strBuf.data(), strBuf.size()); - CHECK_EQ(bytesWritten, strBuf.size()); + if (bytesWritten != (ssize_t)strBuf.size()) { + LOG(FATAL) << idStr_ << "bytesWritten:" << bytesWritten << ", expected:" << strBuf.size() + << ", error:" << strerror(errno); + } currInfo_->setSize(currInfo_->size() + strBuf.size()); currInfo_->setLastId(id); currInfo_->setLastTerm(term); diff --git a/src/meta/ActiveHostsMan.cpp b/src/meta/ActiveHostsMan.cpp index 251544fb770..771b407e5e9 100644 --- a/src/meta/ActiveHostsMan.cpp +++ b/src/meta/ActiveHostsMan.cpp @@ -54,5 +54,10 @@ std::vector ActiveHostsMan::getActiveHosts(kvstore::KVStore* kv, int32 return hosts; } +bool ActiveHostsMan::isLived(kvstore::KVStore* kv, const HostAddr& host) { + auto activeHosts = getActiveHosts(kv); + return std::find(activeHosts.begin(), activeHosts.end(), host) != activeHosts.end(); +} + } // namespace meta } // namespace nebula diff --git a/src/meta/ActiveHostsMan.h b/src/meta/ActiveHostsMan.h index a26da07987e..dce7bbd91d8 100644 --- a/src/meta/ActiveHostsMan.h +++ b/src/meta/ActiveHostsMan.h @@ -53,6 +53,8 @@ class ActiveHostsMan final { static std::vector getActiveHosts(kvstore::KVStore* kv, int32_t expiredTTL = 0); + static bool isLived(kvstore::KVStore* kv, const HostAddr& host); + protected: ActiveHostsMan() = default; }; diff --git a/src/meta/client/MetaClient.cpp b/src/meta/client/MetaClient.cpp index e99f398fa02..af7971614a8 100644 --- a/src/meta/client/MetaClient.cpp +++ b/src/meta/client/MetaClient.cpp @@ -116,15 +116,15 @@ void MetaClient::loadDataThreadFunc() { addLoadDataTask(); } -void MetaClient::loadData() { +bool 
diff --git a/src/meta/ActiveHostsMan.cpp b/src/meta/ActiveHostsMan.cpp
index 251544fb770..771b407e5e9 100644
--- a/src/meta/ActiveHostsMan.cpp
+++ b/src/meta/ActiveHostsMan.cpp
@@ -54,5 +54,10 @@ std::vector<HostAddr> ActiveHostsMan::getActiveHosts(kvstore::KVStore* kv, int32
     return hosts;
 }
 
+bool ActiveHostsMan::isLived(kvstore::KVStore* kv, const HostAddr& host) {
+    auto activeHosts = getActiveHosts(kv);
+    return std::find(activeHosts.begin(), activeHosts.end(), host) != activeHosts.end();
+}
+
 }  // namespace meta
 }  // namespace nebula
diff --git a/src/meta/ActiveHostsMan.h b/src/meta/ActiveHostsMan.h
index a26da07987e..dce7bbd91d8 100644
--- a/src/meta/ActiveHostsMan.h
+++ b/src/meta/ActiveHostsMan.h
@@ -53,6 +53,8 @@ class ActiveHostsMan final {
     static std::vector<HostAddr> getActiveHosts(kvstore::KVStore* kv,
                                                 int32_t expiredTTL = 0);
 
+    static bool isLived(kvstore::KVStore* kv, const HostAddr& host);
+
 protected:
     ActiveHostsMan() = default;
 };
diff --git a/src/meta/client/MetaClient.cpp b/src/meta/client/MetaClient.cpp
index e99f398fa02..af7971614a8 100644
--- a/src/meta/client/MetaClient.cpp
+++ b/src/meta/client/MetaClient.cpp
@@ -116,15 +116,15 @@ void MetaClient::loadDataThreadFunc() {
     addLoadDataTask();
 }
 
-void MetaClient::loadData() {
+bool MetaClient::loadData() {
     if (ioThreadPool_->numThreads() <= 0) {
         LOG(ERROR) << "The threads number in ioThreadPool should be greater than 0";
-        return;
+        return false;
     }
     auto ret = listSpaces().get();
     if (!ret.ok()) {
         LOG(ERROR) << "List space failed, status:" << ret.status();
-        return;
+        return false;
     }
     decltype(localCache_) cache;
     decltype(spaceIndexByName_) spaceIndexByName;
@@ -141,7 +141,7 @@ void MetaClient::loadData() {
         if (!r.ok()) {
             LOG(ERROR) << "Get parts allocation failed for spaceId " << spaceId
                        << ", status " << r.status();
-            return;
+            return false;
         }
 
         auto spaceCache = std::make_shared<SpaceInfoCache>();
@@ -161,7 +161,7 @@ void MetaClient::loadData() {
                          spaceNewestTagVerMap,
                          spaceNewestEdgeVerMap,
                          spaceAllEdgeMap)) {
-            return;
+            return false;
         }
 
         cache.emplace(spaceId, spaceCache);
@@ -183,6 +183,7 @@ void MetaClient::loadData() {
     diff(oldCache, localCache_);
     ready_ = true;
     LOG(INFO) << "Load data completed!";
+    return true;
 }
 
 void MetaClient::addLoadDataTask() {
@@ -390,7 +391,13 @@ Status MetaClient::handleResponse(const RESP& resp) {
         case cpp2::ErrorCode::E_WRONGCLUSTER:
             return Status::Error("wrong cluster!");
         case cpp2::ErrorCode::E_LEADER_CHANGED: {
-            return Status::LeaderChanged();
+            return Status::LeaderChanged("Leader changed!");
+        case cpp2::ErrorCode::E_BALANCED:
+            return Status::Error("The cluster is balanced!");
+        case cpp2::ErrorCode::E_BALANCER_RUNNING:
+            return Status::Error("The balancer is running!");
+        case cpp2::ErrorCode::E_BAD_BALANCE_PLAN:
+            return Status::Error("Bad balance plan!");
         }
         default:
            return Status::Error("Unknown code %d", static_cast<int32_t>(resp.get_code()));
@@ -1168,6 +1175,20 @@ folly::Future<StatusOr<int64_t>> MetaClient::balance() {
     return future;
 }
 
+folly::Future<StatusOr<std::vector<cpp2::BalanceTask>>>
+MetaClient::showBalance(int64_t balanceId) {
+    cpp2::BalanceReq req;
+    req.set_id(balanceId);
+    folly::Promise<StatusOr<std::vector<cpp2::BalanceTask>>> promise;
+    auto future = promise.getFuture();
+    getResponse(std::move(req), [] (auto client, auto request) {
+        return client->future_balance(request);
+    }, [] (cpp2::BalanceResp&& resp) -> std::vector<cpp2::BalanceTask> {
+        return resp.tasks;
+    }, std::move(promise), true);
+    return future;
+}
+
 folly::Future<StatusOr<bool>> MetaClient::balanceLeader() {
     cpp2::LeaderBalanceReq req;
     folly::Promise<StatusOr<bool>> promise;
@@ -1363,5 +1384,10 @@ ConfigItem MetaClient::toConfigItem(const cpp2::ConfigItem& item) {
     return ConfigItem(item.get_module(), item.get_name(), item.get_type(),
                       item.get_mode(), value);
 }
 
+Status MetaClient::refreshCache() {
+    auto ret = bgThread_->addTask(&MetaClient::loadData, this).get();
+    return ret ? Status::OK() : Status::Error("Load data failed");
+}
+
 }  // namespace meta
 }  // namespace nebula
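refreshCache() reuses loadData() by scheduling it on the background thread and blocking on the boolean it now returns, which is why loadData's signature changed from void to bool. A standalone sketch of the same pattern with std::async standing in for bgThread_ (FakeMetaClient and its members are illustrative, not the real MetaClient API):

    #include <cassert>
    #include <future>
    #include <string>

    struct FakeMetaClient {
        bool loadData() { return true; }    // now reports success/failure

        std::string refreshCache() {
            // The real code does bgThread_->addTask(&MetaClient::loadData, this).get();
            auto ok = std::async(std::launch::async,
                                 [this] { return loadData(); }).get();
            return ok ? "OK" : "Load data failed";
        }
    };

    int main() {
        FakeMetaClient client;
        assert(client.refreshCache() == "OK");
    }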
diff --git a/src/meta/client/MetaClient.h b/src/meta/client/MetaClient.h
index 39695c80f7b..bab3b4ec529 100644
--- a/src/meta/client/MetaClient.h
+++ b/src/meta/client/MetaClient.h
@@ -211,6 +211,9 @@ class MetaClient {
 
     folly::Future<StatusOr<int64_t>> balance();
 
+    folly::Future<StatusOr<std::vector<cpp2::BalanceTask>>>
+    showBalance(int64_t balanceId);
+
     folly::Future<StatusOr<bool>> balanceLeader();
 
     // Operations for config
@@ -265,9 +268,13 @@ class MetaClient {
 
     const std::vector<HostAddr>& getAddresses();
 
+    Status refreshCache();
+
 protected:
     void loadDataThreadFunc();
-    void loadData();
+    // Return true if load succeeded.
+    bool loadData();
+
     void addLoadDataTask();
 
     void heartBeatThreadFunc();
diff --git a/src/meta/processors/admin/AdminClient.cpp b/src/meta/processors/admin/AdminClient.cpp
index a45e106d876..3f8cf88502a 100644
--- a/src/meta/processors/admin/AdminClient.cpp
+++ b/src/meta/processors/admin/AdminClient.cpp
@@ -8,8 +8,9 @@
 #include "meta/MetaServiceUtils.h"
 #include "meta/processors/Common.h"
 #include "meta/ActiveHostsMan.h"
+#include "kvstore/Part.h"
 
-DEFINE_int32(max_retry_times_admin_op, 3, "max retry times for admin request!");
+DEFINE_int32(max_retry_times_admin_op, 30, "max retry times for admin request!");
 
 namespace nebula {
 namespace meta {
@@ -24,7 +25,21 @@ folly::Future<Status> AdminClient::transLeader(GraphSpaceID spaceId,
     storage::cpp2::TransLeaderReq req;
     req.set_space_id(spaceId);
     req.set_part_id(partId);
-    req.set_new_leader(to(dst));
+    auto target = dst;
+    if (dst == kRandomPeer) {
+        auto ret = getPeers(spaceId, partId);
+        if (!ret.ok()) {
+            return ret.status();
+        }
+        auto& peers = ret.value();
+        for (auto& p : peers) {
+            if (p != leader && ActiveHostsMan::isLived(kv_, p)) {
+                target = p;
+                break;
+            }
+        }
+    }
+    req.set_new_leader(toThriftHost(target));
     return getResponse(leader, std::move(req), [] (auto client, auto request) {
         return client->future_transLeader(request);
     }, [] (auto&& resp) -> Status {
@@ -63,13 +78,16 @@ folly::Future<Status> AdminClient::addPart(GraphSpaceID spaceId,
     });
 }
 
-folly::Future<Status> AdminClient::addLearner(GraphSpaceID spaceId, PartitionID partId) {
+folly::Future<Status> AdminClient::addLearner(GraphSpaceID spaceId,
+                                              PartitionID partId,
+                                              const HostAddr& learner) {
     if (injector_) {
         return injector_->addLearner();
     }
     storage::cpp2::AddLearnerReq req;
     req.set_space_id(spaceId);
     req.set_part_id(partId);
+    req.set_learner(toThriftHost(learner));
     auto ret = getPeers(spaceId, partId);
     if (!ret.ok()) {
         return ret.status();
@@ -83,13 +101,15 @@ folly::Future<Status> AdminClient::addLearner(GraphSpaceID spaceId, PartitionID
 }
 
 folly::Future<Status> AdminClient::waitingForCatchUpData(GraphSpaceID spaceId,
-                                                         PartitionID partId) {
+                                                         PartitionID partId,
+                                                         const HostAddr& target) {
     if (injector_) {
         return injector_->waitingForCatchUpData();
     }
     storage::cpp2::CatchUpDataReq req;
     req.set_space_id(spaceId);
     req.set_part_id(partId);
+    req.set_target(toThriftHost(target));
     auto ret = getPeers(spaceId, partId);
     if (!ret.ok()) {
         return ret.status();
@@ -98,17 +118,22 @@ folly::Future<Status> AdminClient::waitingForCatchUpData(GraphSpaceID spaceId,
     auto f = pro.getFuture();
     getResponse(ret.value(), 0, std::move(req), [] (auto client, auto request) {
         return client->future_waitingForCatchUpData(request);
-    }, 0, std::move(pro), FLAGS_max_retry_times_admin_op);
+    }, 0, std::move(pro), 3);
     return f;
 }
 
-folly::Future<Status> AdminClient::memberChange(GraphSpaceID spaceId, PartitionID partId) {
+folly::Future<Status> AdminClient::memberChange(GraphSpaceID spaceId,
+                                                PartitionID partId,
+                                                const HostAddr& peer,
+                                                bool added) {
     if (injector_) {
         return injector_->memberChange();
     }
     storage::cpp2::MemberChangeReq req;
     req.set_space_id(spaceId);
     req.set_part_id(partId);
+    req.set_add(added);
+    req.set_peer(toThriftHost(peer));
     auto ret = getPeers(spaceId, partId);
     if (!ret.ok()) {
         return ret.status();
@@ -134,29 +159,59 @@ folly::Future<Status> AdminClient::updateMeta(GraphSpaceID spaceId,
         return ret.status();
     }
     auto peers = std::move(ret).value();
+    auto strHosts = [] (const std::vector<HostAddr>& hosts) -> std::string {
+        std::stringstream peersStr;
+        for (auto& h : hosts) {
+            peersStr << h << ",";
+        }
+        return peersStr.str();
+    };
+    LOG(INFO) << "[space:" << spaceId
<< ", part:" << partId << "] Update original peers " + << strHosts(peers) << ", remove " << src << ", add " << dst; auto it = std::find(peers.begin(), peers.end(), src); - CHECK(it != peers.end()); + if (it == peers.end()) { + LOG(INFO) << "src " << src << " has been removed in [" << spaceId << ", " << partId << "]"; + // In this case, the dst should be existed in peers. + if (std::find(peers.begin(), peers.end(), dst) == peers.end()) { + LOG(ERROR) << "[space:" << spaceId << ", part:" << partId << "] dst " + << dst << "should be existed in peers!"; + return Status::Error("dst not exist in peers"); + } + return Status::OK(); + } peers.erase(it); + + if (std::find(peers.begin(), peers.end(), dst) != peers.end()) { + LOG(ERROR) << "[space:" << spaceId << ", part:" << partId << "] dst " + << dst << " has been existed!"; + return Status::Error("dst has been existed in peers"); + } peers.emplace_back(dst); std::vector thriftPeers; thriftPeers.resize(peers.size()); std::transform(peers.begin(), peers.end(), thriftPeers.begin(), [this](const auto& h) { - return to(h); + return toThriftHost(h); }); + + auto partRet = kv_->part(kDefaultSpaceId, kDefaultPartId); + CHECK(ok(partRet)); + auto part = nebula::value(partRet); + folly::Promise pro; auto f = pro.getFuture(); std::vector data; data.emplace_back(MetaServiceUtils::partKey(spaceId, partId), MetaServiceUtils::partVal(thriftPeers)); - kv_->asyncMultiPut(kDefaultSpaceId, - kDefaultPartId, - std::move(data), - [p = std::move(pro)] (kvstore::ResultCode code) mutable { - if (code == kvstore::ResultCode::SUCCEEDED) { - p.setValue(Status::OK()); - } else { - p.setValue(Status::Error("Access kv failed, code:%d", static_cast(code))); - } + part->asyncMultiPut(std::move(data), [] (kvstore::ResultCode) {}); + part->sync([this, p = std::move(pro)] (kvstore::ResultCode code) mutable { + // To avoid dead lock, we call future callback in ioThreadPool_ + folly::via(ioThreadPool_.get(), [code, p = std::move(p)] () mutable { + if (code == kvstore::ResultCode::SUCCEEDED) { + p.setValue(Status::OK()); + } else { + p.setValue(Status::Error("Access kv failed, code:%d", static_cast(code))); + } + }); }); return f; } @@ -240,7 +295,7 @@ void AdminClient::getResponse( << ", retry " << retry << ", limit " << retryLimit; getResponse(std::move(hosts), - index + 1, + (index + 1) % hosts.size(), std::move(req), remoteFunc, retry + 1, @@ -261,6 +316,20 @@ void AdminClient::getResponse( case storage::cpp2::ErrorCode::E_LEADER_CHANGED: { if (retry < retryLimit) { HostAddr leader(resp.get_leader().get_ip(), resp.get_leader().get_port()); + if (leader == HostAddr(0, 0)) { + usleep(1000 * 50); + LOG(INFO) << "The leader is in election" + << ", retry " << retry + << ", limit " << retryLimit; + getResponse(std::move(hosts), + (index + 1) % hosts.size(), + std::move(req), + std::move(remoteFunc), + retry + 1, + std::move(p), + retryLimit); + return; + } int32_t leaderIndex = 0; for (auto& h : hosts) { if (h == leader) { @@ -268,10 +337,19 @@ void AdminClient::getResponse( } leaderIndex++; } - LOG(INFO) << "Return leder change from " << hosts[index] + if (leaderIndex == (int32_t)hosts.size()) { + LOG(ERROR) << "The new leader is " << leader; + for (auto& h : hosts) { + LOG(ERROR) << "The peer is " << h; + } + p.setValue(Status::Error("Can't find leader in current peers")); + return; + } + LOG(INFO) << "Return leader change from " << hosts[index] << ", new leader is " << leader << ", retry " << retry << ", limit " << retryLimit; + CHECK_LT(leaderIndex, hosts.size()); 
@@ -240,7 +295,7 @@ void AdminClient::getResponse(
                << ", retry " << retry
                << ", limit " << retryLimit;
            getResponse(std::move(hosts),
-                       index + 1,
+                       (index + 1) % hosts.size(),
                        std::move(req),
                        remoteFunc,
                        retry + 1,
                        std::move(p),
                        retryLimit);
@@ -261,6 +316,20 @@ void AdminClient::getResponse(
            case storage::cpp2::ErrorCode::E_LEADER_CHANGED: {
                if (retry < retryLimit) {
                    HostAddr leader(resp.get_leader().get_ip(), resp.get_leader().get_port());
+                   if (leader == HostAddr(0, 0)) {
+                       usleep(1000 * 50);
+                       LOG(INFO) << "The leader is in election"
+                                 << ", retry " << retry
+                                 << ", limit " << retryLimit;
+                       getResponse(std::move(hosts),
+                                   (index + 1) % hosts.size(),
+                                   std::move(req),
+                                   std::move(remoteFunc),
+                                   retry + 1,
+                                   std::move(p),
+                                   retryLimit);
+                       return;
+                   }
                    int32_t leaderIndex = 0;
                    for (auto& h : hosts) {
                        if (h == leader) {
                            break;
                        }
                        leaderIndex++;
                    }
-                   LOG(INFO) << "Return leder change from " << hosts[index]
+                   if (leaderIndex == (int32_t)hosts.size()) {
+                       LOG(ERROR) << "The new leader is " << leader;
+                       for (auto& h : hosts) {
+                           LOG(ERROR) << "The peer is " << h;
+                       }
+                       p.setValue(Status::Error("Can't find leader in current peers"));
+                       return;
+                   }
+                   LOG(INFO) << "Return leader change from " << hosts[index]
                              << ", new leader is " << leader
                              << ", retry " << retry
                              << ", limit " << retryLimit;
+                   CHECK_LT(leaderIndex, hosts.size());
                    getResponse(std::move(hosts),
                                leaderIndex,
                                std::move(req),
@@ -291,7 +369,7 @@ void AdminClient::getResponse(
                           << ", retry " << retry
                           << ", limit " << retryLimit;
                    getResponse(std::move(hosts),
-                               index + 1,
+                               (index + 1) % hosts.size(),
                                std::move(req),
                                std::move(remoteFunc),
                                retry + 1,
                                std::move(p),
                                retryLimit);
@@ -308,7 +386,7 @@ void AdminClient::getResponse(
    });  // via
 }
 
-nebula::cpp2::HostAddr AdminClient::to(const HostAddr& addr) {
+nebula::cpp2::HostAddr AdminClient::toThriftHost(const HostAddr& addr) {
     nebula::cpp2::HostAddr thriftAddr;
     thriftAddr.set_ip(addr.first);
     thriftAddr.set_port(addr.second);
@@ -364,7 +442,8 @@ folly::Future<Status> AdminClient::getLeaderDist(HostLeaderMap* result) {
                 return;
             }
             auto&& resp = std::move(t).value();
-            (*result)[host] = std::move(resp.get_leader_parts());
+            LOG(INFO) << "Get leader for host " << host;
+            result->emplace(host, std::move(resp).get_leader_parts());
             p.setValue(Status::OK());
         });
     });
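Several retry paths above switch from index + 1 to (index + 1) % hosts.size(); with max_retry_times_admin_op raised from 3 to 30, a plain increment would walk past the end of the host list long before the retries were exhausted. A minimal demonstration of the wrap-around:

    #include <cassert>
    #include <cstddef>
    #include <vector>

    // Round-robin advance over a non-empty host list, as in the retry loop of
    // AdminClient::getResponse(): the modulo keeps the index in range forever.
    size_t nextHost(size_t index, size_t hostCount) {
        return (index + 1) % hostCount;
    }

    int main() {
        std::vector<int> hosts{0, 1, 2};
        size_t idx = 2;
        idx = nextHost(idx, hosts.size());   // wraps back to the first host
        assert(idx == 0);
        // Many consecutive retries cycle 0 -> 1 -> 2 -> 0 ..., always in range.
        for (int retry = 0; retry < 30; retry++) {
            idx = nextHost(idx, hosts.size());
            assert(idx < hosts.size());
        }
    }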
+     * */
+    folly::Future<Status> memberChange(GraphSpaceID spaceId,
+                                       PartitionID partId,
+                                       const HostAddr& peer,
+                                       bool added);
 
     folly::Future<Status> updateMeta(GraphSpaceID spaceId,
                                      PartitionID partId,
@@ -105,7 +116,7 @@ class AdminClient {
 
     Status handleResponse(const storage::cpp2::AdminExecResp& resp);
 
-    nebula::cpp2::HostAddr to(const HostAddr& addr);
+    nebula::cpp2::HostAddr toThriftHost(const HostAddr& addr);
 
     StatusOr<std::vector<HostAddr>> getPeers(GraphSpaceID spaceId, PartitionID partId);
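With the explicit peer and direction, moving a partition becomes two separate raft membership changes, mirroring the MEMBER_CHANGE_ADD and MEMBER_CHANGE_REMOVE states introduced below in BalanceTask. A hypothetical call site might chain them like this (sketch only, not part of the patch):

    // Add dst to the group first; only after that succeeds, drop src.
    client->memberChange(spaceId, partId, dst, true)
        .thenValue([client, spaceId, partId, src] (Status st) {
            if (!st.ok()) {
                return folly::makeFuture(std::move(st));
            }
            return client->memberChange(spaceId, partId, src, false);
        });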
diff --git a/src/meta/processors/admin/BalancePlan.cpp b/src/meta/processors/admin/BalancePlan.cpp
index aa828209835..db6e789f5bd 100644
--- a/src/meta/processors/admin/BalancePlan.cpp
+++ b/src/meta/processors/admin/BalancePlan.cpp
@@ -7,6 +7,7 @@
 #include "meta/processors/admin/BalancePlan.h"
 #include <folly/synchronization/Baton.h>
 #include "meta/processors/Common.h"
+#include "meta/ActiveHostsMan.h"
 
 DEFINE_uint32(task_concurrency, 10, "The tasks number could be invoked simultaneously");
 
@@ -54,6 +55,7 @@ void BalancePlan::invoke() {
                 finished = true;
                 if (status_ == Status::IN_PROGRESS) {
                     status_ = Status::SUCCEEDED;
+                    LOG(INFO) << "Balance " << id_ << " succeeded!";
                 }
             }
         }
@@ -76,6 +78,7 @@ void BalancePlan::invoke() {
             status_ = Status::FAILED;
             if (finishedTaskNum_ == tasks_.size()) {
                 finished = true;
+                LOG(INFO) << "Balance " << id_ << " failed!";
             }
         }
         if (finished) {
@@ -128,7 +131,7 @@ bool BalancePlan::saveInStore(bool onlyPlan) {
     return true;
 }
 
-bool BalancePlan::recovery() {
+bool BalancePlan::recovery(bool resume) {
     if (kv_) {
         const auto& prefix = BalanceTask::prefix(id_);
         std::unique_ptr<kvstore::KVIterator> iter;
@@ -148,17 +151,28 @@
                 task.partId_ = std::get<2>(tup);
                 task.src_ = std::get<3>(tup);
                 task.dst_ = std::get<4>(tup);
+                task.taskIdStr_ = task.buildTaskId();
             }
             {
                 auto tup = BalanceTask::parseVal(iter->val());
                 task.status_ = std::get<0>(tup);
                 task.ret_ = std::get<1>(tup);
-                if (task.ret_ == BalanceTask::Result::FAILED) {
+                task.srcLived_ = std::get<2>(tup);
+                task.startTimeMs_ = std::get<3>(tup);
+                task.endTimeMs_ = std::get<4>(tup);
+                if (resume && task.ret_ != BalanceTask::Result::SUCCEEDED) {
                     // Resume the failed task.
                     task.ret_ = BalanceTask::Result::IN_PROGRESS;
+                    task.status_ = BalanceTask::Status::START;
+                    if (ActiveHostsMan::isLived(kv_, task.src_)) {
+                        task.srcLived_ = true;
+                    } else {
+                        task.srcLived_ = false;
+                    }
+                    if (!ActiveHostsMan::isLived(kv_, task.dst_)) {
+                        task.ret_ = BalanceTask::Result::INVALID;
+                    }
                 }
-                task.startTimeMs_ = std::get<2>(tup);
-                task.endTimeMs_ = std::get<3>(tup);
             }
             tasks_.emplace_back(std::move(task));
             iter->next();
diff --git a/src/meta/processors/admin/BalancePlan.h b/src/meta/processors/admin/BalancePlan.h
index f52b031c960..cc54b4a273a 100644
--- a/src/meta/processors/admin/BalancePlan.h
+++ b/src/meta/processors/admin/BalancePlan.h
@@ -39,6 +39,14 @@ class BalancePlan {
         , kv_(kv)
         , client_(client) {}
 
+    BalancePlan(const BalancePlan& plan)
+        : id_(plan.id_)
+        , kv_(plan.kv_)
+        , client_(plan.client_)
+        , tasks_(plan.tasks_)
+        , finishedTaskNum_(plan.finishedTaskNum_)
+        , status_(plan.status_) {}
+
     void addTask(BalanceTask task) {
         tasks_.emplace_back(std::move(task));
     }
@@ -53,14 +61,22 @@
      * */
     void rollback() {}
 
+    Status status() {
+        return status_;
+    }
+
     bool saveInStore(bool onlyPlan = false);
 
     BalanceID id() const {
         return id_;
     }
 
+    const std::vector<BalanceTask>& tasks() const {
+        return tasks_;
+    }
+
 private:
-    bool recovery();
+    bool recovery(bool resume = true);
 
     std::string planKey() const;
diff --git a/src/meta/processors/admin/BalanceProcessor.cpp b/src/meta/processors/admin/BalanceProcessor.cpp
index 3f4552e7fc1..5eff4cf5ec4 100644
--- a/src/meta/processors/admin/BalanceProcessor.cpp
+++ b/src/meta/processors/admin/BalanceProcessor.cpp
@@ -19,8 +19,35 @@ void BalanceProcessor::process(const cpp2::BalanceReq& req) {
         return;
     }
     if (req.get_id() != nullptr) {
-        LOG(ERROR) << "Unsupport show status for specific balance plan, id=" << *req.get_id();
-        resp_.set_code(cpp2::ErrorCode::E_UNSUPPORTED);
+        auto ret = Balancer::instance(kvstore_)->show(*req.get_id());
+        if (!ret.ok()) {
+            resp_.set_code(cpp2::ErrorCode::E_BAD_BALANCE_PLAN);
+            onFinished();
+            return;
+        }
+        resp_.set_code(cpp2::ErrorCode::SUCCEEDED);
+        const auto& plan = ret.value();
+        std::vector<cpp2::BalanceTask> thriftTasks;
+        for (auto& task : plan.tasks()) {
+            cpp2::BalanceTask t;
+            t.set_id(task.taskIdStr());
+            switch (task.result()) {
+                case BalanceTask::Result::SUCCEEDED:
+                    t.set_result(cpp2::TaskResult::SUCCEEDED);
+                    break;
+                case BalanceTask::Result::FAILED:
+                    t.set_result(cpp2::TaskResult::FAILED);
+                    break;
+                case BalanceTask::Result::IN_PROGRESS:
+                    t.set_result(cpp2::TaskResult::IN_PROGRESS);
+                    break;
+                case BalanceTask::Result::INVALID:
+                    t.set_result(cpp2::TaskResult::INVALID);
+                    break;
+            }
+            thriftTasks.emplace_back(std::move(t));
+        }
+        resp_.set_tasks(std::move(thriftTasks));
         onFinished();
         return;
     }
@@ -33,8 +60,11 @@ void BalanceProcessor::process(const cpp2::BalanceReq& req) {
     }
     auto ret = Balancer::instance(kvstore_)->balance();
     if (!ret.ok()) {
-        LOG(INFO) << "The balancer is running.";
-        resp_.set_code(cpp2::ErrorCode::E_BALANCER_RUNNING);
+        if (ret.status() == Status::Balanced()) {
+            resp_.set_code(cpp2::ErrorCode::E_BALANCED);
+        } else {
+            resp_.set_code(cpp2::ErrorCode::E_BALANCER_RUNNING);
+        }
        onFinished();
         return;
     }
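A BALANCE DATA request can now terminate in three distinct ways at the meta level: a plan id on success, E_BALANCED when buildBalancePlan finds nothing to move, and E_BALANCER_RUNNING when a plan is already in flight; showing an unknown plan yields E_BAD_BALANCE_PLAN. A sketch of how a caller might branch on these codes (illustrative, not part of the patch):

    switch (resp.get_code()) {
        case cpp2::ErrorCode::SUCCEEDED:
            LOG(INFO) << "Balance plan " << resp.get_id() << " accepted";
            break;
        case cpp2::ErrorCode::E_BALANCED:
            LOG(INFO) << "Nothing to move, the cluster is already balanced";
            break;
        case cpp2::ErrorCode::E_BALANCER_RUNNING:
            LOG(INFO) << "Another balance plan is still running";
            break;
        case cpp2::ErrorCode::E_BAD_BALANCE_PLAN:
            LOG(ERROR) << "No such balance plan";
            break;
        default:
            LOG(ERROR) << "Unexpected error";
    }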
diff --git a/src/meta/processors/admin/BalanceTask.cpp b/src/meta/processors/admin/BalanceTask.cpp
index ec593396fcf..3abfe1dcbc5 100644
--- a/src/meta/processors/admin/BalanceTask.cpp
+++ b/src/meta/processors/admin/BalanceTask.cpp
@@ -22,9 +22,17 @@ const std::string kBalanceTaskTable = "__b_task__"; // NOLINT
 
 void BalanceTask::invoke() {
     CHECK_NOTNULL(client_);
+    if (ret_ == Result::INVALID) {
+        endTimeMs_ = time::WallClock::fastNowInMilliSec();
+        saveInStore();
+        LOG(ERROR) << taskIdStr_ << "Task invalid!";
+        onFinished_();
+        return;
+    }
     if (ret_ == Result::FAILED) {
         endTimeMs_ = time::WallClock::fastNowInMilliSec();
         saveInStore();
+        LOG(ERROR) << taskIdStr_ << "Task failed, status_ " << static_cast<int32_t>(status_);
         onError_();
         return;
     }
@@ -38,15 +46,20 @@
         case Status::CHANGE_LEADER: {
             LOG(INFO) << taskIdStr_ << "Ask the src to give up the leadership.";
             SAVE_STATE();
-            client_->transLeader(spaceId_, partId_, src_).thenValue([this](auto&& resp) {
-                if (!resp.ok()) {
-                    ret_ = Result::FAILED;
-                } else {
-                    status_ = Status::ADD_PART_ON_DST;
-                }
-                invoke();
-            });
-            break;
+            if (srcLived_) {
+                client_->transLeader(spaceId_, partId_, src_).thenValue([this](auto&& resp) {
+                    if (!resp.ok()) {
+                        ret_ = Result::FAILED;
+                    } else {
+                        status_ = Status::ADD_PART_ON_DST;
+                    }
+                    invoke();
+                });
+                break;
+            } else {
+                LOG(INFO) << taskIdStr_ << "Src host has been lost, so no need to transfer leader";
+                status_ = Status::ADD_PART_ON_DST;
+            }
         }
         case Status::ADD_PART_ON_DST: {
             LOG(INFO) << taskIdStr_ << "Open the part as learner on dst.";
@@ -64,7 +77,7 @@
         case Status::ADD_LEARNER: {
             LOG(INFO) << taskIdStr_ << "Add learner dst.";
             SAVE_STATE();
-            client_->addLearner(spaceId_, partId_).thenValue([this](auto&& resp) {
+            client_->addLearner(spaceId_, partId_, dst_).thenValue([this](auto&& resp) {
                 if (!resp.ok()) {
                     ret_ = Result::FAILED;
                 } else {
@@ -77,58 +90,82 @@
         case Status::CATCH_UP_DATA: {
             LOG(INFO) << taskIdStr_ << "Waiting for the data catch up.";
             SAVE_STATE();
-            client_->waitingForCatchUpData(spaceId_, partId_).thenValue([this](auto&& resp) {
+            client_->waitingForCatchUpData(spaceId_, partId_, dst_).thenValue([this](auto&& resp) {
                 if (!resp.ok()) {
                     ret_ = Result::FAILED;
                 } else {
-                    status_ = Status::MEMBER_CHANGE;
+                    status_ = Status::MEMBER_CHANGE_ADD;
                 }
                 invoke();
             });
             break;
         }
-        case Status::MEMBER_CHANGE: {
+        case Status::MEMBER_CHANGE_ADD: {
             LOG(INFO) << taskIdStr_ << "Send member change request to the leader"
-                      << ", it will add the new member on dst host"
-                      << " and remove the old member on src host.";
+                      << ", it will add the new member on dst host";
             SAVE_STATE();
-            client_->memberChange(spaceId_, partId_).thenValue([this](auto&& resp) {
+            client_->memberChange(spaceId_, partId_, dst_, true).thenValue([this](auto&& resp) {
                 if (!resp.ok()) {
                     ret_ = Result::FAILED;
                 } else {
-                    status_ = Status::UPDATE_PART_META;
+                    status_ = Status::MEMBER_CHANGE_REMOVE;
                 }
                 invoke();
             });
             break;
         }
-        case Status::UPDATE_PART_META: {
-            LOG(INFO) << taskIdStr_ << "Update meta for part.";
+        case Status::MEMBER_CHANGE_REMOVE: {
+            LOG(INFO) << taskIdStr_ << "Send member change request to the leader"
+                      << ", it will remove the old member on src host";
             SAVE_STATE();
-            client_->updateMeta(spaceId_, partId_, src_, dst_).thenValue([this](auto&& resp) {
+            client_->memberChange(spaceId_, partId_, src_, false).thenValue(
+                [this] (auto&& resp) {
                 if (!resp.ok()) {
                     ret_ = Result::FAILED;
                 } else {
-                    status_ = Status::REMOVE_PART_ON_SRC;
+                    status_ = Status::UPDATE_PART_META;
                 }
                 invoke();
             });
             break;
         }
-        case Status::REMOVE_PART_ON_SRC: {
-            LOG(INFO) << taskIdStr_ << "Close part on src host.";
+        case Status::UPDATE_PART_META: {
+            LOG(INFO) << taskIdStr_ << "Update meta for part.";
             SAVE_STATE();
-            client_->removePart(spaceId_, partId_, src_).thenValue([this](auto&& resp) {
+            client_->updateMeta(spaceId_, partId_, src_, dst_).thenValue(
+                [this] (auto&& resp) {
+                // The callback will be called inside raft's setValue, so don't call invoke()
+                // directly here.
+                LOG(INFO) << "Update meta succeeded!";
                 if (!resp.ok()) {
                     ret_ = Result::FAILED;
                 } else {
-                    ret_ = Result::SUCCEEDED;
-                    status_ = Status::END;
+                    status_ = Status::REMOVE_PART_ON_SRC;
                 }
                 invoke();
             });
             break;
         }
+        case Status::REMOVE_PART_ON_SRC: {
+            LOG(INFO) << taskIdStr_ << "Close part on src host, srcLived " << srcLived_;
+            SAVE_STATE();
+            if (srcLived_) {
+                client_->removePart(spaceId_, partId_, src_).thenValue([this](auto&& resp) {
+                    if (!resp.ok()) {
+                        ret_ = Result::FAILED;
+                    } else {
+                        ret_ = Result::SUCCEEDED;
+                        status_ = Status::END;
+                    }
+                    invoke();
+                });
+                break;
+            } else {
+                LOG(INFO) << taskIdStr_ << "Don't remove part on src " << src_;
+                ret_ = Result::SUCCEEDED;
+                status_ = Status::END;
+            }
+        }
         case Status::END: {
             LOG(INFO) << taskIdStr_ << "Part has been moved successfully!";
             endTimeMs_ = time::WallClock::fastNowInSec();
@@ -188,6 +225,7 @@ std::string BalanceTask::taskVal() {
     str.reserve(32);
     str.append(reinterpret_cast<const char*>(&status_), sizeof(status_));
     str.append(reinterpret_cast<const char*>(&ret_), sizeof(ret_));
+    str.append(reinterpret_cast<const char*>(&srcLived_), sizeof(srcLived_));
     str.append(reinterpret_cast<const char*>(&startTimeMs_), sizeof(startTimeMs_));
     str.append(reinterpret_cast<const char*>(&endTimeMs_), sizeof(endTimeMs_));
     return str;
@@ -216,17 +254,19 @@ BalanceTask::parseKey(const folly::StringPiece& rawKey) {
     return std::make_tuple(balanceId, spaceId, partId, src, dst);
 }
 
-std::tuple<BalanceTask::Status, BalanceTask::Result, int64_t, int64_t>
+std::tuple<BalanceTask::Status, BalanceTask::Result, bool, int64_t, int64_t>
 BalanceTask::parseVal(const folly::StringPiece& rawVal) {
     int32_t offset = 0;
     auto status = *reinterpret_cast<const BalanceTask::Status*>(rawVal.begin() + offset);
     offset += sizeof(BalanceTask::Status);
     auto ret = *reinterpret_cast<const BalanceTask::Result*>(rawVal.begin() + offset);
     offset += sizeof(BalanceTask::Result);
+    auto srcLived = *reinterpret_cast<const bool*>(rawVal.begin() + offset);
+    offset += sizeof(bool);
     auto start = *reinterpret_cast<const int64_t*>(rawVal.begin() + offset);
     offset += sizeof(int64_t);
     auto end = *reinterpret_cast<const int64_t*>(rawVal.begin() + offset);
-    return std::make_tuple(status, ret, start, end);
+    return std::make_tuple(status, ret, srcLived, start, end);
 }
 
 } // namespace meta
diff --git a/src/meta/processors/admin/BalanceTask.h b/src/meta/processors/admin/BalanceTask.h
index e1849979cd4..692fc5a7cb7 100644
--- a/src/meta/processors/admin/BalanceTask.h
+++ b/src/meta/processors/admin/BalanceTask.h
@@ -26,12 +26,33 @@ class BalanceTask {
     FRIEND_TEST(BalanceTest, RecoveryTest);
 
 public:
+    enum class Status : uint8_t {
+        START = 0x01,
+        CHANGE_LEADER = 0x02,
+        ADD_PART_ON_DST = 0x03,
+        ADD_LEARNER = 0x04,
+        CATCH_UP_DATA = 0x05,
+        MEMBER_CHANGE_ADD = 0x06,
+        MEMBER_CHANGE_REMOVE = 0x07,
+        UPDATE_PART_META = 0x08,  // After this state, we can't roll back anymore.
+        REMOVE_PART_ON_SRC = 0x09,
+        END = 0xFF,
+    };
+
+    enum class Result : uint8_t {
+        SUCCEEDED = 0x01,
+        FAILED = 0x02,
+        IN_PROGRESS = 0x03,
+        INVALID = 0x04,
+    };
+
     BalanceTask() = default;
     BalanceTask(BalanceID balanceId,
                 GraphSpaceID spaceId,
                 PartitionID partId,
                 const HostAddr& src,
                 const HostAddr& dst,
+                bool srcLived,
                 kvstore::KVStore* kv,
                 AdminClient* client)
         : balanceId_(balanceId)
@@ -39,14 +60,8 @@
         , partId_(partId)
         , src_(src)
         , dst_(dst)
-        , taskIdStr_(folly::stringPrintf(
-              "[%ld, %d, %s:%d->%s:%d] ",
-              balanceId,
-              partId,
-              network::NetworkUtils::intToIPv4(src.first).c_str(),
-              src.second,
-              network::NetworkUtils::intToIPv4(dst.first).c_str(),
-              dst.second))
+        , srcLived_(srcLived)
+        , taskIdStr_(buildTaskId())
         , kv_(kv)
         , client_(client) {}
 
@@ -58,24 +73,21 @@
 
     void rollback();
 
-private:
-    enum class Status : uint8_t {
-        START = 0x01,
-        CHANGE_LEADER = 0x02,
-        ADD_PART_ON_DST = 0x03,
-        ADD_LEARNER = 0x04,
-        CATCH_UP_DATA = 0x05,
-        MEMBER_CHANGE = 0x06,
-        UPDATE_PART_META = 0x07, // After this state, we can't rollback anymore.
-        REMOVE_PART_ON_SRC = 0x08,
-        END = 0xFF,
-    };
+    Result result() const {
+        return ret_;
+    }
 
-    enum class Result : uint8_t {
-        SUCCEEDED = 0x01,
-        FAILED = 0x02,
-        IN_PROGRESS = 0x03,
-    };
+private:
+    std::string buildTaskId() {
+        return folly::stringPrintf("[%ld, %d:%d, %s:%d->%s:%d] ",
+                                   balanceId_,
+                                   spaceId_,
+                                   partId_,
+                                   network::NetworkUtils::intToIPv4(src_.first).c_str(),
+                                   src_.second,
+                                   network::NetworkUtils::intToIPv4(dst_.first).c_str(),
+                                   dst_.second);
+    }
 
     bool saveInStore();
 
@@ -88,7 +100,7 @@
     static std::tuple<BalanceID, GraphSpaceID, PartitionID, HostAddr, HostAddr>
     parseKey(const folly::StringPiece& rawKey);
 
-    static std::tuple<Status, Result, int64_t, int64_t>
+    static std::tuple<Status, Result, bool, int64_t, int64_t>
     parseVal(const folly::StringPiece& rawVal);
 
 private:
@@ -97,6 +109,7 @@
     PartitionID partId_;
     HostAddr src_;
     HostAddr dst_;
+    bool srcLived_ = true;  // false means the src host has been lost.
     std::string taskIdStr_;
     kvstore::KVStore* kv_ = nullptr;
     AdminClient* client_ = nullptr;
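For reference, the value written by taskVal() and consumed by parseVal() now has a fixed layout; the sizes follow the declarations above, and sizeof(bool) is one byte on the platforms Nebula targets:

    // Serialized BalanceTask value (illustrative summary, not part of the patch):
    //   offset  0 : status_      (Status,  uint8_t, 1 byte)
    //   offset  1 : ret_         (Result,  uint8_t, 1 byte)
    //   offset  2 : srcLived_    (bool,    1 byte)   <- new in this change
    //   offset  3 : startTimeMs_ (int64_t, 8 bytes)
    //   offset 11 : endTimeMs_   (int64_t, 8 bytes)
    // 19 bytes total. Tasks persisted before this change lack the srcLived_
    // byte, which is why recovery() and the tests switch to the 5-field tuple.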
diff --git a/src/meta/processors/admin/Balancer.cpp b/src/meta/processors/admin/Balancer.cpp
index 924087d7e8c..835221325b3 100644
--- a/src/meta/processors/admin/Balancer.cpp
+++ b/src/meta/processors/admin/Balancer.cpp
@@ -29,7 +29,7 @@ StatusOr<BalanceID> Balancer::balance() {
         LOG(INFO) << "There is no corrupted plan need to recovery, so create a new one";
         auto status = buildBalancePlan();
         if (plan_ == nullptr) {
-            LOG(ERROR) << "Create balance plan failed!";
+            LOG(ERROR) << "Create balance plan failed, status " << status.toString();
             return status;
         }
     }
@@ -39,6 +39,17 @@
     return Status::Error("balance running");
 }
 
+StatusOr<BalancePlan> Balancer::show(BalanceID id) const {
+    if (kv_) {
+        BalancePlan plan(id, kv_, client_.get());
+        if (!plan.recovery(false)) {
+            return Status::Error("Get balance plan failed, id %ld", id);
+        }
+        return plan;
+    }
+    return Status::Error("KV is nullptr");
+}
+
 bool Balancer::recovery() {
     CHECK(!plan_) << "plan should be nullptr now";
     if (kv_) {
@@ -121,7 +132,7 @@ Status Balancer::buildBalancePlan() {
     };
     if (plan_->tasks_.empty()) {
         plan_->onFinished_();
-        return Status::Error("No Tasks");
+        return Status::Balanced();
     }
     if (!plan_->saveInStore()) {
         plan_->onFinished_();
@@ -145,9 +156,11 @@ std::vector<BalanceTask> Balancer::genTasks(GraphSpaceID spaceId) {
     calDiff(hostParts, activeHosts, newlyAdded, lost);
     decltype(hostParts) newHostParts(hostParts);
     for (auto& h : newlyAdded) {
+        LOG(INFO) << "Found new host " << h;
         newHostParts.emplace(h, std::vector<PartitionID>());
     }
     for (auto& h : lost) {
+        LOG(INFO) << "Lost host " << h;
         newHostParts.erase(h);
     }
     LOG(INFO) << "Now, try to balance the newHostParts";
@@ -157,6 +170,11 @@
     for (auto& h : lost) {
         auto& lostParts = hostParts[h];
         for (auto& partId : lostParts) {
+            auto srcRet = hostWithPart(newHostParts, partId);
+            if (!srcRet.ok()) {
+                LOG(ERROR) << "Error:" << srcRet.status();
+                return std::vector<BalanceTask>();
+            }
             auto ret = hostWithMinimalParts(newHostParts, partId);
             if (!ret.ok()) {
                 LOG(ERROR) << "Error:" << ret.status();
@@ -169,6 +187,7 @@
                                partId,
                                h,
                                luckyHost,
+                               false,
                                kv_,
                                client_.get());
         }
@@ -202,6 +221,8 @@ void Balancer::balanceParts(BalanceID balanceId,
         CHECK_GE(avgLoad, minPartsHost.second);
         auto& partsFrom = newHostParts[maxPartsHost.first];
         auto& partsTo = newHostParts[minPartsHost.first];
+        std::sort(partsFrom.begin(), partsFrom.end());
+        std::sort(partsTo.begin(), partsTo.end());
         VLOG(1) << maxPartsHost.first << ":" << partsFrom.size()
                 << "->" << minPartsHost.first << ":" << partsTo.size()
                 << ", lastDelta=" << lastDelta;
@@ -217,9 +238,11 @@
                       << maxPartsHost.first << " to " << minPartsHost.first;
             break;
         }
-        VLOG(1) << maxPartsHost.first << "->" << minPartsHost.first << ": " << partId;
+        LOG(INFO) << "[space:" << spaceId << ", part:" << partId << "] "
+                  << maxPartsHost.first << "->" << minPartsHost.first;
         auto it = std::find(partsFrom.begin(), partsFrom.end(), partId);
         CHECK(it != partsFrom.end());
+        CHECK(std::find(partsTo.begin(), partsTo.end(), partId) == partsTo.end());
         partsFrom.erase(it);
         partsTo.emplace_back(partId);
         tasks.emplace_back(balanceId,
@@ -227,6 +250,7 @@
                            partId,
                            maxPartsHost.first,
                            minPartsHost.first,
+                           true,
                            kv_,
                            client_.get());
         noAction = false;
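The loop in balanceParts is a simple greedy scheme: repeatedly take the host with the most parts and the host with the fewest, and move one part between them until the spread cannot be narrowed further. The two new sorts make the choice of part deterministic, and the added CHECK guards against moving a part onto a host that already holds a replica of it. In pseudo-form (illustrative sketch; mostLoaded, leastLoaded, pickMovablePart and moveOnePart are hypothetical helpers, not Balancer's API):

    while (true) {
        auto [maxHost, maxLoad] = mostLoaded(newHostParts);
        auto [minHost, minLoad] = leastLoaded(newHostParts);
        if (maxLoad - minLoad <= 1) {
            break;  // as even as the replica counts allow
        }
        // A part held by maxHost but not by minHost.
        PartitionID partId = pickMovablePart(newHostParts, maxHost, minHost);
        moveOnePart(newHostParts, partId, maxHost, minHost);
        tasks.emplace_back(/* src = maxHost, dst = minHost, srcLived = true */);
    }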
@@ -285,13 +309,13 @@ void Balancer::calDiff(const std::unordered_map<HostAddr, std::vector<PartitionID>>& hostParts,
                        const std::vector<HostAddr>& activeHosts,
                        std::vector<HostAddr>& newlyAdded,
                        std::vector<HostAddr>& lost) {
     for (auto it = hostParts.begin(); it != hostParts.end(); it++) {
-        VLOG(3) << "Host " << it->first << ", parts " << it->second.size();
+        VLOG(1) << "Original Host " << it->first << ", parts " << it->second.size();
         if (std::find(activeHosts.begin(), activeHosts.end(), it->first) == activeHosts.end()) {
             lost.emplace_back(it->first);
         }
     }
     for (auto& h : activeHosts) {
-        VLOG(3) << "Actvie Host " << h;
+        VLOG(1) << "Active host " << h;
         if (hostParts.find(h) == hostParts.end()) {
             newlyAdded.emplace_back(h);
         }
@@ -311,6 +335,17 @@ Balancer::sortedHostsByParts(const std::unordered_map<HostAddr, std::vector<PartitionID>>& hostParts) {
 
+StatusOr<HostAddr> Balancer::hostWithPart(
+        const std::unordered_map<HostAddr, std::vector<PartitionID>>& hostParts,
+        PartitionID partId) {
+    for (auto it = hostParts.begin(); it != hostParts.end(); it++) {
+        if (std::find(it->second.begin(), it->second.end(), partId) != it->second.end()) {
+            return it->first;
+        }
+    }
+    return Status::Error("No host holds the part %d", partId);
+}
+
 StatusOr<HostAddr> Balancer::hostWithMinimalParts(
         const std::unordered_map<HostAddr, std::vector<PartitionID>>& hostParts,
         PartitionID partId) {
diff --git a/src/meta/processors/admin/Balancer.h b/src/meta/processors/admin/Balancer.h
index a633db1e2e1..2289d2f5448 100644
--- a/src/meta/processors/admin/Balancer.h
+++ b/src/meta/processors/admin/Balancer.h
@@ -50,6 +50,7 @@ class Balancer {
     FRIEND_TEST(BalanceTest, LeaderBalanceTest);
     FRIEND_TEST(BalanceTest, ManyHostsLeaderBalancePlanTest);
     FRIEND_TEST(BalanceIntegrationTest, LeaderBalanceTest);
+    FRIEND_TEST(BalanceIntegrationTest, BalanceTest);
 
 public:
     static Balancer* instance(kvstore::KVStore* kv) {
@@ -66,19 +67,17 @@
     StatusOr<BalanceID> balance();
 
     /**
-     * TODO(heng): Rollback some specific balance id
+     * Show the status of the balance plan with the given id.
+     * */
+    StatusOr<BalancePlan> show(BalanceID id) const;
+
+    /**
+     * TODO(heng): rollback some balance plan.
      */
     Status rollback(BalanceID id) {
         return Status::Error("unplemented, %ld", id);
     }
 
-    /**
-     * TODO(heng): Only generate balance plan for our users.
-     * */
-    const BalancePlan* preview() {
-        return plan_.get();
-    }
-
     /**
      * TODO(heng): Execute balance plan from outside.
     * */
@@ -97,6 +96,10 @@
 
     cpp2::ErrorCode leaderBalance();
 
+    bool isRunning() {
+        return running_;
+    }
+
 private:
     Balancer(kvstore::KVStore* kv, std::unique_ptr<AdminClient> client)
         : kv_(kv)
@@ -124,6 +127,10 @@
                  std::vector<HostAddr>& newlyAdded,
                  std::vector<HostAddr>& lost);
 
+    StatusOr<HostAddr> hostWithPart(
+            const std::unordered_map<HostAddr, std::vector<PartitionID>>& hostParts,
+            PartitionID partId);
+
     StatusOr<HostAddr> hostWithMinimalParts(
             const std::unordered_map<HostAddr, std::vector<PartitionID>>& hostParts,
             PartitionID partId);
diff --git a/src/meta/test/AdminClientTest.cpp b/src/meta/test/AdminClientTest.cpp
index cf2b55b7b0d..8b0ad6da94f 100644
--- a/src/meta/test/AdminClientTest.cpp
+++ b/src/meta/test/AdminClientTest.cpp
@@ -115,8 +115,8 @@ TEST(AdminClientTest, SimpleTest) {
     {
         LOG(INFO) << "Test transLeader...";
         folly::Baton<true, std::atomic> baton;
-        client->transLeader(0, 0, {localIp, sc->port_}).then([&baton](auto&& st) {
-            CHECK(st.ok());
+        client->transLeader(0, 0, {localIp, sc->port_}, HostAddr(1, 1)).then([&baton](auto&& st) {
+            CHECK(st.ok()) << st.toString();
             baton.post();
         });
         baton.wait();
@@ -188,7 +188,7 @@ TEST(AdminClientTest, RetryTest) {
     {
         LOG(INFO) << "Test transLeader, return ok if target is not leader";
         folly::Baton<true, std::atomic> baton;
-        client->transLeader(0, 1, {localIp, sc2->port_}).then([&baton](auto&& st) {
+        client->transLeader(0, 1, {localIp, sc2->port_}, HostAddr(1, 1)).then([&baton](auto&& st) {
             CHECK(st.ok());
             baton.post();
         });
@@ -197,7 +197,7 @@
     {
         LOG(INFO) << "Test member change...";
         folly::Baton<true, std::atomic> baton;
-        client->memberChange(0, 1).then([&baton](auto&& st) {
+        client->memberChange(0, 1, HostAddr(0, 0), true).then([&baton](auto&& st) {
             CHECK(st.ok());
             baton.post();
         });
@@ -206,7 +206,7 @@
     {
         LOG(INFO) << "Test add learner...";
         folly::Baton<true, std::atomic> baton;
-        client->addLearner(0, 1).then([&baton](auto&& st) {
+        client->addLearner(0, 1, HostAddr(0, 0)).then([&baton](auto&& st) {
             CHECK(st.ok());
             baton.post();
         });
@@ -215,7 +215,7 @@
     {
         LOG(INFO) << "Test waitingForCatchUpData...";
         folly::Baton<true, std::atomic> baton;
-        client->waitingForCatchUpData(0, 1).then([&baton](auto&& st) {
+        client->waitingForCatchUpData(0, 1, HostAddr(0, 0)).then([&baton](auto&& st) {
             CHECK(st.ok());
             baton.post();
         });
@@ -225,7 +225,7 @@
     {
         LOG(INFO) << "Test member change...";
         folly::Baton<true, std::atomic> baton;
-        client->memberChange(0, 1).then([&baton](auto&& st) {
+        client->memberChange(0, 1, HostAddr(0, 0), true).then([&baton](auto&& st) {
             CHECK(!st.ok());
             CHECK_EQ("Leader changed!", st.toString());
             baton.post();
diff --git a/src/meta/test/BalanceIntegrationTest.cpp b/src/meta/test/BalanceIntegrationTest.cpp
index 48ab3ca3710..619fe347814 100644
--- a/src/meta/test/BalanceIntegrationTest.cpp
+++ b/src/meta/test/BalanceIntegrationTest.cpp
@@ -11,19 +11,237 @@
 #include "meta/test/TestUtils.h"
 #include "storage/test/TestUtils.h"
 #include "fs/TempDir.h"
+#include "storage/client/StorageClient.h"
+#include "storage/test/TestUtils.h"
+#include "dataman/RowWriter.h"
 
 DECLARE_int32(load_data_interval_secs);
 DECLARE_int32(heartbeat_interval_secs);
 DECLARE_uint32(raft_heartbeat_interval_secs);
+DECLARE_int32(expired_threshold_sec);
 
 namespace nebula {
 namespace meta {
 
-TEST(BalanceIntegrationTest, SimpleTest) {
-    auto sc = std::make_unique<test::ServerContext>();
-    auto handler = std::make_shared<storage::StorageServiceHandler>(nullptr, nullptr);
-    sc->mockCommon("storage", 0, handler);
-    LOG(INFO) << "Start storage server on " << sc->port_;
+TEST(BalanceIntegrationTest, BalanceTest) {
+    FLAGS_load_data_interval_secs = 1;
+    FLAGS_heartbeat_interval_secs = 1;
+    FLAGS_raft_heartbeat_interval_secs = 1;
+    FLAGS_expired_threshold_sec = 3;
+    fs::TempDir rootPath("/tmp/balance_integration_test.XXXXXX");
+    IPv4 localIp;
+    network::NetworkUtils::ipv4ToInt("127.0.0.1", localIp);
+    const nebula::ClusterID kClusterId = 10;
+
+    uint32_t localMetaPort = network::NetworkUtils::getAvailablePort();
+    LOG(INFO) << "Start meta server....";
+    std::string metaPath = folly::stringPrintf("%s/meta", rootPath.path());
+    auto metaServerContext = meta::TestUtils::mockMetaServer(localMetaPort, metaPath.c_str(),
+                                                             kClusterId);
+    localMetaPort = metaServerContext->port_;
+
+    auto adminClient = std::make_unique<AdminClient>(metaServerContext->kvStore_.get());
+    Balancer balancer(metaServerContext->kvStore_.get(), std::move(adminClient));
+
+    auto threadPool = std::make_shared<folly::IOThreadPoolExecutor>(10);
+    std::vector<HostAddr> metaAddr = {HostAddr(localIp, localMetaPort)};
+
+    LOG(INFO) << "Create meta client...";
+    uint32_t tempDataPort = network::NetworkUtils::getAvailablePort();
+    HostAddr tempDataAddr(localIp, tempDataPort);
+    auto mClient = std::make_unique<MetaClient>(threadPool, metaAddr, tempDataAddr,
+                                                kClusterId, false);
+
+    mClient->waitForMetadReady();
+
+    int partition = 1;
+    int replica = 3;
+    LOG(INFO) << "Start " << replica << " storage services, partition number " << partition;
+    std::vector<HostAddr> peers;
+    std::vector<uint32_t> storagePorts;
+    std::vector<std::shared_ptr<MetaClient>> metaClients;
+    std::vector<std::unique_ptr<test::ServerContext>> serverContexts;
+    for (int i = 0; i < replica; i++) {
+        uint32_t storagePort = network::NetworkUtils::getAvailablePort();
+        HostAddr storageAddr(localIp, storagePort);
+        storagePorts.emplace_back(storagePort);
+        peers.emplace_back(storageAddr);
+
+        VLOG(1) << "The storage server has been added to the meta service";
+
+        auto metaClient = std::make_shared<MetaClient>(threadPool, metaAddr, storageAddr,
+                                                       kClusterId, true);
+        metaClient->waitForMetadReady();
+        metaClients.emplace_back(metaClient);
+    }
+
+    for (int i = 0; i < replica; i++) {
+        std::string dataPath = folly::stringPrintf("%s/%d/data", rootPath.path(), i);
+        auto sc = storage::TestUtils::mockStorageServer(metaClients[i].get(),
+                                                        dataPath.c_str(),
+                                                        localIp,
+                                                        storagePorts[i],
+                                                        true);
+        serverContexts.emplace_back(std::move(sc));
+    }
+
+    LOG(INFO) << "Create space and schema";
+    auto ret = mClient->createSpace("storage", partition, replica).get();
+    ASSERT_TRUE(ret.ok());
+    auto spaceId = ret.value();
+    std::vector<nebula::cpp2::ColumnDef> columns;
+    columns.emplace_back(FRAGILE,
+                         "c",
+                         nebula::cpp2::ValueType(FRAGILE,
+                                                 SupportedType::STRING,
+                                                 nullptr,
+                                                 nullptr));
+    nebula::cpp2::Schema schema;
+    schema.set_columns(std::move(columns));
+    auto tagRet = mClient->createTagSchema(spaceId, "tag", std::move(schema)).get();
+    ASSERT_TRUE(tagRet.ok());
+    auto tagId = tagRet.value();
+    sleep(FLAGS_load_data_interval_secs + FLAGS_raft_heartbeat_interval_secs + 3);
+
+    LOG(INFO) << "Let's write some data";
+    auto sClient = std::make_unique<storage::StorageClient>(threadPool, mClient.get());
+    {
+        std::vector<storage::cpp2::Vertex> vertices;
+        for (int32_t vId = 0; vId < 10000; vId++) {
+            storage::cpp2::Vertex v;
+            v.set_id(vId);
+            decltype(v.tags) tags;
+            storage::cpp2::Tag t;
+            t.set_tag_id(tagId);
+            // Generate some tag props.
+            RowWriter writer;
+            writer << std::string(1024, 'A');
+            t.set_props(writer.encode());
+            tags.emplace_back(std::move(t));
+            v.set_tags(std::move(tags));
+            vertices.emplace_back(std::move(v));
+        }
+        int retry = 10;
+        while (retry-- > 0) {
+            auto f = sClient->addVertices(spaceId, vertices, true);
+            LOG(INFO) << "Waiting for the response...";
+            auto resp = std::move(f).get();
+            if (resp.completeness() == 100) {
+                LOG(INFO) << "All requests have been processed!";
+                break;
+            }
+            if (!resp.succeeded()) {
+                for (auto& err : resp.failedParts()) {
+                    LOG(ERROR) << "Partition " << err.first
+                               << " failed: " << static_cast<int32_t>(err.second);
+                }
+            }
+            LOG(INFO) << "Failed, the remaining retry times " << retry;
+        }
+    }
+    {
+        LOG(INFO) << "Check data...";
+        std::vector<VertexID> vIds;
+        std::vector<storage::cpp2::PropDef> retCols;
+        for (int32_t vId = 0; vId < 10000; vId++) {
+            vIds.emplace_back(vId);
+        }
+        retCols.emplace_back(storage::TestUtils::vetexPropDef("c", tagId));
+        auto f = sClient->getVertexProps(spaceId, std::move(vIds), std::move(retCols));
+        auto resp = std::move(f).get();
+        if (!resp.succeeded()) {
+            std::stringstream ss;
+            for (auto& p : resp.failedParts()) {
+                ss << "Part " << p.first
+                   << ": " << static_cast<int32_t>(p.second)
+                   << "; ";
+            }
+            VLOG(2) << "Failed partitions: " << ss.str();
+        }
+        ASSERT_TRUE(resp.succeeded());
+        auto& results = resp.responses();
+        ASSERT_EQ(partition, results.size());
+        EXPECT_EQ(0, results[0].result.failed_codes.size());
+        EXPECT_EQ(1, results[0].vertex_schema[tagId].columns.size());
+        auto tagProvider = std::make_shared<ResultSchemaProvider>(results[0].vertex_schema[tagId]);
+        EXPECT_EQ(10000, results[0].vertices.size());
+    }
+    LOG(INFO) << "Let's open a new storage service";
+    std::unique_ptr<test::ServerContext> newServer;
+    std::unique_ptr<MetaClient> newMetaClient;
+    uint32_t storagePort = network::NetworkUtils::getAvailablePort();
+    HostAddr storageAddr(localIp, storagePort);
+    {
+        newMetaClient = std::make_unique<MetaClient>(threadPool, metaAddr, storageAddr,
+                                                     kClusterId, true);
+        newMetaClient->waitForMetadReady();
+        std::string dataPath = folly::stringPrintf("%s/%d/data", rootPath.path(), replica + 1);
+        newServer = storage::TestUtils::mockStorageServer(newMetaClient.get(),
+                                                          dataPath.c_str(),
+                                                          localIp,
+                                                          storagePort,
+                                                          true);
+        LOG(INFO) << "Start a new storage server on " << storageAddr;
+    }
+    LOG(INFO) << "Let's stop the last storage service " << storagePorts.back();
+    {
+        metaClients.back()->stop();
+        serverContexts.back().reset();
+        // Wait for the host to be expired on meta.
+        sleep(FLAGS_expired_threshold_sec + 1);
+    }
+
+    LOG(INFO) << "Let's balance";
+    auto bIdRet = balancer.balance();
+    CHECK(bIdRet.ok()) << bIdRet.status();
+    while (balancer.isRunning()) {
+        sleep(1);
+    }
+
+    // Sleep enough time until the data is committed on the newly added server.
+    sleep(FLAGS_raft_heartbeat_interval_secs + 5);
+    {
+        LOG(INFO) << "Balance finished, check the newly added server";
+        std::unique_ptr<kvstore::KVIterator> iter;
+        auto prefix = NebulaKeyUtils::prefix(1);
+        ASSERT_EQ(kvstore::ResultCode::SUCCEEDED, newServer->kvStore_->prefix(spaceId,
+                                                                              1,
+                                                                              prefix,
+                                                                              &iter));
+        int num = 0;
+        std::string lastKey = "";
+        while (iter->valid()) {
+            // Filter out multiple versions of the same data.
+            auto key = NebulaKeyUtils::keyWithNoVersion(iter->key());
+            if (lastKey == key) {
+                iter->next();
+                continue;
+            }
+            lastKey = key.str();
+            iter->next();
+            num++;
+        }
+        ASSERT_EQ(10000, num);
+    }
+    {
+        LOG(INFO) << "Check meta";
+        auto paRet = mClient->getPartsAlloc(spaceId).get();
+        ASSERT_TRUE(paRet.ok()) << paRet.status();
+        ASSERT_EQ(1, paRet.value().size());
+        for (auto it = paRet.value().begin(); it != paRet.value().end(); it++) {
+            ASSERT_EQ(3, it->second.size());
+            ASSERT_TRUE(std::find(it->second.begin(), it->second.end(), storageAddr)
+                            != it->second.end());
+        }
+    }
+    for (auto& c : metaClients) {
+        c->stop();
+    }
+    serverContexts.clear();
+    metaClients.clear();
+    newMetaClient->stop();
+    newServer.reset();
+    newMetaClient.reset();
 }
 
 TEST(BalanceIntegrationTest, LeaderBalanceTest) {
@@ -100,6 +318,11 @@ TEST(BalanceIntegrationTest, LeaderBalanceTest) {
         std::unordered_map<GraphSpaceID, std::vector<PartitionID>> leaderIds;
         EXPECT_EQ(3, serverContexts[i]->kvStore_->allLeader(leaderIds));
     }
+    for (auto& c : metaClients) {
+        c->stop();
+    }
+    serverContexts.clear();
+    metaClients.clear();
 }
 
 } // namespace meta
diff --git a/src/meta/test/BalancerTest.cpp b/src/meta/test/BalancerTest.cpp
index 363261c62dc..e1e45b264aa 100644
--- a/src/meta/test/BalancerTest.cpp
+++ b/src/meta/test/BalancerTest.cpp
@@ -32,7 +32,9 @@ class TestFaultInjector : public FaultInjector {
     folly::Future<Status> response(int index) {
         folly::Promise<Status> pro;
         auto f = pro.getFuture();
+        LOG(INFO) << "Response " << index;
         executor_->add([this, p = std::move(pro), index]() mutable {
+            LOG(INFO) << "Call callback";
             p.setValue(this->statusArray_[index]);
         });
         return f;
     }
@@ -87,7 +89,7 @@ TEST(BalanceTaskTest, SimpleTest) {
     std::vector<Status> sts(7, Status::OK());
     std::unique_ptr<FaultInjector> injector(new TestFaultInjector(std::move(sts)));
     auto client = std::make_unique<AdminClient>(std::move(injector));
-    BalanceTask task(0, 0, 0, HostAddr(0, 0), HostAddr(1, 1), nullptr, nullptr);
+    BalanceTask task(0, 0, 0, HostAddr(0, 0), HostAddr(1, 1), true, nullptr, nullptr);
     folly::Baton<true, std::atomic> b;
     task.onFinished_ = [&]() {
         LOG(INFO) << "Task finished!";
@@ -112,7 +114,7 @@
                                 Status::OK()};
     std::unique_ptr<FaultInjector> injector(new TestFaultInjector(std::move(sts)));
     auto client = std::make_unique<AdminClient>(std::move(injector));
-    BalanceTask task(0, 0, 0, HostAddr(0, 0), HostAddr(1, 1), nullptr, nullptr);
+    BalanceTask task(0, 0, 0, HostAddr(0, 0), HostAddr(1, 1), true, nullptr, nullptr);
     folly::Baton<true, std::atomic> b;
     task.onFinished_ = []() {
         LOG(FATAL) << "We should not reach here!";
@@ -254,7 +256,7 @@ TEST(BalanceTest, DispatchTasksTest) {
     FLAGS_task_concurrency = 10;
     BalancePlan plan(0L, nullptr, nullptr);
     for (int i = 0; i < 20; i++) {
-        BalanceTask task(0, 0, 0, HostAddr(i, 0), HostAddr(i, 1), nullptr, nullptr);
+        BalanceTask task(0, 0, 0, HostAddr(i, 0), HostAddr(i, 1), true, nullptr, nullptr);
         plan.addTask(std::move(task));
     }
     plan.dispatchTasks();
@@ -267,7 +269,7 @@
     FLAGS_task_concurrency = 10;
     BalancePlan plan(0L, nullptr, nullptr);
     for (int i = 0; i < 5; i++) {
-        BalanceTask task(0, 0, i, HostAddr(i, 0), HostAddr(i, 1), nullptr, nullptr);
+        BalanceTask task(0, 0, i, HostAddr(i, 0), HostAddr(i, 1), true, nullptr, nullptr);
         plan.addTask(std::move(task));
     }
     plan.dispatchTasks();
@@ -280,11 +282,11 @@
     FLAGS_task_concurrency = 20;
     BalancePlan plan(0L, nullptr, nullptr);
     for (int i = 0; i < 5; i++) {
-        BalanceTask task(0, 0, i, HostAddr(i, 0), HostAddr(i, 1), nullptr, nullptr);
+        BalanceTask task(0, 0, i, HostAddr(i, 0), HostAddr(i, 1), true, nullptr, nullptr);
         plan.addTask(std::move(task));
     }
     for (int i = 0; i < 10; i++) {
-        BalanceTask task(0, 0, i, HostAddr(i, 2), HostAddr(i, 3), nullptr, nullptr);
+        BalanceTask task(0, 0, i, HostAddr(i, 2), HostAddr(i, 3), true, nullptr, nullptr);
         plan.addTask(std::move(task));
     }
     plan.dispatchTasks();
@@ -308,7 +310,7 @@ TEST(BalanceTest, BalancePlanTest) {
     auto client = std::make_unique<AdminClient>(std::move(injector));
 
     for (int i = 0; i < 10; i++) {
-        BalanceTask task(0, 0, 0, HostAddr(i, 0), HostAddr(i, 1), nullptr, nullptr);
+        BalanceTask task(0, 0, 0, HostAddr(i, 0), HostAddr(i, 1), true, nullptr, nullptr);
         task.client_ = client.get();
         plan.addTask(std::move(task));
     }
@@ -333,7 +335,7 @@
     auto client = std::make_unique<AdminClient>(std::move(injector));
 
     for (int i = 0; i < 10; i++) {
-        BalanceTask task(0, 0, i, HostAddr(i, 0), HostAddr(i, 1), nullptr, nullptr);
+        BalanceTask task(0, 0, i, HostAddr(i, 0), HostAddr(i, 1), true, nullptr, nullptr);
         task.client_ = client.get();
         plan.addTask(std::move(task));
     }
@@ -361,7 +363,7 @@
         std::unique_ptr<FaultInjector> injector(new TestFaultInjector(std::move(sts)));
         client1 = std::make_unique<AdminClient>(std::move(injector));
         for (int i = 0; i < 9; i++) {
-            BalanceTask task(0, 0, i, HostAddr(i, 0), HostAddr(i, 1), nullptr, nullptr);
+            BalanceTask task(0, 0, i, HostAddr(i, 0), HostAddr(i, 1), true, nullptr, nullptr);
             task.client_ = client1.get();
             plan.addTask(std::move(task));
         }
@@ -377,7 +379,7 @@
                                     Status::OK()};
         std::unique_ptr<FaultInjector> injector(new TestFaultInjector(std::move(sts)));
         client2 = std::make_unique<AdminClient>(std::move(injector));
-        BalanceTask task(0, 0, 0, HostAddr(10, 0), HostAddr(10, 1), nullptr, nullptr);
+        BalanceTask task(0, 0, 0, HostAddr(10, 0), HostAddr(10, 1), true, nullptr, nullptr);
         task.client_ = client2.get();
         plan.addTask(std::move(task));
     }
@@ -416,7 +418,7 @@ TEST(BalanceTest, NormalTest) {
     auto client = std::make_unique<AdminClient>(std::move(injector));
     Balancer balancer(kv.get(), std::move(client));
     auto ret = balancer.balance();
-    CHECK_EQ(Status::Error("No tasks"), ret.status());
+    CHECK_EQ(Status::Balanced(), ret.status());
     sleep(1);
 
     LOG(INFO) << "Now, we lost host " << HostAddr(3, 3);
@@ -465,9 +467,11 @@
             ASSERT_EQ(BalanceTask::Status::END, task.status_);
             task.ret_ = std::get<1>(tup);
             ASSERT_EQ(BalanceTask::Result::SUCCEEDED, task.ret_);
-            task.startTimeMs_ = std::get<2>(tup);
+            task.srcLived_ = std::get<2>(tup);
+            ASSERT_FALSE(task.srcLived_);
+            task.startTimeMs_ = std::get<3>(tup);
             ASSERT_GT(task.startTimeMs_, 0);
-            task.endTimeMs_ = std::get<3>(tup);
+            task.endTimeMs_ = std::get<4>(tup);
             ASSERT_GT(task.endTimeMs_, 0);
         }
         num++;
@@ -555,9 +559,11 @@
             ASSERT_EQ(BalanceTask::Status::CATCH_UP_DATA, task.status_);
             task.ret_ = std::get<1>(tup);
             ASSERT_EQ(BalanceTask::Result::FAILED, task.ret_);
-            task.startTimeMs_ = std::get<2>(tup);
+            task.srcLived_ = std::get<2>(tup);
+            ASSERT_FALSE(task.srcLived_);
+            task.startTimeMs_ = std::get<3>(tup);
             ASSERT_GT(task.startTimeMs_, 0);
-            task.endTimeMs_ = std::get<3>(tup);
+            task.endTimeMs_ = std::get<4>(tup);
             ASSERT_GT(task.endTimeMs_, 0);
         }
         num++;
@@ -608,12 +614,13 @@
         {
             auto tup = BalanceTask::parseVal(iter->val());
             task.status_ = std::get<0>(tup);
-            ASSERT_EQ(BalanceTask::Status::END, task.status_);
             task.ret_ = std::get<1>(tup);
-            ASSERT_EQ(BalanceTask::Result::SUCCEEDED, task.ret_);
-            task.startTimeMs_ = std::get<2>(tup);
+            ASSERT_EQ(BalanceTask::Result::INVALID, task.ret_);
+            task.srcLived_ = std::get<2>(tup);
+            ASSERT_FALSE(task.srcLived_);
+            task.startTimeMs_ = std::get<3>(tup);
             ASSERT_GT(task.startTimeMs_, 0);
-            task.endTimeMs_ = std::get<3>(tup);
+            task.endTimeMs_ = std::get<4>(tup);
             ASSERT_GT(task.endTimeMs_, 0);
         }
         num++;
diff --git a/src/meta/test/CMakeLists.txt b/src/meta/test/CMakeLists.txt
index fdf5e1bc6d3..e66c0c2666d 100644
--- a/src/meta/test/CMakeLists.txt
+++ b/src/meta/test/CMakeLists.txt
@@ -334,6 +334,7 @@ nebula_add_test(
     $
     $
     $
+    $
     $
     $
     $
diff --git a/src/meta/test/MetaClientTest.cpp b/src/meta/test/MetaClientTest.cpp
index 275e22b86b0..07e5c484858 100644
--- a/src/meta/test/MetaClientTest.cpp
+++ b/src/meta/test/MetaClientTest.cpp
@@ -324,7 +324,8 @@ TEST(MetaClientTest, TagTest) {
         columns.emplace_back(FRAGILE, "column_s",
                              ValueType(FRAGILE, SupportedType::STRING, nullptr, nullptr));
         nebula::cpp2::Schema schema;
-        auto result = client->createTagSchema(spaceId, "test_tag", schema).get();
+        schema.set_columns(std::move(columns));
+        auto result = client->createTagSchema(spaceId, "test_tag", std::move(schema)).get();
         ASSERT_TRUE(result.ok());
         id = result.value();
     }
@@ -342,6 +343,7 @@
     ASSERT_TRUE(result1.ok());
     auto result2 = client->getTagSchema(spaceId, "test_tag").get();
     ASSERT_TRUE(result2.ok());
+    ASSERT_EQ(3, result2.value().columns.size());
     ASSERT_EQ(result1.value().columns.size(), result2.value().columns.size());
     for (auto i = 0u; i < result1.value().columns.size(); i++) {
         ASSERT_EQ(result1.value().columns[i].name, result2.value().columns[i].name);
diff --git a/src/meta/test/MetaHttpDownloadHandlerTest.cpp b/src/meta/test/MetaHttpDownloadHandlerTest.cpp
index 8ff70aecc00..e60275e38ef 100644
--- a/src/meta/test/MetaHttpDownloadHandlerTest.cpp
+++ b/src/meta/test/MetaHttpDownloadHandlerTest.cpp
@@ -16,6 +16,7 @@
 
 DECLARE_int32(load_data_interval_secs);
 DECLARE_string(pid_file);
+DECLARE_int32(ws_storage_http_port);
 
 namespace nebula {
 namespace meta {
@@ -25,7 +26,8 @@ std::unique_ptr helper = std::make_unique(kv_->partManager());
+    partMan->addPart(1, 1);
+    partMan->addPart(1, 2);
+
+    // Wait for the leader election.
+    sleep(3);
 
     pool_ = std::make_unique<nebula::thread::GenericThreadPool>();
     pool_->start(3);
diff --git a/src/meta/test/MetaHttpIngestHandlerTest.cpp b/src/meta/test/MetaHttpIngestHandlerTest.cpp
index 7653607abad..260c5078c70 100644
--- a/src/meta/test/MetaHttpIngestHandlerTest.cpp
+++ b/src/meta/test/MetaHttpIngestHandlerTest.cpp
@@ -15,13 +15,16 @@
 #include
 #include "thread/GenericThreadPool.h"
 
+DECLARE_int32(ws_storage_http_port);
+
 namespace nebula {
 namespace meta {
 
 class MetaHttpIngestHandlerTestEnv : public ::testing::Environment {
 public:
     void SetUp() override {
-        FLAGS_ws_http_port = 12000;
+        FLAGS_ws_http_port = 12100;
+        FLAGS_ws_storage_http_port = 12100;
         FLAGS_ws_h2_port = 0;
         VLOG(1) << "Starting web service...";
diff --git a/src/meta/test/TestUtils.h b/src/meta/test/TestUtils.h
index 04e185c8100..a8096f82040 100644
--- a/src/meta/test/TestUtils.h
+++ b/src/meta/test/TestUtils.h
@@ -44,9 +44,6 @@ class TestUtils {
         // 0 => {0}
         auto& partsMap = partMan->partsMap();
         partsMap[0][0] = PartMeta();
-        // 1 => {1,2}
-        partsMap[1][1] = PartMeta();
-        partsMap[1][2] = PartMeta();
 
         std::vector<std::string> paths;
         paths.emplace_back(folly::stringPrintf("%s/disk1", rootPath));
diff --git a/src/parser/AdminSentences.h b/src/parser/AdminSentences.h
index 689b608df3e..131fba66c46 100644
--- a/src/parser/AdminSentences.h
+++ b/src/parser/AdminSentences.h
@@ -359,6 +359,8 @@ class BalanceSentence final : public Sentence {
     enum class SubType : uint32_t {
         kUnknown,
         kLeader,
+        kData,
+        kShowBalancePlan,
     };
 
     // TODO: add more subtype for balance
@@ -367,14 +369,25 @@ class BalanceSentence final : public Sentence {
         subType_ = std::move(subType);
     }
 
+    explicit BalanceSentence(int64_t id) {
+        kind_ = Kind::kBalance;
+        subType_ = SubType::kShowBalancePlan;
+        balanceId_ = id;
+    }
+
     std::string toString() const override;
 
     SubType subType() const {
         return subType_;
     }
 
+    int64_t balanceId() const {
+        return balanceId_;
+    }
+
 private:
     SubType subType_{SubType::kUnknown};
+    int64_t balanceId_{0};
 };
 
 } // namespace nebula
diff --git a/src/parser/parser.yy b/src/parser/parser.yy
index 97c863d4381..47e90f650bf 100644
--- a/src/parser/parser.yy
+++ b/src/parser/parser.yy
@@ -107,7 +107,8 @@ class GraphScanner;
 %token KW_ORDER KW_ASC
 %token KW_FETCH KW_PROP KW_UPDATE KW_UPSERT KW_WHEN
 %token KW_DISTINCT KW_ALL KW_OF
-%token KW_BALANCE KW_LEADER
+%token KW_BALANCE KW_LEADER KW_DATA
+
 /* symbols */
 %token L_PAREN R_PAREN L_BRACKET R_BRACKET L_BRACE R_BRACE COMMA
 %token PIPE OR AND XOR LT LE GT GE EQ NE PLUS MINUS MUL DIV MOD NOT NEG ASSIGN
@@ -1565,6 +1566,12 @@ balance_sentence
     : KW_BALANCE KW_LEADER {
        $$ = new BalanceSentence(BalanceSentence::SubType::kLeader);
     }
+    | KW_BALANCE KW_DATA {
+        $$ = new BalanceSentence(BalanceSentence::SubType::kData);
+    }
+    | KW_BALANCE KW_DATA INTEGER {
+        $$ = new BalanceSentence($3);
+    }
     ;
 
 mutate_sentence
diff --git a/src/parser/scanner.lex b/src/parser/scanner.lex
index f5940149c27..d80d9c4e439 100644
--- a/src/parser/scanner.lex
+++ b/src/parser/scanner.lex
@@ -120,6 +120,7 @@ ALL ([Aa][Ll][Ll])
 BALANCE ([Bb][Aa][Ll][Aa][Nn][Cc][Ee])
 LEADER ([Ll][Ee][Aa][Dd][Ee][Rr])
 OF ([Oo][Ff])
+DATA ([Dd][Aa][Tt][Aa])
 
 LABEL ([a-zA-Z][_a-zA-Z0-9]*)
 DEC ([0-9])
@@ -229,6 +230,7 @@ IP_OCTET ([0-9]|[1-9][0-9]|1[0-9][0-9]|2[0-4][0-9]|25[0-5])
 {ALL} { return TokenType::KW_ALL; }
 {BALANCE} { return TokenType::KW_BALANCE; }
 {LEADER} { return TokenType::KW_LEADER; }
+{DATA} { return TokenType::KW_DATA; }
 
 "." { return TokenType::DOT; }
 "," { return TokenType::COMMA; }
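With the token, lexer rule and grammar alternatives in place, three balance statements parse (the plan id below is illustrative):

    BALANCE LEADER
    BALANCE DATA
    BALANCE DATA 1568

BALANCE LEADER keeps the existing leader rebalancing; BALANCE DATA starts a new data balance plan; BALANCE DATA <id> constructs a BalanceSentence with SubType::kShowBalancePlan, which shows the per-task status of that plan via the meta service's show() path above.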
diff --git a/src/storage/AdminProcessor.h b/src/storage/AdminProcessor.h
index 24b786f6b0e..a253db40823 100644
--- a/src/storage/AdminProcessor.h
+++ b/src/storage/AdminProcessor.h
@@ -8,9 +8,11 @@
 #define STORAGE_TRANSLEADERPROCESSOR_H_
 
 #include "base/Base.h"
-#include "storage/BaseProcessor.h"
 #include "kvstore/NebulaStore.h"
 #include "kvstore/Part.h"
+#include "storage/StorageFlags.h"
+#include "storage/BaseProcessor.h"
+#include <folly/executors/Async.h>
 
 namespace nebula {
 namespace storage {
@@ -23,13 +25,15 @@ class TransLeaderProcessor : public BaseProcessor<cpp2::AdminExecResp> {
 
     void process(const cpp2::TransLeaderReq& req) {
         CHECK_NOTNULL(kvstore_);
+        LOG(INFO) << "Receive transfer leader for space "
+                  << req.get_space_id() << ", part " << req.get_part_id()
+                  << ", to [" << req.get_new_leader().get_ip()
+                  << ", " << req.get_new_leader().get_port() << "]";
         auto spaceId = req.get_space_id();
         auto partId = req.get_part_id();
         auto ret = kvstore_->part(spaceId, partId);
         if (!ok(ret)) {
-            resp_.set_code(to(error(ret)));
-            promise_.setValue(std::move(resp_));
-            delete this;
+            onFinished(to(error(ret)));
             return;
         }
         auto part = nebula::value(ret);
@@ -41,16 +45,9 @@
             CHECK(ok(leaderRet));
             if (code == kvstore::ResultCode::ERR_LEADER_CHANGED) {
                 auto leader = value(std::move(leaderRet));
-                // Check target is randomPeer (0, 0) or the election has completed.
-                if (host == HostAddr(0, 0) || leader == host) {
-                    code = kvstore::ResultCode::SUCCEEDED;
-                } else {
-                    resp_.set_leader(toThriftHost(leader));
-                }
+                resp_.set_leader(toThriftHost(leader));
             }
-            resp_.set_code(to(code));
-            promise_.setValue(std::move(resp_));
-            delete this;
+            onFinished(to(code));
         });
     }
 
@@ -61,17 +58,40 @@
 
 class AddPartProcessor : public BaseProcessor<cpp2::AdminExecResp> {
 public:
-    static AddPartProcessor* instance(kvstore::KVStore* kvstore) {
-        return new AddPartProcessor(kvstore);
+    static AddPartProcessor* instance(kvstore::KVStore* kvstore, meta::MetaClient* mClient) {
+        return new AddPartProcessor(kvstore, mClient);
     }
 
     void process(const cpp2::AddPartReq& req) {
-        UNUSED(req);
+        if (FLAGS_store_type != "nebula") {
+            onFinished(cpp2::ErrorCode::E_INVALID_STORE);
+            return;
+        }
+
+        LOG(INFO) << "Receive add part for space "
+                  << req.get_space_id() << ", part " << req.get_part_id();
+        auto* store = static_cast<kvstore::NebulaStore*>(kvstore_);
+        auto ret = store->space(req.get_space_id());
+        if (!nebula::ok(ret) && nebula::error(ret) == kvstore::ResultCode::ERR_SPACE_NOT_FOUND) {
+            LOG(INFO) << "Space " << req.get_space_id() << " does not exist, create it!";
+            store->addSpace(req.get_space_id());
+        }
+        auto st = mClient_->refreshCache();
+        if (!st.ok()) {
+            onFinished(cpp2::ErrorCode::E_LOAD_META_FAILED);
+            return;
+        }
+        store->addPart(req.get_space_id(), req.get_part_id(), req.get_as_learner());
+        onFinished(cpp2::ErrorCode::SUCCEEDED);
     }
 
 private:
-    explicit AddPartProcessor(kvstore::KVStore* kvstore)
-        : BaseProcessor<cpp2::AdminExecResp>(kvstore, nullptr) {}
+    explicit AddPartProcessor(kvstore::KVStore* kvstore, meta::MetaClient* mClient)
+        : BaseProcessor<cpp2::AdminExecResp>(kvstore, nullptr)
+        , mClient_(mClient) {}
+
+private:
+    meta::MetaClient* mClient_ = nullptr;
 };
 
 class RemovePartProcessor : public BaseProcessor<cpp2::AdminExecResp> {
@@ -81,7 +101,13 @@
     }
 
     void process(const cpp2::RemovePartReq& req) {
-        UNUSED(req);
+        if (FLAGS_store_type != "nebula") {
+            onFinished(cpp2::ErrorCode::E_INVALID_STORE);
+            return;
+        }
+        auto* store = static_cast<kvstore::NebulaStore*>(kvstore_);
+        store->removePart(req.get_space_id(), req.get_part_id());
+        onFinished(cpp2::ErrorCode::SUCCEEDED);
     }
 
 private:
@@ -96,7 +122,36 @@
     }
 
     void process(const cpp2::MemberChangeReq& req) {
-        UNUSED(req);
+        CHECK_NOTNULL(kvstore_);
+        auto spaceId = req.get_space_id();
+        auto partId = req.get_part_id();
+        LOG(INFO) << "Receive member change for space "
+                  << spaceId << ", part " << partId
+                  << ", add/remove " << (req.get_add() ? "add" : "remove");
"add" : "remove"); + auto ret = kvstore_->part(spaceId, partId); + if (!ok(ret)) { + onFinished(to(error(ret))); + return; + } + auto part = nebula::value(ret); + auto peer = kvstore::NebulaStore::getRaftAddr(HostAddr(req.get_peer().get_ip(), + req.get_peer().get_port())); + auto cb = [this, spaceId, partId] (kvstore::ResultCode code) { + auto leaderRet = kvstore_->partLeader(spaceId, partId); + CHECK(ok(leaderRet)); + if (code == kvstore::ResultCode::ERR_LEADER_CHANGED) { + auto leader = value(std::move(leaderRet)); + resp_.set_leader(toThriftHost(leader)); + } + onFinished(to(code)); + }; + if (req.get_add()) { + LOG(INFO) << "Add peer " << peer; + part->asyncAddPeer(peer, cb); + } else { + LOG(INFO) << "Remove peer " << peer; + part->asyncRemovePeer(peer, cb); + } } private: @@ -111,7 +166,27 @@ class AddLearnerProcessor : public BaseProcessor { } void process(const cpp2::AddLearnerReq& req) { - UNUSED(req); + auto spaceId = req.get_space_id(); + auto partId = req.get_part_id(); + LOG(INFO) << "Receive add learner for space " + << spaceId << ", part " << partId; + auto ret = kvstore_->part(spaceId, partId); + if (!ok(ret)) { + onFinished(to(error(ret))); + return; + } + auto part = nebula::value(ret); + auto learner = kvstore::NebulaStore::getRaftAddr(HostAddr(req.get_learner().get_ip(), + req.get_learner().get_port())); + part->asyncAddLearner(learner, [this, spaceId, partId] (kvstore::ResultCode code) { + auto leaderRet = kvstore_->partLeader(spaceId, partId); + CHECK(ok(leaderRet)); + if (code == kvstore::ResultCode::ERR_LEADER_CHANGED) { + auto leader = value(std::move(leaderRet)); + resp_.set_leader(toThriftHost(leader)); + } + onFinished(to(code)); + }); } private: @@ -125,8 +200,55 @@ class WaitingForCatchUpDataProcessor : public BaseProcessor return new WaitingForCatchUpDataProcessor(kvstore); } + ~WaitingForCatchUpDataProcessor() { + } + void process(const cpp2::CatchUpDataReq& req) { - UNUSED(req); + auto spaceId = req.get_space_id(); + auto partId = req.get_part_id(); + LOG(INFO) << "Received waiting for catching up data for space " + << spaceId << ", part " << partId; + auto ret = kvstore_->part(spaceId, partId); + if (!ok(ret)) { + onFinished(to(error(ret))); + return; + } + auto part = nebula::value(ret); + auto peer = kvstore::NebulaStore::getRaftAddr(HostAddr(req.get_target().get_ip(), + req.get_target().get_port())); + + folly::async([this, part, peer, spaceId, partId] { + int retry = FLAGS_waiting_catch_up_retry_times; + while (retry-- > 0) { + LOG(INFO) << "Waiting for catching up data, peer " << peer + << ", try " << retry << " times"; + auto res = part->isCatchedUp(peer); + switch (res) { + case raftex::AppendLogResult::SUCCEEDED: + onFinished(cpp2::ErrorCode::SUCCEEDED); + return; + case raftex::AppendLogResult::E_INVALID_PEER: + onFinished(cpp2::ErrorCode::E_INVALID_PEER); + return; + case raftex::AppendLogResult::E_NOT_A_LEADER: { + auto leaderRet = kvstore_->partLeader(spaceId, partId); + CHECK(ok(leaderRet)); + auto leader = value(std::move(leaderRet)); + resp_.set_leader(toThriftHost(leader)); + onFinished(cpp2::ErrorCode::E_LEADER_CHANGED); + return; + } + case raftex::AppendLogResult::E_SENDING_SNAPSHOT: + LOG(INFO) << "Still sending snapshot, please wait..."; + break; + default: + LOG(INFO) << "Unknown error " << static_cast(res); + break; + } + sleep(FLAGS_waiting_catch_up_interval_in_secs); + } + onFinished(cpp2::ErrorCode::E_RETRY_EXHAUSTED); + }); } private: diff --git a/src/storage/BaseProcessor.h b/src/storage/BaseProcessor.h index 
diff --git a/src/storage/BaseProcessor.h b/src/storage/BaseProcessor.h
index 57c634819f6..efa3c7e4f15 100644
--- a/src/storage/BaseProcessor.h
+++ b/src/storage/BaseProcessor.h
@@ -48,6 +48,14 @@ class BaseProcessor {
         delete this;
     }
 
+    // This method is used by processors that handle single-part requests.
+    // Currently it is only used in AdminProcessor.
+    void onFinished(cpp2::ErrorCode code) {
+        resp_.set_code(code);
+        promise_.setValue(std::move(resp_));
+        delete this;
+    }
+
     void doPut(GraphSpaceID spaceId, PartitionID partId, std::vector<kvstore::KV> data);
 
     void doRemove(GraphSpaceID spaceId, PartitionID partId, std::vector<std::string> keys);
diff --git a/src/storage/BaseProcessor.inl b/src/storage/BaseProcessor.inl
index cbb7b11bd70..76515ee5fd5 100644
--- a/src/storage/BaseProcessor.inl
+++ b/src/storage/BaseProcessor.inl
@@ -21,6 +21,8 @@ cpp2::ErrorCode BaseProcessor<RESP>::to(kvstore::ResultCode code) {
             return cpp2::ErrorCode::E_SPACE_NOT_FOUND;
         case kvstore::ResultCode::ERR_PART_NOT_FOUND:
             return cpp2::ErrorCode::E_PART_NOT_FOUND;
+        case kvstore::ResultCode::ERR_CONSENSUS_ERROR:
+            return cpp2::ErrorCode::E_CONSENSUS_ERROR;
         default:
             return cpp2::ErrorCode::E_UNKNOWN;
     }
diff --git a/src/storage/CMakeLists.txt b/src/storage/CMakeLists.txt
index e3a1279dcce..028a8a4c854 100644
--- a/src/storage/CMakeLists.txt
+++ b/src/storage/CMakeLists.txt
@@ -1,6 +1,7 @@
 nebula_add_library(
     storage_service_handler OBJECT
     StorageServiceHandler.cpp
+    StorageFlags.cpp
     QueryBaseProcessor.cpp
     AddVerticesProcessor.cpp
     AddEdgesProcessor.cpp
diff --git a/src/storage/StorageFlags.cpp b/src/storage/StorageFlags.cpp
new file mode 100644
index 00000000000..f1993486982
--- /dev/null
+++ b/src/storage/StorageFlags.cpp
@@ -0,0 +1,15 @@
+/* Copyright (c) 2019 vesoft inc. All rights reserved.
+ *
+ * This source code is licensed under Apache 2.0 License,
+ * attached with Common Clause Condition 1.0, found in the LICENSES directory.
+ */
+
+#include "base/Base.h"
+#include "storage/StorageFlags.h"
+
+DEFINE_string(store_type, "nebula",
+              "Which type of KVStore to be used by the storage daemon."
+              " Options can be \"nebula\", \"hbase\", etc.");
+DEFINE_int32(waiting_catch_up_retry_times, 30, "retry times when waiting for catching up data");
+DEFINE_int32(waiting_catch_up_interval_in_secs, 30,
+             "interval between two requests for catching up state");
diff --git a/src/storage/StorageFlags.h b/src/storage/StorageFlags.h
new file mode 100644
index 00000000000..80e3fe6cc59
--- /dev/null
+++ b/src/storage/StorageFlags.h
@@ -0,0 +1,18 @@
+/* Copyright (c) 2019 vesoft inc. All rights reserved.
+ *
+ * This source code is licensed under Apache 2.0 License,
+ * attached with Common Clause Condition 1.0, found in the LICENSES directory.
+ */ + +#ifndef STORAGE_STORAGEFLAGS_H_ +#define STORAGE_STORAGEFLAGS_H_ + +#include "base/Base.h" + +DECLARE_string(store_type); + +DECLARE_int32(waiting_catch_up_retry_times); + +DECLARE_int32(waiting_catch_up_interval_in_secs); + +#endif // STORAGE_STORAGEFLAGS_H_ diff --git a/src/storage/StorageServer.cpp b/src/storage/StorageServer.cpp index e2b2970baf0..14703b039a5 100644 --- a/src/storage/StorageServer.cpp +++ b/src/storage/StorageServer.cpp @@ -6,6 +6,7 @@ #include "storage/StorageServer.h" #include "network/NetworkUtils.h" +#include "storage/StorageFlags.h" #include "storage/StorageServiceHandler.h" #include "storage/StorageHttpStatusHandler.h" #include "storage/StorageHttpDownloadHandler.h" @@ -21,9 +22,6 @@ DEFINE_int32(port, 44500, "Storage daemon listening port"); DEFINE_bool(reuse_port, true, "Whether to turn on the SO_REUSEPORT option"); -DEFINE_string(store_type, "nebula", - "Which type of KVStore to be used by the storage daemon." - " Options can be \"nebula\", \"hbase\", etc."); DEFINE_int32(num_io_threads, 16, "Number of IO threads"); DEFINE_int32(num_worker_threads, 32, "Number of workers"); DEFINE_int32(storage_http_thread_num, 3, "Number of storage daemon's http thread"); @@ -124,7 +122,9 @@ bool StorageServer::start() { return false; } - auto handler = std::make_shared(kvstore_.get(), schemaMan_.get()); + auto handler = std::make_shared(kvstore_.get(), + schemaMan_.get(), + metaClient_.get()); try { LOG(INFO) << "The storage deamon start on " << localHost_; tfServer_ = std::make_unique(); diff --git a/src/storage/StorageServiceHandler.cpp b/src/storage/StorageServiceHandler.cpp index b9209ac33a1..cbf54340616 100644 --- a/src/storage/StorageServiceHandler.cpp +++ b/src/storage/StorageServiceHandler.cpp @@ -103,7 +103,7 @@ StorageServiceHandler::future_transLeader(const cpp2::TransLeaderReq& req) { folly::Future StorageServiceHandler::future_addPart(const cpp2::AddPartReq& req) { - auto* processor = AddPartProcessor::instance(kvstore_); + auto* processor = AddPartProcessor::instance(kvstore_, metaClient_); RETURN_FUTURE(processor); } diff --git a/src/storage/StorageServiceHandler.h b/src/storage/StorageServiceHandler.h index 667bbe5b42f..5e0f35a925b 100644 --- a/src/storage/StorageServiceHandler.h +++ b/src/storage/StorageServiceHandler.h @@ -21,9 +21,11 @@ class StorageServiceHandler final : public cpp2::StorageServiceSvIf { public: StorageServiceHandler(kvstore::KVStore* kvstore, - meta::SchemaManager* schemaMan) + meta::SchemaManager* schemaMan, + meta::MetaClient* client) : kvstore_(kvstore) - , schemaMan_(schemaMan) {} + , schemaMan_(schemaMan) + , metaClient_(client) {} folly::Future future_getBound(const cpp2::GetNeighborsRequest& req) override; @@ -82,7 +84,8 @@ class StorageServiceHandler final : public cpp2::StorageServiceSvIf { private: kvstore::KVStore* kvstore_ = nullptr; - meta::SchemaManager* schemaMan_; + meta::SchemaManager* schemaMan_ = nullptr; + meta::MetaClient* metaClient_ = nullptr; }; } // namespace storage diff --git a/src/storage/test/StorageServiceHandlerTest.cpp b/src/storage/test/StorageServiceHandlerTest.cpp index e3631f8b4f6..74fa379a883 100644 --- a/src/storage/test/StorageServiceHandlerTest.cpp +++ b/src/storage/test/StorageServiceHandlerTest.cpp @@ -27,7 +27,9 @@ TEST(StorageServiceHandlerTest, FutureAddVerticesTest) { LOG(INFO) << "Test FutureAddVerticesTest..."; std::unique_ptr kvstore = TestUtils::initKV(rootPath.path()); - auto storageServiceHandler = std::make_unique(kvstore.get(), nullptr); + auto storageServiceHandler = 
+                                                                         nullptr,
+                                                                         nullptr);
 
     auto resp = storageServiceHandler->future_addVertices(req).get();
     EXPECT_EQ(typeid(cpp2::ExecResponse).name(), typeid(resp).name());
diff --git a/src/storage/test/TestUtils.h b/src/storage/test/TestUtils.h
index 012f760d4b2..e435a3d7dab 100644
--- a/src/storage/test/TestUtils.h
+++ b/src/storage/test/TestUtils.h
@@ -210,7 +210,7 @@ class TestUtils {
         }
         auto handler = std::make_shared<StorageServiceHandler>(
-            sc->kvStore_.get(), sc->schemaMan_.get());
+            sc->kvStore_.get(), sc->schemaMan_.get(), mClient);
         sc->mockCommon("storage", port, handler);
         auto ptr = dynamic_cast<kvstore::MemPartManager*>(
             sc->kvStore_->partManager());
diff --git a/src/tools/storage-perf/StoragePerfTool.cpp b/src/tools/storage-perf/StoragePerfTool.cpp
index 6c754ab4bbf..cf209188d40 100644
--- a/src/tools/storage-perf/StoragePerfTool.cpp
+++ b/src/tools/storage-perf/StoragePerfTool.cpp
@@ -235,13 +235,16 @@ class Perf {
         auto f = client_->addVertices(spaceId_, genVertices(), true)
             .via(evb).then([this](auto&& resps) {
                 if (!resps.succeeded()) {
-                    LOG(ERROR) << "Request failed!";
+                    for (auto& entry : resps.failedParts()) {
+                        LOG(ERROR) << "Request failed, part " << entry.first
+                                   << ", error " << static_cast<int32_t>(entry.second);
+                    }
                 } else {
                     VLOG(3) << "request successed!";
                 }
                 this->finishedRequests_++;
-            }).onError([](folly::FutureException&) {
-                LOG(ERROR) << "Request failed!";
+            }).onError([](folly::FutureException& e) {
+                LOG(ERROR) << "Request failed, e = " << e.what();
             });
     }