diff --git a/src/meta/processors/admin/AdminClient.cpp b/src/meta/processors/admin/AdminClient.cpp
index 3decaf0b314..f238a728d50 100644
--- a/src/meta/processors/admin/AdminClient.cpp
+++ b/src/meta/processors/admin/AdminClient.cpp
@@ -28,7 +28,7 @@ folly::Future<Status> AdminClient::transLeader(GraphSpaceID spaceId,
   req.part_id_ref() = partId;
   auto ret = getPeers(spaceId, partId);
   if (!nebula::ok(ret)) {
-    LOG(ERROR) << "Get peers failed: " << static_cast<int32_t>(nebula::error(ret));
+    LOG(INFO) << "Get peers failed: " << static_cast<int32_t>(nebula::error(ret));
     return Status::Error("Get peers failed");
   }
@@ -83,7 +83,7 @@ folly::Future<Status> AdminClient::addPart(GraphSpaceID spaceId,
   req.as_learner_ref() = asLearner;
   auto ret = getPeers(spaceId, partId);
   if (!nebula::ok(ret)) {
-    LOG(ERROR) << "Get peers failed: " << static_cast<int32_t>(nebula::error(ret));
+    LOG(INFO) << "Get peers failed: " << static_cast<int32_t>(nebula::error(ret));
     return Status::Error("Get peers failed");
   }
@@ -110,7 +110,7 @@ folly::Future<Status> AdminClient::addLearner(GraphSpaceID spaceId,
   req.learner_ref() = learner;
   auto ret = getPeers(spaceId, partId);
   if (!nebula::ok(ret)) {
-    LOG(ERROR) << "Get peers failed: " << static_cast<int32_t>(nebula::error(ret));
+    LOG(INFO) << "Get peers failed: " << static_cast<int32_t>(nebula::error(ret));
     return Status::Error("Get peers failed");
   }
@@ -137,7 +137,7 @@ folly::Future<Status> AdminClient::waitingForCatchUpData(GraphSpaceID spaceId,
   req.target_ref() = target;
   auto ret = getPeers(spaceId, partId);
   if (!nebula::ok(ret)) {
-    LOG(ERROR) << "Get peers failed: " << static_cast<int32_t>(nebula::error(ret));
+    LOG(INFO) << "Get peers failed: " << static_cast<int32_t>(nebula::error(ret));
     return Status::Error("Get peers failed");
   }
@@ -166,7 +166,7 @@ folly::Future<Status> AdminClient::memberChange(GraphSpaceID spaceId,
   req.peer_ref() = peer;
   auto ret = getPeers(spaceId, partId);
   if (!nebula::ok(ret)) {
-    LOG(ERROR) << "Get peers failed: " << static_cast<int32_t>(nebula::error(ret));
+    LOG(INFO) << "Get peers failed: " << static_cast<int32_t>(nebula::error(ret));
     return Status::Error("Get peers failed");
   }
@@ -191,7 +191,7 @@ folly::Future<Status> AdminClient::updateMeta(GraphSpaceID spaceId,
   CHECK_NOTNULL(kv_);
   auto ret = getPeers(spaceId, partId);
   if (!nebula::ok(ret)) {
-    LOG(ERROR) << "Get peers failed: " << static_cast<int32_t>(nebula::error(ret));
+    LOG(INFO) << "Get peers failed: " << static_cast<int32_t>(nebula::error(ret));
     return Status::Error("Get peers failed");
   }
@@ -267,7 +267,7 @@ folly::Future<Status> AdminClient::checkPeers(GraphSpaceID spaceId, PartitionID
   req.part_id_ref() = partId;
   auto peerRet = getPeers(spaceId, partId);
   if (!nebula::ok(peerRet)) {
-    LOG(ERROR) << "Get peers failed: " << static_cast<int32_t>(nebula::error(peerRet));
+    LOG(INFO) << "Get peers failed: " << static_cast<int32_t>(nebula::error(peerRet));
     return Status::Error("Get peers failed");
   }
@@ -554,8 +554,8 @@ void AdminClient::getLeaderDist(const HostAddr& host,
       .then([pro = std::move(pro), host, retry, retryLimit, this](
                 folly::Try&& t) mutable {
         if (t.hasException()) {
-          LOG(ERROR) << folly::stringPrintf("RPC failure in AdminClient: %s",
-                                            t.exception().what().c_str());
+          LOG(INFO) << folly::stringPrintf("RPC failure in AdminClient: %s",
+                                           t.exception().what().c_str());
           if (retry < retryLimit) {
             usleep(1000 * 50);
             getLeaderDist(
@@ -637,8 +637,8 @@ folly::Future> AdminClient::createSnapshot(
       .then([p = std::move(pro), storageHost, host](
                 folly::Try&& t) mutable {
         if (t.hasException()) {
-          LOG(ERROR) << folly::stringPrintf("RPC failure in AdminClient: %s",
-                                            t.exception().what().c_str());
+          LOG(INFO) << folly::stringPrintf("RPC failure in AdminClient: %s",
+                                           t.exception().what().c_str());
           p.setValue(Status::Error("RPC failure in createCheckpoint"));
           return;
         }
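
Note: every call site changed above follows the same ErrorOr convention: getPeers returns an ErrorOr that is unpacked with nebula::ok / nebula::error / nebula::value, and the numeric cast exists only for logging. A minimal sketch of the pattern:

    // Sketch of the ErrorOr unpacking convention used by the hunks above.
    auto ret = getPeers(spaceId, partId);  // ErrorOr<nebula::cpp2::ErrorCode, ...>
    if (!nebula::ok(ret)) {
      // Error path: log at INFO; the failure itself is surfaced via the returned Status.
      LOG(INFO) << "Get peers failed: " << static_cast<int32_t>(nebula::error(ret));
      return Status::Error("Get peers failed");
    }
    auto peers = std::move(nebula::value(ret));  // success path
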
diff --git a/src/meta/processors/admin/AdminClient.h b/src/meta/processors/admin/AdminClient.h
index 88527c5c54c..cb3ffbdc982 100644
--- a/src/meta/processors/admin/AdminClient.h
+++ b/src/meta/processors/admin/AdminClient.h
@@ -43,57 +43,168 @@ class AdminClient {
     return ioThreadPool_.get();
   }
 
+  /**
+   * @brief Transfer the given partition's leader from the current leader to dst
+   *
+   * @param spaceId
+   * @param partId
+   * @param leader
+   * @param dst
+   * @return folly::Future<Status>
+   */
   virtual folly::Future<Status> transLeader(GraphSpaceID spaceId,
                                             PartitionID partId,
                                             const HostAddr& leader,
                                             const HostAddr& dst = kRandomPeer);
 
+  /**
+   * @brief Add a peer for the given partition in the specified host. The rpc
+   *        will be sent to the new partition peer, letting it know the
+   *        other peers.
+   *
+   * @param spaceId
+   * @param partId
+   * @param host
+   * @param asLearner Add the peer only as a raft learner
+   * @return folly::Future<Status>
+   */
   virtual folly::Future<Status> addPart(GraphSpaceID spaceId,
                                         PartitionID partId,
                                         const HostAddr& host,
                                         bool asLearner);
 
+  /**
+   * @brief Add a learner for the given partition. The rpc will be sent to
+   *        the partition leader, writing the add event as a log.
+   *
+   * @param spaceId
+   * @param partId
+   * @param learner
+   * @return folly::Future<Status>
+   */
   virtual folly::Future<Status> addLearner(GraphSpaceID spaceId,
                                            PartitionID partId,
                                            const HostAddr& learner);
 
+  /**
+   * @brief Wait for the given partition peer to catch up the data
+   *
+   * @param spaceId
+   * @param partId
+   * @param target partition peer address
+   * @return folly::Future<Status>
+   */
   virtual folly::Future<Status> waitingForCatchUpData(GraphSpaceID spaceId,
                                                       PartitionID partId,
                                                       const HostAddr& target);
 
   /**
-   * Add/Remove one peer for raft group (spaceId, partId).
-   * "added" should be true if we want to add one peer, otherwise it is false.
-   * */
+   * @brief Add/Remove one peer for the partition (spaceId, partId).
+   *        "added" should be true if we want to add one peer, otherwise it is false.
+   * @param spaceId
+   * @param partId
+   * @param peer
+   * @param added
+   * @return folly::Future<Status>
+   */
   virtual folly::Future<Status> memberChange(GraphSpaceID spaceId,
                                              PartitionID partId,
                                              const HostAddr& peer,
                                              bool added);
 
+  /**
+   * @brief Update the partition peers info in the meta kvstore: remove peer 'src', add peer 'dst'
+   *
+   * @param spaceId
+   * @param partId
+   * @param src
+   * @param dst
+   * @return folly::Future<Status>
+   */
   virtual folly::Future<Status> updateMeta(GraphSpaceID spaceId,
                                            PartitionID partId,
-                                           const HostAddr& leader,
+                                           const HostAddr& src,
                                            const HostAddr& dst);
 
+  /**
+   * @brief Remove the partition peer in the given storage host
+   *
+   * @param spaceId
+   * @param partId
+   * @param host storage admin service address
+   * @return folly::Future<Status>
+   */
   virtual folly::Future<Status> removePart(GraphSpaceID spaceId,
                                            PartitionID partId,
                                            const HostAddr& host);
 
+  /**
+   * @brief Check and adjust (add/remove) each peer's peers info according to the meta kv store
+   *
+   * @param spaceId
+   * @param partId
+   * @return folly::Future<Status>
+   */
   virtual folly::Future<Status> checkPeers(GraphSpaceID spaceId, PartitionID partId);
 
+  /**
+   * @brief Get all partitions' leader distribution
+   *
+   * @param result
+   * @return folly::Future<Status>
+   */
   virtual folly::Future<Status> getLeaderDist(HostLeaderMap* result);
 
+  // used for snapshot and backup
+  /**
+   * @brief Create snapshots for the given spaces in the given host with the specified snapshot name
+   *
+   * @param spaceIds spaces to create snapshot for
+   * @param name snapshot name
+   * @param host storage host
+   * @return folly::Future>
+   */
   virtual folly::Future> createSnapshot(
       const std::set<GraphSpaceID>& spaceIds, const std::string& name, const HostAddr& host);
 
+  /**
+   * @brief Drop snapshots of the given spaces in the given host with the specified snapshot name
+   *
+   * @param spaceIds spaces to drop
+   * @param name snapshot name
+   * @param host storage host
+   * @return folly::Future<Status>
+   */
   virtual folly::Future<Status> dropSnapshot(const std::set<GraphSpaceID>& spaceIds,
                                              const std::string& name,
                                              const HostAddr& host);
 
+  /**
+   * @brief Block/Allow writes to the given spaces in the specified storage host
+   *
+   * @param spaceIds
+   * @param sign BLOCK_ON: block writes, BLOCK_OFF: allow writes
+   * @param host
+   * @return folly::Future<Status>
+   */
   virtual folly::Future<Status> blockingWrites(const std::set<GraphSpaceID>& spaceIds,
                                                storage::cpp2::EngineSignType sign,
                                                const HostAddr& host);
 
+  /**
+   * @brief Add a storage admin task to the given storage host
+   *
+   * @param cmd
+   * @param jobId
+   * @param taskId
+   * @param spaceId
+   * @param specificHosts if hosts are empty, the request will be sent to all active storage hosts
+   * @param taskSpecficParas
+   * @param parts
+   * @param concurrency
+   * @param statsResult
+   * @return folly::Future<Status>
+   */
   virtual folly::Future<Status> addTask(cpp2::AdminCmd cmd,
                                         int32_t jobId,
                                         int32_t taskId,
@@ -104,6 +215,14 @@ class AdminClient {
                                         int concurrency,
                                         cpp2::StatsItem* statsResult = nullptr);
 
+  /**
+   * @brief Stop the storage admin task in the given storage host
+   *
+   * @param target if target hosts are empty, the request will be sent to all active storage hosts
+   * @param jobId
+   * @param taskId
+   * @return folly::Future<Status>
+   */
   virtual folly::Future<Status> stopTask(const std::vector<HostAddr>& target,
                                          int32_t jobId,
                                          int32_t taskId);
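
Note: taken together, the primitives documented above are the building blocks of a replica move: add the new peer as a learner, wait for it to catch up, promote it, retire the old peer, then fix up the metadata. A hypothetical composition (moveReplica, the run lambda, and the blocking .get() style are illustrative only; real balance jobs chain these futures asynchronously with retries):

    // Illustrative composition of the AdminClient API above; not the actual
    // balance-task code. Moves one replica of (spaceId, partId) from src to dst.
    Status moveReplica(AdminClient* client, GraphSpaceID spaceId, PartitionID partId,
                       const HostAddr& src, const HostAddr& dst) {
      auto run = [](folly::Future<Status>&& f) { return std::move(f).get(); };
      Status s;
      if (!(s = run(client->addLearner(spaceId, partId, dst))).ok()) return s;          // learner
      if (!(s = run(client->waitingForCatchUpData(spaceId, partId, dst))).ok()) return s;
      if (!(s = run(client->memberChange(spaceId, partId, dst, true))).ok()) return s;  // add peer
      if (!(s = run(client->memberChange(spaceId, partId, src, false))).ok()) return s; // remove peer
      if (!(s = run(client->updateMeta(spaceId, partId, src, dst))).ok()) return s;     // meta kv
      if (!(s = run(client->removePart(spaceId, partId, src))).ok()) return s;          // drop data
      return run(client->checkPeers(spaceId, partId));  // verify every peer agrees
    }
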
diff --git a/src/meta/processors/admin/AgentHBProcessor.cpp b/src/meta/processors/admin/AgentHBProcessor.cpp
index 1df56fdab0e..8f6b42181d4 100644
--- a/src/meta/processors/admin/AgentHBProcessor.cpp
+++ b/src/meta/processors/admin/AgentHBProcessor.cpp
@@ -24,7 +24,6 @@ void AgentHBProcessor::onFinished() {
   Base::onFinished();
 }
 
-// Agent heartbeat register agent to meta and pull all services info in agent's host
 void AgentHBProcessor::process(const cpp2::AgentHBReq& req) {
   HostAddr agentAddr((*req.host_ref()).host, (*req.host_ref()).port);
   LOG(INFO) << "Receive heartbeat from " << agentAddr << ", role = AGENT";
@@ -36,9 +35,9 @@ void AgentHBProcessor::process(const cpp2::AgentHBReq& req) {
         time::WallClock::fastNowInMilliSec(), cpp2::HostRole::AGENT, req.get_git_info_sha());
     ret = ActiveHostsMan::updateHostInfo(kvstore_, agentAddr, info);
     if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) {
-      LOG(ERROR) << folly::sformat("Put agent {} info failed: {}",
-                                   agentAddr.toString(),
-                                   apache::thrift::util::enumNameSafe(ret));
+      LOG(INFO) << folly::sformat("Put agent {} info failed: {}",
+                                  agentAddr.toString(),
+                                  apache::thrift::util::enumNameSafe(ret));
       break;
     }
 
@@ -46,9 +45,9 @@ void AgentHBProcessor::process(const cpp2::AgentHBReq& req) {
     auto servicesRet = ActiveHostsMan::getServicesInHost(kvstore_, agentAddr.host);
     if (!nebula::ok(servicesRet)) {
      ret = nebula::error(servicesRet);
-      LOG(ERROR) << folly::sformat("Get active services for {} failed: {}",
-                                   agentAddr.host,
-                                   apache::thrift::util::enumNameSafe(ret));
+      LOG(INFO) << folly::sformat("Get active services for {} failed: {}",
+                                  agentAddr.host,
+                                  apache::thrift::util::enumNameSafe(ret));
       break;
     }
 
@@ -58,9 +57,9 @@ void AgentHBProcessor::process(const cpp2::AgentHBReq& req) {
     auto dirIterRet = doPrefix(hostDirHostPrefix);
     if (!nebula::ok(dirIterRet)) {
       ret = nebula::error(dirIterRet);
-      LOG(ERROR) << folly::sformat("Get host {} dir prefix iterator failed: {}",
-                                   agentAddr.host,
-                                   apache::thrift::util::enumNameSafe(ret));
+      LOG(INFO) << folly::sformat("Get host {} dir prefix iterator failed: {}",
+                                  agentAddr.host,
+                                  apache::thrift::util::enumNameSafe(ret));
       break;
     }
     auto dirIter = std::move(nebula::value(dirIterRet));
@@ -86,7 +85,7 @@ void AgentHBProcessor::process(const cpp2::AgentHBReq& req) {
 
       auto it = serviceDirinfo.find(addr);
       if (it == serviceDirinfo.end()) {
-        LOG(ERROR) << folly::sformat("{} dir info not found", addr.toString());
+        LOG(INFO) << folly::sformat("{} dir info not found", addr.toString());
         break;
       }
 
@@ -99,7 +98,7 @@ void AgentHBProcessor::process(const cpp2::AgentHBReq& req) {
     if (serviceList.size() != services.size() - 1) {
       ret = nebula::cpp2::ErrorCode::E_AGENT_HB_FAILUE;
       // missing some services' dir info
-      LOG(ERROR) << folly::sformat(
+      LOG(INFO) << folly::sformat(
           "Missing some services's dir info, excepted service {}, but only got {}",
           services.size() - 1,
           serviceList.size());
@@ -111,8 +110,7 @@ void AgentHBProcessor::process(const cpp2::AgentHBReq& req) {
   auto partRet = kvstore_->part(kDefaultSpaceId, kDefaultPartId);
   if (!nebula::ok(partRet)) {
     ret = nebula::error(partRet);
-    LOG(ERROR) << "Get meta part store failed, error: "
-               << apache::thrift::util::enumNameSafe(ret);
+    LOG(INFO) << "Get meta part store failed, error: " << apache::thrift::util::enumNameSafe(ret);
     return;
   }
   auto raftPeers = nebula::value(partRet)->peers();
diff --git a/src/meta/processors/admin/AgentHBProcessor.h b/src/meta/processors/admin/AgentHBProcessor.h
index 0119bb730fb..9591c631b7b 100644
--- a/src/meta/processors/admin/AgentHBProcessor.h
+++ b/src/meta/processors/admin/AgentHBProcessor.h
@@ -30,6 +30,9 @@ struct AgentHBCounters final {
 };
 extern AgentHBCounters kAgentHBCounters;
 
+/**
+ * @brief Agent heartbeat: registers the agent to meta and pulls all services info in the agent's host
+ */
 class AgentHBProcessor : public BaseProcessor {
   FRIEND_TEST(AgentHBProcessorTest, AgentHBTest);
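
Note: one subtlety in AgentHBProcessor::process above: getServicesInHost returns every service on the agent's host, the agent itself included, but the agent reports no dir info of its own, which is what the services.size() - 1 in the completeness check accounts for. Condensed (a fragment; the surrounding loop and the serviceList collection are elided):

    // Why the "- 1": the agent is one of `services` but never reports dir info.
    size_t expected = services.size() - 1;  // every service on the host except the agent
    if (serviceList.size() != expected) {
      // Some storaged/graphd on this host has not written its dir info yet.
      ret = nebula::cpp2::ErrorCode::E_AGENT_HB_FAILUE;
    }
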
diff --git a/src/meta/processors/admin/CreateBackupProcessor.cpp b/src/meta/processors/admin/CreateBackupProcessor.cpp
index 709a6ebfa08..85d74c7001d 100644
--- a/src/meta/processors/admin/CreateBackupProcessor.cpp
+++ b/src/meta/processors/admin/CreateBackupProcessor.cpp
@@ -30,7 +30,7 @@ CreateBackupProcessor::spaceNameToId(const std::vector<std::string>* backupSpace
   auto result = doMultiGet(std::move(keys));
   if (!nebula::ok(result)) {
     auto err = nebula::error(result);
-    LOG(ERROR) << "Failed to get space id, error: " << apache::thrift::util::enumNameSafe(err);
+    LOG(INFO) << "Failed to get space id, error: " << apache::thrift::util::enumNameSafe(err);
     if (err == nebula::cpp2::ErrorCode::E_KEY_NOT_FOUND) {
       return nebula::cpp2::ErrorCode::E_BACKUP_SPACE_NOT_FOUND;
     }
@@ -50,7 +50,7 @@ CreateBackupProcessor::spaceNameToId(const std::vector<std::string>* backupSpace
   auto iterRet = doPrefix(prefix);
   if (!nebula::ok(iterRet)) {
     auto retCode = nebula::error(iterRet);
-    LOG(ERROR) << "Space prefix failed, error: " << apache::thrift::util::enumNameSafe(retCode);
+    LOG(INFO) << "Space prefix failed, error: " << apache::thrift::util::enumNameSafe(retCode);
     return retCode;
   }
@@ -65,8 +65,8 @@ CreateBackupProcessor::spaceNameToId(const std::vector<std::string>* backupSpace
   }
 
   if (spaces.empty()) {
-    LOG(ERROR) << "Failed to create a full backup because there is currently "
-                  "no space.";
+    LOG(INFO) << "Failed to create a full backup because there is currently "
+                 "no space.";
     return nebula::cpp2::ErrorCode::E_BACKUP_SPACE_NOT_FOUND;
   }
@@ -86,13 +86,13 @@ void CreateBackupProcessor::process(const cpp2::CreateBackupReq& req) {
   // make sure there is no index job
   auto result = jobMgr->checkIndexJobRunning();
   if (!nebula::ok(result)) {
-    LOG(ERROR) << "get Index status failed, not allowed to create backup.";
+    LOG(INFO) << "Get Index status failed, not allowed to create backup.";
     handleErrorCode(nebula::error(result));
     onFinished();
     return;
   }
   if (nebula::value(result)) {
-    LOG(ERROR) << "Index is rebuilding, not allowed to create backup.";
+    LOG(INFO) << "Index is rebuilding, not allowed to create backup.";
     handleErrorCode(nebula::cpp2::ErrorCode::E_BACKUP_BUILDING_INDEX);
     onFinished();
     return;
@@ -109,7 +109,7 @@ void CreateBackupProcessor::process(const cpp2::CreateBackupReq& req) {
   }
   auto hosts = std::move(nebula::value(activeHostsRet));
   if (hosts.empty()) {
-    LOG(ERROR) << "There has some offline hosts";
+    LOG(INFO) << "There are some offline hosts";
     handleErrorCode(nebula::cpp2::ErrorCode::E_NO_HOSTS);
     onFinished();
     return;
@@ -133,7 +133,7 @@ void CreateBackupProcessor::process(const cpp2::CreateBackupReq& req) {
                     MetaKeyUtils::snapshotVal(cpp2::SnapshotStatus::INVALID, NetworkUtils::toHostsStr(hosts)));
   auto putRet = doSyncPut(data);
   if (putRet != nebula::cpp2::ErrorCode::SUCCEEDED) {
-    LOG(ERROR) << "Write backup meta error";
+    LOG(INFO) << "Write backup meta error";
     handleErrorCode(putRet);
     onFinished();
     return;
@@ -143,11 +143,11 @@ void CreateBackupProcessor::process(const cpp2::CreateBackupReq& req) {
   // step 2 : Blocking all writes action for storage engines.
   auto ret = Snapshot::instance(kvstore_, client_)->blockingWrites(SignType::BLOCK_ON);
   if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) {
-    LOG(ERROR) << "Send blocking sign to storage engine error";
+    LOG(INFO) << "Send blocking sign to storage engine error";
     handleErrorCode(ret);
     ret = Snapshot::instance(kvstore_, client_)->blockingWrites(SignType::BLOCK_OFF);
     if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) {
-      LOG(ERROR) << "Cancel write blocking error";
+      LOG(INFO) << "Cancel write blocking error";
     }
     onFinished();
     return;
@@ -156,12 +156,12 @@ void CreateBackupProcessor::process(const cpp2::CreateBackupReq& req) {
   // step 3 : Create checkpoint for all storage engines.
   auto sret = Snapshot::instance(kvstore_, client_)->createSnapshot(backupName);
   if (!nebula::ok(sret)) {
-    LOG(ERROR) << "Checkpoint create error on storage engine: "
-               << apache::thrift::util::enumNameSafe(nebula::error(sret));
+    LOG(INFO) << "Checkpoint create error on storage engine: "
+              << apache::thrift::util::enumNameSafe(nebula::error(sret));
     handleErrorCode(nebula::error(sret));
     ret = Snapshot::instance(kvstore_, client_)->blockingWrites(SignType::BLOCK_OFF);
     if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) {
-      LOG(ERROR) << "Cancel write blocking error";
+      LOG(INFO) << "Cancel write blocking error";
     }
     onFinished();
     return;
@@ -170,7 +170,7 @@ void CreateBackupProcessor::process(const cpp2::CreateBackupReq& req) {
   // step 4 created backup for meta(export sst).
   auto backupFiles = MetaServiceUtils::backupTables(kvstore_, spaces, backupName, backupSpaces);
   if (!nebula::ok(backupFiles)) {
-    LOG(ERROR) << "Failed backup meta";
+    LOG(INFO) << "Failed backup meta";
     handleErrorCode(nebula::cpp2::ErrorCode::E_BACKUP_FAILED);
     onFinished();
     return;
@@ -179,7 +179,7 @@ void CreateBackupProcessor::process(const cpp2::CreateBackupReq& req) {
   // step 5 : checkpoint created done, so release the write blocking.
   ret = Snapshot::instance(kvstore_, client_)->blockingWrites(SignType::BLOCK_OFF);
   if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) {
-    LOG(ERROR) << "Cancel write blocking error";
+    LOG(INFO) << "Cancel write blocking error";
     handleErrorCode(ret);
     onFinished();
     return;
@@ -192,10 +192,10 @@ void CreateBackupProcessor::process(const cpp2::CreateBackupReq& req) {
                     MetaKeyUtils::snapshotVal(cpp2::SnapshotStatus::VALID, NetworkUtils::toHostsStr(hosts)));
   putRet = doSyncPut(std::move(data));
   if (putRet != nebula::cpp2::ErrorCode::SUCCEEDED) {
-    LOG(ERROR) << "All checkpoint creations are done, "
-                  "but update checkpoint status error. "
-                  "backup : "
-               << backupName;
+    LOG(INFO) << "All checkpoint creations are done, "
+                 "but update checkpoint status error. "
+                 "backup : "
+              << backupName;
     handleErrorCode(putRet);
     onFinished();
     return;
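
Note: three separate error paths above must each remember to send BLOCK_OFF before returning (CreateSnapshotProcessor, further below, centralizes the same cleanup in cancelWriteBlocking()). A hypothetical RAII guard, not present in the codebase, would make that cleanup automatic:

    // Hypothetical RAII helper: unblocks writes automatically unless dismissed.
    // Assumes Snapshot::blockingWrites(sign) as declared in SnapShot.h below.
    class WriteBlockGuard {
     public:
      explicit WriteBlockGuard(Snapshot* snapshot) : snapshot_(snapshot) {}
      ~WriteBlockGuard() {
        if (armed_) {
          auto ret = snapshot_->blockingWrites(storage::cpp2::EngineSignType::BLOCK_OFF);
          if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) {
            LOG(INFO) << "Cancel write blocking error";
          }
        }
      }
      void dismiss() { armed_ = false; }  // call once BLOCK_OFF has been sent explicitly

     private:
      Snapshot* snapshot_;
      bool armed_{true};
    };

Constructed right after a successful BLOCK_ON and dismissed after the explicit step-5 unblock, it turns every early return into an automatic BLOCK_OFF.
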
diff --git a/src/meta/processors/admin/CreateBackupProcessor.h b/src/meta/processors/admin/CreateBackupProcessor.h
index a5df484109e..df75ac036ef 100644
--- a/src/meta/processors/admin/CreateBackupProcessor.h
+++ b/src/meta/processors/admin/CreateBackupProcessor.h
@@ -14,6 +14,10 @@ namespace nebula {
 namespace meta {
 
+/**
+ * @brief Create backup files in each metad and storaged service's local directory.
+ *
+ */
 class CreateBackupProcessor : public BaseProcessor {
  public:
   static CreateBackupProcessor* instance(kvstore::KVStore* kvstore, AdminClient* client) {
diff --git a/src/meta/processors/admin/CreateSnapshotProcessor.cpp b/src/meta/processors/admin/CreateSnapshotProcessor.cpp
index 1b50904cbb1..d38ae92a688 100644
--- a/src/meta/processors/admin/CreateSnapshotProcessor.cpp
+++ b/src/meta/processors/admin/CreateSnapshotProcessor.cpp
@@ -24,7 +24,7 @@ void CreateSnapshotProcessor::process(const cpp2::CreateSnapshotReq&) {
   }
 
   if (nebula::value(result)) {
-    LOG(ERROR) << "Index is rebuilding, not allowed to create snapshot.";
+    LOG(INFO) << "Index is rebuilding, not allowed to create snapshot.";
     handleErrorCode(nebula::cpp2::ErrorCode::E_SNAPSHOT_FAILURE);
     onFinished();
     return;
@@ -42,7 +42,7 @@ void CreateSnapshotProcessor::process(const cpp2::CreateSnapshotReq&) {
 
   auto hosts = std::move(nebula::value(activeHostsRet));
   if (hosts.empty()) {
-    LOG(ERROR) << "There is no active hosts";
+    LOG(INFO) << "There are no active hosts";
     handleErrorCode(nebula::cpp2::ErrorCode::E_NO_HOSTS);
     onFinished();
     return;
@@ -58,7 +58,7 @@ void CreateSnapshotProcessor::process(const cpp2::CreateSnapshotReq&) {
 
   auto putRet = doSyncPut(std::move(data));
   if (putRet != nebula::cpp2::ErrorCode::SUCCEEDED) {
-    LOG(ERROR) << "Write snapshot meta error";
+    LOG(INFO) << "Write snapshot meta error";
     handleErrorCode(putRet);
     onFinished();
     return;
@@ -67,7 +67,7 @@ void CreateSnapshotProcessor::process(const cpp2::CreateSnapshotReq&) {
   // step 2 : Blocking all writes action for storage engines.
   auto signRet = Snapshot::instance(kvstore_, client_)->blockingWrites(SignType::BLOCK_ON);
   if (signRet != nebula::cpp2::ErrorCode::SUCCEEDED) {
-    LOG(ERROR) << "Send blocking sign to storage engine error";
+    LOG(INFO) << "Send blocking sign to storage engine error";
     handleErrorCode(signRet);
     cancelWriteBlocking();
     onFinished();
@@ -77,7 +77,7 @@ void CreateSnapshotProcessor::process(const cpp2::CreateSnapshotReq&) {
   // step 3 : Create checkpoint for all storage engines and meta engine.
   auto csRet = Snapshot::instance(kvstore_, client_)->createSnapshot(snapshot);
   if (!nebula::ok(csRet)) {
-    LOG(ERROR) << "Checkpoint create error on storage engine";
+    LOG(INFO) << "Checkpoint create error on storage engine";
     handleErrorCode(nebula::error(csRet));
     cancelWriteBlocking();
     onFinished();
@@ -87,7 +87,7 @@ void CreateSnapshotProcessor::process(const cpp2::CreateSnapshotReq&) {
   // step 4 : checkpoint created done, so release the write blocking.
   auto unbRet = cancelWriteBlocking();
   if (unbRet != nebula::cpp2::ErrorCode::SUCCEEDED) {
-    LOG(ERROR) << "Create snapshot failed on meta server" << snapshot;
+    LOG(INFO) << "Create snapshot failed on meta server" << snapshot;
     handleErrorCode(unbRet);
     onFinished();
     return;
@@ -96,7 +96,7 @@ void CreateSnapshotProcessor::process(const cpp2::CreateSnapshotReq&) {
   // step 5 : create checkpoint for meta server.
   auto meteRet = kvstore_->createCheckpoint(kDefaultSpaceId, snapshot);
   if (meteRet.isLeftType()) {
-    LOG(ERROR) << "Create snapshot failed on meta server" << snapshot;
+    LOG(INFO) << "Create snapshot failed on meta server" << snapshot;
     handleErrorCode(nebula::cpp2::ErrorCode::E_STORE_FAILURE);
     onFinished();
     return;
@@ -109,10 +109,10 @@ void CreateSnapshotProcessor::process(const cpp2::CreateSnapshotReq&) {
 
   putRet = doSyncPut(std::move(data));
   if (putRet != nebula::cpp2::ErrorCode::SUCCEEDED) {
-    LOG(ERROR) << "All checkpoint creations are done, "
-                  "but update checkpoint status error. "
-                  "snapshot : "
-               << snapshot;
+    LOG(INFO) << "All checkpoint creations are done, "
+                 "but update checkpoint status error. "
+                 "snapshot : "
+              << snapshot;
     handleErrorCode(putRet);
   }
 
@@ -123,7 +123,7 @@ void CreateSnapshotProcessor::process(const cpp2::CreateSnapshotReq&) {
 nebula::cpp2::ErrorCode CreateSnapshotProcessor::cancelWriteBlocking() {
   auto signRet = Snapshot::instance(kvstore_, client_)->blockingWrites(SignType::BLOCK_OFF);
   if (signRet != nebula::cpp2::ErrorCode::SUCCEEDED) {
-    LOG(ERROR) << "Cancel write blocking error";
+    LOG(INFO) << "Cancel write blocking error";
     return signRet;
   }
   return nebula::cpp2::ErrorCode::SUCCEEDED;
diff --git a/src/meta/processors/admin/CreateSnapshotProcessor.h b/src/meta/processors/admin/CreateSnapshotProcessor.h
index 54f1dbd0c3d..d8936ce7b78 100644
--- a/src/meta/processors/admin/CreateSnapshotProcessor.h
+++ b/src/meta/processors/admin/CreateSnapshotProcessor.h
@@ -14,6 +14,10 @@ namespace nebula {
 namespace meta {
 
+/**
+ * @brief Create a snapshot for all spaces; will be deprecated when backup is ready
+ *
+ */
 class CreateSnapshotProcessor : public BaseProcessor {
  public:
   static CreateSnapshotProcessor* instance(kvstore::KVStore* kvstore, AdminClient* client) {
@@ -21,6 +25,11 @@ class CreateSnapshotProcessor : public BaseProcessor {
   }
 
   void process(const cpp2::CreateSnapshotReq& req);
+
+  /**
+   * @brief Cancel the write blocking when creating the snapshot failed
+   *
+   * @return nebula::cpp2::ErrorCode
+   */
   nebula::cpp2::ErrorCode cancelWriteBlocking();
 
  private:
diff --git a/src/meta/processors/admin/DropSnapshotProcessor.cpp b/src/meta/processors/admin/DropSnapshotProcessor.cpp
index 140ae8c4bfe..e93e72a61f6 100644
--- a/src/meta/processors/admin/DropSnapshotProcessor.cpp
+++ b/src/meta/processors/admin/DropSnapshotProcessor.cpp
@@ -25,8 +25,8 @@ void DropSnapshotProcessor::process(const cpp2::DropSnapshotReq& req) {
       onFinished();
       return;
     }
-    LOG(ERROR) << "Get snapshot " << snapshot << " failed, error "
-               << apache::thrift::util::enumNameSafe(retCode);
+    LOG(INFO) << "Get snapshot " << snapshot << " failed, error "
+              << apache::thrift::util::enumNameSafe(retCode);
     handleErrorCode(retCode);
     onFinished();
     return;
@@ -36,7 +36,7 @@ void DropSnapshotProcessor::process(const cpp2::DropSnapshotReq& req) {
   auto hosts = MetaKeyUtils::parseSnapshotHosts(val);
   auto peersRet = NetworkUtils::toHosts(hosts);
   if (!peersRet.ok()) {
-    LOG(ERROR) << "Get checkpoint hosts error";
+    LOG(INFO) << "Get checkpoint hosts error";
     handleErrorCode(nebula::cpp2::ErrorCode::E_SNAPSHOT_FAILURE);
     onFinished();
     return;
@@ -46,16 +46,16 @@ void DropSnapshotProcessor::process(const cpp2::DropSnapshotReq& req) {
   auto peers = peersRet.value();
   auto dsRet = Snapshot::instance(kvstore_, client_)->dropSnapshot(snapshot, std::move(peers));
   if (dsRet != nebula::cpp2::ErrorCode::SUCCEEDED) {
-    LOG(ERROR) << "Drop snapshot error on storage engine";
+    LOG(INFO) << "Drop snapshot error on storage engine";
     // Need update the snapshot status to invalid, maybe some storage engine
     // drop done.
     data.emplace_back(MetaKeyUtils::snapshotKey(snapshot),
                       MetaKeyUtils::snapshotVal(cpp2::SnapshotStatus::INVALID, hosts));
     auto putRet = doSyncPut(std::move(data));
     if (putRet != nebula::cpp2::ErrorCode::SUCCEEDED) {
-      LOG(ERROR) << "Update snapshot status error. "
-                    "snapshot : "
-                 << snapshot;
+      LOG(INFO) << "Update snapshot status error. "
+                   "snapshot : "
+                << snapshot;
     }
     handleErrorCode(putRet);
     onFinished();
@@ -65,16 +65,16 @@ void DropSnapshotProcessor::process(const cpp2::DropSnapshotReq& req) {
   auto dmRet = kvstore_->dropCheckpoint(kDefaultSpaceId, snapshot);
   // TODO sky : need remove meta checkpoint from slave hosts.
   if (dmRet != nebula::cpp2::ErrorCode::SUCCEEDED) {
-    LOG(ERROR) << "Drop snapshot error on meta engine";
+    LOG(INFO) << "Drop snapshot error on meta engine";
     // Need update the snapshot status to invalid, maybe storage engines drop
     // done.
     data.emplace_back(MetaKeyUtils::snapshotKey(snapshot),
                       MetaKeyUtils::snapshotVal(cpp2::SnapshotStatus::INVALID, hosts));
     auto putRet = doSyncPut(std::move(data));
     if (putRet != nebula::cpp2::ErrorCode::SUCCEEDED) {
-      LOG(ERROR) << "Update snapshot status error. "
-                    "snapshot : "
-                 << snapshot;
+      LOG(INFO) << "Update snapshot status error. "
+                   "snapshot : "
+                << snapshot;
     }
     handleErrorCode(putRet);
     onFinished();
diff --git a/src/meta/processors/admin/DropSnapshotProcessor.h b/src/meta/processors/admin/DropSnapshotProcessor.h
index 50e56387ee8..6791635e4ed 100644
--- a/src/meta/processors/admin/DropSnapshotProcessor.h
+++ b/src/meta/processors/admin/DropSnapshotProcessor.h
@@ -14,6 +14,11 @@ namespace nebula {
 namespace meta {
 
+/**
+ * @brief Drop snapshots for all spaces. It could drop snapshots
+ *        created by CreateBackupProcessor or CreateSnapshotProcessor.
+ *
+ */
 class DropSnapshotProcessor : public BaseProcessor {
  public:
   static DropSnapshotProcessor* instance(kvstore::KVStore* kvstore, AdminClient* client) {
+ * + */ class GetMetaDirInfoProcessor : public BaseProcessor { public: static GetMetaDirInfoProcessor* instance(kvstore::KVStore* kvstore) { diff --git a/src/meta/processors/admin/HBProcessor.cpp b/src/meta/processors/admin/HBProcessor.cpp index 83c2984f513..db61e2f8bce 100644 --- a/src/meta/processors/admin/HBProcessor.cpp +++ b/src/meta/processors/admin/HBProcessor.cpp @@ -35,7 +35,7 @@ void HBProcessor::process(const cpp2::HBReq& req) { if (role == cpp2::HostRole::STORAGE) { if (!ActiveHostsMan::machineRegisted(kvstore_, host)) { - LOG(ERROR) << "Machine " << host << " is not registed"; + LOG(INFO) << "Machine " << host << " is not registed"; handleErrorCode(nebula::cpp2::ErrorCode::E_MACHINE_NOT_FOUND); onFinished(); return; @@ -47,7 +47,7 @@ void HBProcessor::process(const cpp2::HBReq& req) { LOG(INFO) << "Set clusterId for new host " << host << "!"; resp_.cluster_id_ref() = clusterId_; } else if (peerClusterId != clusterId_) { - LOG(ERROR) << "Reject wrong cluster host " << host << "!"; + LOG(INFO) << "Reject wrong cluster host " << host << "!"; handleErrorCode(nebula::cpp2::ErrorCode::E_WRONGCLUSTER); onFinished(); return; diff --git a/src/meta/processors/admin/HBProcessor.h b/src/meta/processors/admin/HBProcessor.h index 4fde1b03bac..8b8caf5c358 100644 --- a/src/meta/processors/admin/HBProcessor.h +++ b/src/meta/processors/admin/HBProcessor.h @@ -31,6 +31,11 @@ struct HBCounters final { }; extern HBCounters kHBCounters; +/** + * @brief storaged/graphd/listener report info to metad periodically, + * and update host alive info. + * + */ class HBProcessor : public BaseProcessor { FRIEND_TEST(HBProcessorTest, HBTest); FRIEND_TEST(MetaClientTest, HeartbeatTest); diff --git a/src/meta/processors/admin/ListClusterInfoProcessor.cpp b/src/meta/processors/admin/ListClusterInfoProcessor.cpp index 3c291fa9ca8..dbbbcfbe987 100644 --- a/src/meta/processors/admin/ListClusterInfoProcessor.cpp +++ b/src/meta/processors/admin/ListClusterInfoProcessor.cpp @@ -29,8 +29,8 @@ void ListClusterInfoProcessor::process(const cpp2::ListClusterInfoReq& req) { const auto& hostPrefix = MetaKeyUtils::hostPrefix(); auto iterRet = doPrefix(hostPrefix); if (!nebula::ok(iterRet)) { - LOG(ERROR) << "get host prefix failed:" - << apache::thrift::util::enumNameSafe(nebula::error(iterRet)); + LOG(INFO) << "get host prefix failed:" + << apache::thrift::util::enumNameSafe(nebula::error(iterRet)); handleErrorCode(nebula::cpp2::ErrorCode::E_LIST_CLUSTER_FAILURE); onFinished(); return; @@ -49,10 +49,10 @@ void ListClusterInfoProcessor::process(const cpp2::ListClusterInfoReq& req) { auto dirKey = MetaKeyUtils::hostDirKey(addr.host, addr.port); auto dirRet = doGet(dirKey); if (!nebula::ok(dirRet)) { - LOG(ERROR) << folly::sformat("Get host {} dir info for {} failed: {}", - addr.toString(), - apache::thrift::util::enumNameSafe(info.role_), - apache::thrift::util::enumNameSafe(nebula::error(dirRet))); + LOG(INFO) << folly::sformat("Get host {} dir info for {} failed: {}", + addr.toString(), + apache::thrift::util::enumNameSafe(info.role_), + apache::thrift::util::enumNameSafe(nebula::error(dirRet))); handleErrorCode(nebula::error(dirRet)); onFinished(); return; @@ -71,7 +71,7 @@ void ListClusterInfoProcessor::process(const cpp2::ListClusterInfoReq& req) { auto partRet = kvstore_->part(kDefaultSpaceId, kDefaultPartId); if (!nebula::ok(partRet)) { auto code = nebula::error(partRet); - LOG(ERROR) << "get meta part store failed, error: " << apache::thrift::util::enumNameSafe(code); + LOG(INFO) << "get meta part store failed, error: 
" << apache::thrift::util::enumNameSafe(code); handleErrorCode(nebula::cpp2::ErrorCode::E_LIST_CLUSTER_FAILURE); onFinished(); return; @@ -98,14 +98,14 @@ void ListClusterInfoProcessor::process(const cpp2::ListClusterInfoReq& req) { } } if (agentCount < 1) { - LOG(ERROR) << folly::sformat("There are {} agent count is host {}", agentCount, host); + LOG(INFO) << folly::sformat("There are {} agent count is host {}", agentCount, host); handleErrorCode(nebula::cpp2::ErrorCode::E_LIST_CLUSTER_NO_AGENT_FAILURE); onFinished(); return; } if (services.size() <= 1) { - LOG(ERROR) << "There is no other service than agent in host: " << host; + LOG(INFO) << "There is no other service than agent in host: " << host; continue; } } diff --git a/src/meta/processors/admin/ListClusterInfoProcessor.h b/src/meta/processors/admin/ListClusterInfoProcessor.h index a62b098905d..9633e7df696 100644 --- a/src/meta/processors/admin/ListClusterInfoProcessor.h +++ b/src/meta/processors/admin/ListClusterInfoProcessor.h @@ -12,6 +12,11 @@ namespace nebula { namespace meta { +/** + * @brief Get cluster topology, grouping all the cluster service(metad/storaged/graphd/agent) + * by hostname(or ip). + * Now it is only used in BR tools. + */ class ListClusterInfoProcessor : public BaseProcessor { public: static ListClusterInfoProcessor* instance(kvstore::KVStore* kvstore) { diff --git a/src/meta/processors/admin/ListSnapshotsProcessor.cpp b/src/meta/processors/admin/ListSnapshotsProcessor.cpp index c1a14400737..aa4b808360a 100644 --- a/src/meta/processors/admin/ListSnapshotsProcessor.cpp +++ b/src/meta/processors/admin/ListSnapshotsProcessor.cpp @@ -15,7 +15,7 @@ void ListSnapshotsProcessor::process(const cpp2::ListSnapshotsReq&) { auto iterRet = doPrefix(prefix); if (!nebula::ok(iterRet)) { auto retCode = nebula::error(iterRet); - LOG(ERROR) << "Snapshot prefix failed, error: " << apache::thrift::util::enumNameSafe(retCode); + LOG(INFO) << "Snapshot prefix failed, error: " << apache::thrift::util::enumNameSafe(retCode); handleErrorCode(retCode); onFinished(); return; diff --git a/src/meta/processors/admin/ListSnapshotsProcessor.h b/src/meta/processors/admin/ListSnapshotsProcessor.h index 59097ea905e..19dc70c9cde 100644 --- a/src/meta/processors/admin/ListSnapshotsProcessor.h +++ b/src/meta/processors/admin/ListSnapshotsProcessor.h @@ -11,6 +11,12 @@ namespace nebula { namespace meta { +/** + * @brief List all snapshot info, CreateBackupProcessor + * and CreateCheckpointProcessor will both + * add snapshot info to metad's kv store. 
+ * + */ class ListSnapshotsProcessor : public BaseProcessor { public: static ListSnapshotsProcessor* instance(kvstore::KVStore* kvstore) { diff --git a/src/meta/processors/admin/RestoreProcessor.cpp b/src/meta/processors/admin/RestoreProcessor.cpp index 3f3c35e6cab..b785a784e0b 100644 --- a/src/meta/processors/admin/RestoreProcessor.cpp +++ b/src/meta/processors/admin/RestoreProcessor.cpp @@ -20,7 +20,7 @@ nebula::cpp2::ErrorCode RestoreProcessor::replaceHostInPartition(const HostAddr& auto iterRet = doPrefix(spacePrefix, direct); if (!nebula::ok(iterRet)) { retCode = nebula::error(iterRet); - LOG(ERROR) << "Space prefix failed, error: " << apache::thrift::util::enumNameSafe(retCode); + LOG(INFO) << "Space prefix failed, error: " << apache::thrift::util::enumNameSafe(retCode); return retCode; } auto iter = nebula::value(iterRet).get(); @@ -40,7 +40,7 @@ nebula::cpp2::ErrorCode RestoreProcessor::replaceHostInPartition(const HostAddr& auto iterPartRet = doPrefix(partPrefix, direct); if (!nebula::ok(iterPartRet)) { retCode = nebula::error(iterPartRet); - LOG(ERROR) << "Part prefix failed, error: " << apache::thrift::util::enumNameSafe(retCode); + LOG(INFO) << "Part prefix failed, error: " << apache::thrift::util::enumNameSafe(retCode); return retCode; } iter = nebula::value(iterPartRet).get(); @@ -64,8 +64,8 @@ nebula::cpp2::ErrorCode RestoreProcessor::replaceHostInPartition(const HostAddr& if (direct) { retCode = kvstore_->multiPutWithoutReplicator(kDefaultSpaceId, std::move(data)); if (retCode != nebula::cpp2::ErrorCode::SUCCEEDED) { - LOG(ERROR) << "multiPutWithoutReplicator failed, error: " - << apache::thrift::util::enumNameSafe(retCode); + LOG(INFO) << "multiPutWithoutReplicator partition info failed, error: " + << apache::thrift::util::enumNameSafe(retCode); } return retCode; } @@ -90,7 +90,7 @@ nebula::cpp2::ErrorCode RestoreProcessor::replaceHostInZone(const HostAddr& ipv4 auto iterRet = doPrefix(zonePrefix, direct); if (!nebula::ok(iterRet)) { retCode = nebula::error(iterRet); - LOG(ERROR) << "Zone prefix failed, error: " << apache::thrift::util::enumNameSafe(retCode); + LOG(INFO) << "Zone prefix failed, error: " << apache::thrift::util::enumNameSafe(retCode); return retCode; } auto iter = nebula::value(iterRet).get(); @@ -116,8 +116,8 @@ nebula::cpp2::ErrorCode RestoreProcessor::replaceHostInZone(const HostAddr& ipv4 if (direct) { retCode = kvstore_->multiPutWithoutReplicator(kDefaultSpaceId, std::move(data)); if (retCode != nebula::cpp2::ErrorCode::SUCCEEDED) { - LOG(ERROR) << "multiPutWithoutReplicator failed, error: " - << apache::thrift::util::enumNameSafe(retCode); + LOG(INFO) << "multiPutWithoutReplicator zon info failed, error: " + << apache::thrift::util::enumNameSafe(retCode); } return retCode; } @@ -136,7 +136,7 @@ nebula::cpp2::ErrorCode RestoreProcessor::replaceHostInZone(const HostAddr& ipv4 void RestoreProcessor::process(const cpp2::RestoreMetaReq& req) { auto files = req.get_files(); if (files.empty()) { - LOG(ERROR) << "restore must contain the sst file."; + LOG(INFO) << "restore must contain the sst file."; handleErrorCode(nebula::cpp2::ErrorCode::E_RESTORE_FAILURE); onFinished(); return; @@ -144,7 +144,7 @@ void RestoreProcessor::process(const cpp2::RestoreMetaReq& req) { auto ret = kvstore_->restoreFromFiles(kDefaultSpaceId, files); if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) { - LOG(ERROR) << "Failed to restore file"; + LOG(INFO) << "Failed to restore file"; handleErrorCode(nebula::cpp2::ErrorCode::E_RESTORE_FAILURE); onFinished(); return; @@ -159,7 +159,7 
diff --git a/src/meta/processors/admin/RestoreProcessor.cpp b/src/meta/processors/admin/RestoreProcessor.cpp
index 3f3c35e6cab..b785a784e0b 100644
--- a/src/meta/processors/admin/RestoreProcessor.cpp
+++ b/src/meta/processors/admin/RestoreProcessor.cpp
@@ -20,7 +20,7 @@ nebula::cpp2::ErrorCode RestoreProcessor::replaceHostInPartition(const HostAddr&
   auto iterRet = doPrefix(spacePrefix, direct);
   if (!nebula::ok(iterRet)) {
     retCode = nebula::error(iterRet);
-    LOG(ERROR) << "Space prefix failed, error: " << apache::thrift::util::enumNameSafe(retCode);
+    LOG(INFO) << "Space prefix failed, error: " << apache::thrift::util::enumNameSafe(retCode);
     return retCode;
   }
   auto iter = nebula::value(iterRet).get();
@@ -40,7 +40,7 @@ nebula::cpp2::ErrorCode RestoreProcessor::replaceHostInPartition(const HostAddr&
   auto iterPartRet = doPrefix(partPrefix, direct);
   if (!nebula::ok(iterPartRet)) {
     retCode = nebula::error(iterPartRet);
-    LOG(ERROR) << "Part prefix failed, error: " << apache::thrift::util::enumNameSafe(retCode);
+    LOG(INFO) << "Part prefix failed, error: " << apache::thrift::util::enumNameSafe(retCode);
     return retCode;
   }
   iter = nebula::value(iterPartRet).get();
@@ -64,8 +64,8 @@ nebula::cpp2::ErrorCode RestoreProcessor::replaceHostInPartition(const HostAddr&
   if (direct) {
     retCode = kvstore_->multiPutWithoutReplicator(kDefaultSpaceId, std::move(data));
     if (retCode != nebula::cpp2::ErrorCode::SUCCEEDED) {
-      LOG(ERROR) << "multiPutWithoutReplicator failed, error: "
-                 << apache::thrift::util::enumNameSafe(retCode);
+      LOG(INFO) << "multiPutWithoutReplicator partition info failed, error: "
+                << apache::thrift::util::enumNameSafe(retCode);
     }
     return retCode;
   }
@@ -90,7 +90,7 @@ nebula::cpp2::ErrorCode RestoreProcessor::replaceHostInZone(const HostAddr& ipv4
   auto iterRet = doPrefix(zonePrefix, direct);
   if (!nebula::ok(iterRet)) {
     retCode = nebula::error(iterRet);
-    LOG(ERROR) << "Zone prefix failed, error: " << apache::thrift::util::enumNameSafe(retCode);
+    LOG(INFO) << "Zone prefix failed, error: " << apache::thrift::util::enumNameSafe(retCode);
     return retCode;
   }
   auto iter = nebula::value(iterRet).get();
@@ -116,8 +116,8 @@ nebula::cpp2::ErrorCode RestoreProcessor::replaceHostInZone(const HostAddr& ipv4
   if (direct) {
     retCode = kvstore_->multiPutWithoutReplicator(kDefaultSpaceId, std::move(data));
     if (retCode != nebula::cpp2::ErrorCode::SUCCEEDED) {
-      LOG(ERROR) << "multiPutWithoutReplicator failed, error: "
-                 << apache::thrift::util::enumNameSafe(retCode);
+      LOG(INFO) << "multiPutWithoutReplicator zone info failed, error: "
+                << apache::thrift::util::enumNameSafe(retCode);
     }
     return retCode;
   }
@@ -136,7 +136,7 @@ nebula::cpp2::ErrorCode RestoreProcessor::replaceHostInZone(const HostAddr& ipv4
 void RestoreProcessor::process(const cpp2::RestoreMetaReq& req) {
   auto files = req.get_files();
   if (files.empty()) {
-    LOG(ERROR) << "restore must contain the sst file.";
+    LOG(INFO) << "restore must contain the sst file.";
     handleErrorCode(nebula::cpp2::ErrorCode::E_RESTORE_FAILURE);
     onFinished();
     return;
@@ -144,7 +144,7 @@ void RestoreProcessor::process(const cpp2::RestoreMetaReq& req) {
 
   auto ret = kvstore_->restoreFromFiles(kDefaultSpaceId, files);
   if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) {
-    LOG(ERROR) << "Failed to restore file";
+    LOG(INFO) << "Failed to restore file";
     handleErrorCode(nebula::cpp2::ErrorCode::E_RESTORE_FAILURE);
     onFinished();
     return;
@@ -159,7 +159,7 @@ void RestoreProcessor::process(const cpp2::RestoreMetaReq& req) {
     auto result = replaceHostInPartition(h.get_from_host(), h.get_to_host(), true);
     if (result != nebula::cpp2::ErrorCode::SUCCEEDED) {
-      LOG(ERROR) << "replaceHost in partition fails when recovered";
+      LOG(INFO) << "replaceHost in partition fails when recovered";
       handleErrorCode(result);
       onFinished();
       return;
@@ -167,11 +167,13 @@ void RestoreProcessor::process(const cpp2::RestoreMetaReq& req) {
 
     result = replaceHostInZone(h.get_from_host(), h.get_to_host(), true);
     if (result != nebula::cpp2::ErrorCode::SUCCEEDED) {
-      LOG(ERROR) << "replaceHost in zone fails when recovered";
+      LOG(INFO) << "replaceHost in zone fails when recovered";
       handleErrorCode(result);
       onFinished();
       return;
     }
+
+    // TODO(spw): need to replace the registered machine
   }
 }
diff --git a/src/meta/processors/admin/RestoreProcessor.h b/src/meta/processors/admin/RestoreProcessor.h
index c9b10ffbd80..b980ffd7f79 100644
--- a/src/meta/processors/admin/RestoreProcessor.h
+++ b/src/meta/processors/admin/RestoreProcessor.h
@@ -13,6 +13,11 @@ namespace nebula {
 namespace meta {
 
+/**
+ * @brief Rebuild the host related info after ingesting the backed-up table data
+ *        into the new cluster's metad KV store.
+ *
+ */
 class RestoreProcessor : public BaseProcessor {
  public:
   static RestoreProcessor* instance(kvstore::KVStore* kvstore) {
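
Note: replaceHostInPartition and replaceHostInZone above share one shape: prefix-scan the meta kv, rewrite every value that embeds the old address, and write back directly with multiPutWithoutReplicator, since restore runs before the meta raft group serves traffic. A distilled sketch of the partition variant (parsePartVal/partVal are assumed MetaKeyUtils helpers matching the parse/serialize pair the real code uses; parameters shortened to from/to):

    // Distilled prefix-scan-and-rewrite loop behind replaceHostInPartition above.
    std::vector<kvstore::KV> data;
    for (; iter->valid(); iter->next()) {
      auto hosts = MetaKeyUtils::parsePartVal(iter->val());
      for (auto& h : hosts) {
        if (h == from) {
          h = to;  // swap the backed-up address for the new cluster's address
        }
      }
      data.emplace_back(iter->key().str(), MetaKeyUtils::partVal(hosts));
    }
    // Bypass the replicator: restore happens before raft is serving writes.
    return kvstore_->multiPutWithoutReplicator(kDefaultSpaceId, std::move(data));
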
+ * + */ class Snapshot { public: static Snapshot* instance(kvstore::KVStore* kv, AdminClient* client) { @@ -34,8 +40,22 @@ class Snapshot { std::unordered_map>> createSnapshot(const std::string& name); + /** + * @brief Drop specified snapshot in given storage hosts + * + * @param name snapshot name + * @param hosts storage hosts + * @return nebula::cpp2::ErrorCode + */ nebula::cpp2::ErrorCode dropSnapshot(const std::string& name, const std::vector& hosts); + /** + * @brief Blocking writings before create snapshot, allow writings after create + * snapshot failed or completely + * + * @param sign BLOCK_ON and BLOCK_OFF(allow) + * @return nebula::cpp2::ErrorCode + */ nebula::cpp2::ErrorCode blockingWrites(storage::cpp2::EngineSignType sign); private: