From 12c49b0207650468818fed2dcc39f5c2310fe684 Mon Sep 17 00:00:00 2001 From: pengweisong <90180021+pengweisong@users.noreply.github.com> Date: Wed, 29 Dec 2021 21:19:13 +0800 Subject: [PATCH] fix br bugs in cluster mode --- conf/nebula-metad.conf.default | 1 + conf/nebula-metad.conf.production | 1 + resources/gflags.json | 2 ++ src/clients/meta/MetaClient.cpp | 5 +++-- src/common/graph/Response.h | 2 +- src/common/meta/GflagsManager.cpp | 1 + src/common/utils/MetaKeyUtils.cpp | 2 +- src/graph/planner/plan/Admin.h | 2 +- src/interface/common.thrift | 2 +- src/interface/meta.thrift | 1 + src/meta/ActiveHostsMan.cpp | 10 +++++++--- src/meta/processors/BaseProcessor-inl.h | 4 ++-- src/meta/processors/BaseProcessor.h | 2 +- .../processors/admin/ListClusterInfoProcessor.cpp | 4 ++-- src/meta/processors/admin/RestoreProcessor.cpp | 10 +++++++--- src/meta/processors/parts/ListHostsProcessor.cpp | 11 +++++++++-- src/parser/parser.yy | 4 +++- src/parser/scanner.lex | 1 + src/parser/test/ScannerTest.cpp | 3 +++ 19 files changed, 48 insertions(+), 20 deletions(-) diff --git a/conf/nebula-metad.conf.default b/conf/nebula-metad.conf.default index 89e537b9a57..42966116ef4 100644 --- a/conf/nebula-metad.conf.default +++ b/conf/nebula-metad.conf.default @@ -52,3 +52,4 @@ --default_replica_factor=1 --heartbeat_interval_secs=10 +--agent_heartbeat_interval_secs=60 diff --git a/conf/nebula-metad.conf.production b/conf/nebula-metad.conf.production index 870ba6318c2..1aa2d116211 100644 --- a/conf/nebula-metad.conf.production +++ b/conf/nebula-metad.conf.production @@ -52,6 +52,7 @@ --default_replica_factor=1 --heartbeat_interval_secs=10 +--agent_heartbeat_interval_secs=60 ############## rocksdb Options ############## --rocksdb_wal_sync=true diff --git a/resources/gflags.json b/resources/gflags.json index e75f960e71a..e0f14e6a8fa 100644 --- a/resources/gflags.json +++ b/resources/gflags.json @@ -3,6 +3,7 @@ "minloglevel", "v", "heartbeat_interval_secs", + "agent_heartbeat_interval_secs", "meta_client_retry_times", "slow_op_threshold_ms", "clean_wal_interval_secs", @@ -23,3 +24,4 @@ "rocksdb_block_based_table_options" ] } + diff --git a/src/clients/meta/MetaClient.cpp b/src/clients/meta/MetaClient.cpp index 5d03a17f95c..8cafe3d59e1 100644 --- a/src/clients/meta/MetaClient.cpp +++ b/src/clients/meta/MetaClient.cpp @@ -32,6 +32,7 @@ DECLARE_int32(ws_meta_h2_port); DEFINE_uint32(expired_time_factor, 5, "The factor of expired time based on heart beat interval"); DEFINE_int32(heartbeat_interval_secs, 10, "Heartbeat interval in seconds"); +DEFINE_int32(agent_heartbeat_interval_secs, 60, "Agent heartbeat interval in seconds"); DEFINE_int32(meta_client_retry_times, 3, "meta client retry times, 0 means no retry"); DEFINE_int32(meta_client_retry_interval_secs, 1, "meta client sleep interval between retry"); DEFINE_int32(meta_client_timeout_ms, 60 * 1000, "meta client timeout"); @@ -873,8 +874,8 @@ Status MetaClient::handleResponse(const RESP& resp) { return Status::Error("list cluster failure!"); case nebula::cpp2::ErrorCode::E_LIST_CLUSTER_GET_ABS_PATH_FAILURE: return Status::Error("Failed to get the absolute path!"); - case nebula::cpp2::ErrorCode::E_GET_META_DIR_FAILURE: - return Status::Error("Failed to get meta dir!"); + case nebula::cpp2::ErrorCode::E_LIST_CLUSTER_NO_AGENT_FAILURE: + return Status::Error("There is no agent!"); case nebula::cpp2::ErrorCode::E_INVALID_JOB: return Status::Error("No valid job!"); case nebula::cpp2::ErrorCode::E_JOB_NOT_IN_SPACE: diff --git a/src/common/graph/Response.h b/src/common/graph/Response.h index d1d6594b235..1905a6d0f77 100644 --- a/src/common/graph/Response.h +++ b/src/common/graph/Response.h @@ -117,7 +117,7 @@ /* ListClusterInfo Failure */ \ X(E_LIST_CLUSTER_FAILURE, -2070) \ X(E_LIST_CLUSTER_GET_ABS_PATH_FAILURE, -2071) \ - X(E_GET_META_DIR_FAILURE, -2072) \ + X(E_LIST_CLUSTER_NO_AGENT_FAILURE, -2072) \ \ X(E_QUERY_NOT_FOUND, -2073) \ X(E_AGENT_HB_FAILUE, -2074) \ diff --git a/src/common/meta/GflagsManager.cpp b/src/common/meta/GflagsManager.cpp index 0588d45cde6..4661dabef95 100644 --- a/src/common/meta/GflagsManager.cpp +++ b/src/common/meta/GflagsManager.cpp @@ -53,6 +53,7 @@ std::unordered_map> GflagsManager {"minloglevel", {cpp2::ConfigMode::MUTABLE, false}}, {"v", {cpp2::ConfigMode::MUTABLE, false}}, {"heartbeat_interval_secs", {cpp2::ConfigMode::MUTABLE, false}}, + {"agent_heartbeat_interval_secs", {cpp2::ConfigMode::MUTABLE, false}}, {"meta_client_retry_times", {cpp2::ConfigMode::MUTABLE, false}}, {"slow_op_threshold_ms", {cpp2::ConfigMode::MUTABLE, false}}, {"wal_ttl", {cpp2::ConfigMode::MUTABLE, false}}, diff --git a/src/common/utils/MetaKeyUtils.cpp b/src/common/utils/MetaKeyUtils.cpp index 603cf9e228b..294374b2222 100644 --- a/src/common/utils/MetaKeyUtils.cpp +++ b/src/common/utils/MetaKeyUtils.cpp @@ -21,7 +21,7 @@ static const std::unordered_map> syste {"users", {"__users__", true}}, {"hosts", {"__hosts__", false}}, {"versions", {"__versions__", false}}, - {"machines", {"__machines__", false}}, + {"machines", {"__machines__", true}}, {"host_dirs", {"__host_dirs__", false}}, {"snapshots", {"__snapshots__", false}}, {"configs", {"__configs__", true}}, diff --git a/src/graph/planner/plan/Admin.h b/src/graph/planner/plan/Admin.h index 0e6536587e9..3785031a7c0 100644 --- a/src/graph/planner/plan/Admin.h +++ b/src/graph/planner/plan/Admin.h @@ -92,7 +92,7 @@ class DropHosts final : public SingleDependencyNode { }; class ShowHosts final : public SingleDependencyNode { - // TODO(shylock) meta/storage/graph enumerate + // TODO(shylock) meta/storage/graph/agent enumerate public: static ShowHosts* make(QueryContext* qctx, PlanNode* dep, meta::cpp2::ListHostType type) { return qctx->objPool()->add(new ShowHosts(qctx, dep, type)); diff --git a/src/interface/common.thrift b/src/interface/common.thrift index 22b1b8fcb17..14dd6e96531 100644 --- a/src/interface/common.thrift +++ b/src/interface/common.thrift @@ -402,7 +402,7 @@ enum ErrorCode { // ListClusterInfo Failure E_LIST_CLUSTER_FAILURE = -2070, E_LIST_CLUSTER_GET_ABS_PATH_FAILURE = -2071, - E_GET_META_DIR_FAILURE = -2072, + E_LIST_CLUSTER_NO_AGENT_FAILURE = -2072, E_QUERY_NOT_FOUND = -2073, E_AGENT_HB_FAILUE = -2074, diff --git a/src/interface/meta.thrift b/src/interface/meta.thrift index c7291171200..10ac522975c 100644 --- a/src/interface/meta.thrift +++ b/src/interface/meta.thrift @@ -463,6 +463,7 @@ enum ListHostType { GRAPH = 0x01, META = 0x02, STORAGE = 0x03, + AGENT = 0x04, } (cpp.enum_strict) struct ListHostsReq { diff --git a/src/meta/ActiveHostsMan.cpp b/src/meta/ActiveHostsMan.cpp index af94e214051..f1e4dcfc158 100644 --- a/src/meta/ActiveHostsMan.cpp +++ b/src/meta/ActiveHostsMan.cpp @@ -12,6 +12,7 @@ #include "meta/processors/Common.h" DECLARE_int32(heartbeat_interval_secs); +DECLARE_int32(agent_heartbeat_interval_secs); DECLARE_uint32(expired_time_factor); namespace nebula { @@ -144,9 +145,12 @@ ErrorOr> ActiveHostsMan::getActiv } std::vector hosts; - int64_t threshold = - (expiredTTL == 0 ? FLAGS_heartbeat_interval_secs * FLAGS_expired_time_factor : expiredTTL) * - 1000; + int64_t expiredTime = + FLAGS_heartbeat_interval_secs * FLAGS_expired_time_factor; // meta/storage/graph + if (role == cpp2::HostRole::AGENT) { + expiredTime = FLAGS_agent_heartbeat_interval_secs * FLAGS_expired_time_factor; + } + int64_t threshold = (expiredTTL == 0 ? expiredTime : expiredTTL) * 1000; auto now = time::WallClock::fastNowInMilliSec(); while (iter->valid()) { auto host = MetaKeyUtils::parseHostKey(iter->key()); diff --git a/src/meta/processors/BaseProcessor-inl.h b/src/meta/processors/BaseProcessor-inl.h index 27dcaea01b3..bfb203d89d6 100644 --- a/src/meta/processors/BaseProcessor-inl.h +++ b/src/meta/processors/BaseProcessor-inl.h @@ -26,9 +26,9 @@ void BaseProcessor::doPut(std::vector data) { template ErrorOr> -BaseProcessor::doPrefix(const std::string& key) { +BaseProcessor::doPrefix(const std::string& key, bool canReadFromFollower) { std::unique_ptr iter; - auto code = kvstore_->prefix(kDefaultSpaceId, kDefaultPartId, key, &iter); + auto code = kvstore_->prefix(kDefaultSpaceId, kDefaultPartId, key, &iter, canReadFromFollower); if (code != nebula::cpp2::ErrorCode::SUCCEEDED) { VLOG(2) << "Prefix Failed"; return code; diff --git a/src/meta/processors/BaseProcessor.h b/src/meta/processors/BaseProcessor.h index 5d8c1521f80..f9def2c631e 100644 --- a/src/meta/processors/BaseProcessor.h +++ b/src/meta/processors/BaseProcessor.h @@ -125,7 +125,7 @@ class BaseProcessor { void doPut(std::vector data); ErrorOr> doPrefix( - const std::string& key); + const std::string& key, bool canReadFromFollower = false); /** * General get function. diff --git a/src/meta/processors/admin/ListClusterInfoProcessor.cpp b/src/meta/processors/admin/ListClusterInfoProcessor.cpp index ce5215e2f98..3ebd993d35f 100644 --- a/src/meta/processors/admin/ListClusterInfoProcessor.cpp +++ b/src/meta/processors/admin/ListClusterInfoProcessor.cpp @@ -97,9 +97,9 @@ void ListClusterInfoProcessor::process(const cpp2::ListClusterInfoReq& req) { agentCount++; } } - if (agentCount != 1) { + if (agentCount < 1) { LOG(ERROR) << folly::sformat("There are {} agent count is host {}", agentCount, host); - handleErrorCode(nebula::cpp2::ErrorCode::E_LIST_CLUSTER_FAILURE); + handleErrorCode(nebula::cpp2::ErrorCode::E_LIST_CLUSTER_NO_AGENT_FAILURE); onFinished(); return; } diff --git a/src/meta/processors/admin/RestoreProcessor.cpp b/src/meta/processors/admin/RestoreProcessor.cpp index ce7748ff9c3..3f3c35e6cab 100644 --- a/src/meta/processors/admin/RestoreProcessor.cpp +++ b/src/meta/processors/admin/RestoreProcessor.cpp @@ -17,7 +17,7 @@ nebula::cpp2::ErrorCode RestoreProcessor::replaceHostInPartition(const HostAddr& folly::SharedMutex::WriteHolder wHolder(LockUtils::spaceLock()); auto retCode = nebula::cpp2::ErrorCode::SUCCEEDED; const auto& spacePrefix = MetaKeyUtils::spacePrefix(); - auto iterRet = doPrefix(spacePrefix); + auto iterRet = doPrefix(spacePrefix, direct); if (!nebula::ok(iterRet)) { retCode = nebula::error(iterRet); LOG(ERROR) << "Space prefix failed, error: " << apache::thrift::util::enumNameSafe(retCode); @@ -37,7 +37,7 @@ nebula::cpp2::ErrorCode RestoreProcessor::replaceHostInPartition(const HostAddr& for (const auto& spaceId : allSpaceId) { const auto& partPrefix = MetaKeyUtils::partPrefix(spaceId); - auto iterPartRet = doPrefix(partPrefix); + auto iterPartRet = doPrefix(partPrefix, direct); if (!nebula::ok(iterPartRet)) { retCode = nebula::error(iterPartRet); LOG(ERROR) << "Part prefix failed, error: " << apache::thrift::util::enumNameSafe(retCode); @@ -87,7 +87,7 @@ nebula::cpp2::ErrorCode RestoreProcessor::replaceHostInZone(const HostAddr& ipv4 folly::SharedMutex::WriteHolder wHolder(LockUtils::spaceLock()); auto retCode = nebula::cpp2::ErrorCode::SUCCEEDED; const auto& zonePrefix = MetaKeyUtils::zonePrefix(); - auto iterRet = doPrefix(zonePrefix); + auto iterRet = doPrefix(zonePrefix, direct); if (!nebula::ok(iterRet)) { retCode = nebula::error(iterRet); LOG(ERROR) << "Zone prefix failed, error: " << apache::thrift::util::enumNameSafe(retCode); @@ -153,6 +153,10 @@ void RestoreProcessor::process(const cpp2::RestoreMetaReq& req) { auto replaceHosts = req.get_hosts(); if (!replaceHosts.empty()) { for (auto h : replaceHosts) { + if (h.get_from_host() == h.get_to_host()) { + continue; + } + auto result = replaceHostInPartition(h.get_from_host(), h.get_to_host(), true); if (result != nebula::cpp2::ErrorCode::SUCCEEDED) { LOG(ERROR) << "replaceHost in partition fails when recovered"; diff --git a/src/meta/processors/parts/ListHostsProcessor.cpp b/src/meta/processors/parts/ListHostsProcessor.cpp index 23327a00fab..3495ee62628 100644 --- a/src/meta/processors/parts/ListHostsProcessor.cpp +++ b/src/meta/processors/parts/ListHostsProcessor.cpp @@ -10,6 +10,7 @@ #include "version/Version.h" DECLARE_int32(heartbeat_interval_secs); +DECLARE_int32(agent_heartbeat_interval_secs); DECLARE_uint32(expired_time_factor); DEFINE_int32(removed_threshold_sec, 24 * 60 * 60, @@ -26,6 +27,8 @@ static cpp2::HostRole toHostRole(cpp2::ListHostType type) { return cpp2::HostRole::META; case cpp2::ListHostType::STORAGE: return cpp2::HostRole::STORAGE; + case cpp2::ListHostType::AGENT: + return cpp2::HostRole::AGENT; default: return cpp2::HostRole::UNKNOWN; } @@ -133,10 +136,14 @@ nebula::cpp2::ErrorCode ListHostsProcessor::allHostsWithStatus(cpp2::HostRole ro } if (now - info.lastHBTimeInMilliSec_ < FLAGS_removed_threshold_sec * 1000) { + int64_t expiredTime = + FLAGS_heartbeat_interval_secs * FLAGS_expired_time_factor * 1000; // meta/storage/graph + if (info.role_ == cpp2::HostRole::AGENT) { + expiredTime = FLAGS_agent_heartbeat_interval_secs * FLAGS_expired_time_factor * 1000; + } // If meta didn't receive heartbeat with 2 periods, regard hosts as // offline. Same as ActiveHostsMan::getActiveHosts - if (now - info.lastHBTimeInMilliSec_ < - FLAGS_heartbeat_interval_secs * FLAGS_expired_time_factor * 1000) { + if (now - info.lastHBTimeInMilliSec_ < expiredTime) { item.status_ref() = cpp2::HostStatus::ONLINE; } else { item.status_ref() = cpp2::HostStatus::OFFLINE; diff --git a/src/parser/parser.yy b/src/parser/parser.yy index 9b5f730b8b2..51732359e69 100644 --- a/src/parser/parser.yy +++ b/src/parser/parser.yy @@ -177,7 +177,7 @@ static constexpr size_t kCommentLengthLimit = 256; %token KW_DROP KW_REMOVE KW_SPACES KW_INGEST KW_INDEX KW_INDEXES %token KW_IF KW_NOT KW_EXISTS KW_WITH %token KW_BY KW_DOWNLOAD KW_HDFS KW_UUID KW_CONFIGS KW_FORCE -%token KW_GET KW_DECLARE KW_GRAPH KW_META KW_STORAGE +%token KW_GET KW_DECLARE KW_GRAPH KW_META KW_STORAGE KW_AGENT %token KW_TTL KW_TTL_DURATION KW_TTL_COL KW_DATA KW_STOP %token KW_FETCH KW_PROP KW_UPDATE KW_UPSERT KW_WHEN %token KW_ORDER KW_ASC KW_LIMIT KW_SAMPLE KW_OFFSET KW_ASCENDING KW_DESCENDING @@ -487,6 +487,7 @@ unreserved_keyword | KW_GRAPH { $$ = new std::string("graph"); } | KW_META { $$ = new std::string("meta"); } | KW_STORAGE { $$ = new std::string("storage"); } + | KW_AGENT { $$ = new std::string("agent"); } | KW_ALL { $$ = new std::string("all"); } | KW_ANY { $$ = new std::string("any"); } | KW_SINGLE { $$ = new std::string("single"); } @@ -3443,6 +3444,7 @@ list_host_type : KW_GRAPH { $$ = meta::cpp2::ListHostType::GRAPH; } | KW_META { $$ = meta::cpp2::ListHostType::META; } | KW_STORAGE { $$ = meta::cpp2::ListHostType::STORAGE; } + | KW_AGENT { $$ = meta::cpp2::ListHostType::AGENT; } ; config_module_enum diff --git a/src/parser/scanner.lex b/src/parser/scanner.lex index 451a6a63f8a..af7b2ad9f58 100644 --- a/src/parser/scanner.lex +++ b/src/parser/scanner.lex @@ -220,6 +220,7 @@ LABEL_FULL_WIDTH {CN_EN_FULL_WIDTH}{CN_EN_NUM_FULL_WIDTH}* "TTL_COL" { return TokenType::KW_TTL_COL; } "GRAPH" { return TokenType::KW_GRAPH; } "META" { return TokenType::KW_META; } +"AGENT" { return TokenType::KW_AGENT; } "STORAGE" { return TokenType::KW_STORAGE; } "SHORTEST" { return TokenType::KW_SHORTEST; } "NOLOOP" { return TokenType::KW_NOLOOP; } diff --git a/src/parser/test/ScannerTest.cpp b/src/parser/test/ScannerTest.cpp index 16b8764cce2..830e9c3bb69 100644 --- a/src/parser/test/ScannerTest.cpp +++ b/src/parser/test/ScannerTest.cpp @@ -431,6 +431,9 @@ TEST(Scanner, Basic) { CHECK_SEMANTIC_TYPE("META", TokenType::KW_META), CHECK_SEMANTIC_TYPE("Meta", TokenType::KW_META), CHECK_SEMANTIC_TYPE("meta", TokenType::KW_META), + CHECK_SEMANTIC_TYPE("AGENT", TokenType::KW_AGENT), + CHECK_SEMANTIC_TYPE("Agent", TokenType::KW_AGENT), + CHECK_SEMANTIC_TYPE("agent", TokenType::KW_AGENT), CHECK_SEMANTIC_TYPE("STORAGE", TokenType::KW_STORAGE), CHECK_SEMANTIC_TYPE("Storage", TokenType::KW_STORAGE), CHECK_SEMANTIC_TYPE("storage", TokenType::KW_STORAGE),