Skip to content

Commit

Permalink
fix br bugs in cluster mode
Browse files Browse the repository at this point in the history
  • Loading branch information
pengweisong committed Dec 30, 2021
1 parent 3e71921 commit 12c49b0
Show file tree
Hide file tree
Showing 19 changed files with 48 additions and 20 deletions.
1 change: 1 addition & 0 deletions conf/nebula-metad.conf.default
Original file line number Diff line number Diff line change
Expand Up @@ -52,3 +52,4 @@
--default_replica_factor=1

--heartbeat_interval_secs=10
--agent_heartbeat_interval_secs=60
1 change: 1 addition & 0 deletions conf/nebula-metad.conf.production
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
--default_replica_factor=1

--heartbeat_interval_secs=10
--agent_heartbeat_interval_secs=60

############## rocksdb Options ##############
--rocksdb_wal_sync=true
2 changes: 2 additions & 0 deletions resources/gflags.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
"minloglevel",
"v",
"heartbeat_interval_secs",
"agent_heartbeat_interval_secs",
"meta_client_retry_times",
"slow_op_threshold_ms",
"clean_wal_interval_secs",
Expand All @@ -23,3 +24,4 @@
"rocksdb_block_based_table_options"
]
}

5 changes: 3 additions & 2 deletions src/clients/meta/MetaClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ DECLARE_int32(ws_meta_h2_port);

DEFINE_uint32(expired_time_factor, 5, "The factor of expired time based on heart beat interval");
DEFINE_int32(heartbeat_interval_secs, 10, "Heartbeat interval in seconds");
DEFINE_int32(agent_heartbeat_interval_secs, 60, "Agent heartbeat interval in seconds");
DEFINE_int32(meta_client_retry_times, 3, "meta client retry times, 0 means no retry");
DEFINE_int32(meta_client_retry_interval_secs, 1, "meta client sleep interval between retry");
DEFINE_int32(meta_client_timeout_ms, 60 * 1000, "meta client timeout");
Expand Down Expand Up @@ -873,8 +874,8 @@ Status MetaClient::handleResponse(const RESP& resp) {
return Status::Error("list cluster failure!");
case nebula::cpp2::ErrorCode::E_LIST_CLUSTER_GET_ABS_PATH_FAILURE:
return Status::Error("Failed to get the absolute path!");
case nebula::cpp2::ErrorCode::E_GET_META_DIR_FAILURE:
return Status::Error("Failed to get meta dir!");
case nebula::cpp2::ErrorCode::E_LIST_CLUSTER_NO_AGENT_FAILURE:
return Status::Error("There is no agent!");
case nebula::cpp2::ErrorCode::E_INVALID_JOB:
return Status::Error("No valid job!");
case nebula::cpp2::ErrorCode::E_JOB_NOT_IN_SPACE:
Expand Down
2 changes: 1 addition & 1 deletion src/common/graph/Response.h
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@
/* ListClusterInfo Failure */ \
X(E_LIST_CLUSTER_FAILURE, -2070) \
X(E_LIST_CLUSTER_GET_ABS_PATH_FAILURE, -2071) \
X(E_GET_META_DIR_FAILURE, -2072) \
X(E_LIST_CLUSTER_NO_AGENT_FAILURE, -2072) \
\
X(E_QUERY_NOT_FOUND, -2073) \
X(E_AGENT_HB_FAILUE, -2074) \
Expand Down
1 change: 1 addition & 0 deletions src/common/meta/GflagsManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ std::unordered_map<std::string, std::pair<cpp2::ConfigMode, bool>> GflagsManager
{"minloglevel", {cpp2::ConfigMode::MUTABLE, false}},
{"v", {cpp2::ConfigMode::MUTABLE, false}},
{"heartbeat_interval_secs", {cpp2::ConfigMode::MUTABLE, false}},
{"agent_heartbeat_interval_secs", {cpp2::ConfigMode::MUTABLE, false}},
{"meta_client_retry_times", {cpp2::ConfigMode::MUTABLE, false}},
{"slow_op_threshold_ms", {cpp2::ConfigMode::MUTABLE, false}},
{"wal_ttl", {cpp2::ConfigMode::MUTABLE, false}},
Expand Down
2 changes: 1 addition & 1 deletion src/common/utils/MetaKeyUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ static const std::unordered_map<std::string, std::pair<std::string, bool>> syste
{"users", {"__users__", true}},
{"hosts", {"__hosts__", false}},
{"versions", {"__versions__", false}},
{"machines", {"__machines__", false}},
{"machines", {"__machines__", true}},
{"host_dirs", {"__host_dirs__", false}},
{"snapshots", {"__snapshots__", false}},
{"configs", {"__configs__", true}},
Expand Down
2 changes: 1 addition & 1 deletion src/graph/planner/plan/Admin.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ class DropHosts final : public SingleDependencyNode {
};

class ShowHosts final : public SingleDependencyNode {
// TODO(shylock) meta/storage/graph enumerate
// TODO(shylock) meta/storage/graph/agent enumerate
public:
static ShowHosts* make(QueryContext* qctx, PlanNode* dep, meta::cpp2::ListHostType type) {
return qctx->objPool()->add(new ShowHosts(qctx, dep, type));
Expand Down
2 changes: 1 addition & 1 deletion src/interface/common.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,7 @@ enum ErrorCode {
// ListClusterInfo Failure
E_LIST_CLUSTER_FAILURE = -2070,
E_LIST_CLUSTER_GET_ABS_PATH_FAILURE = -2071,
E_GET_META_DIR_FAILURE = -2072,
E_LIST_CLUSTER_NO_AGENT_FAILURE = -2072,

E_QUERY_NOT_FOUND = -2073,
E_AGENT_HB_FAILUE = -2074,
Expand Down
1 change: 1 addition & 0 deletions src/interface/meta.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -463,6 +463,7 @@ enum ListHostType {
GRAPH = 0x01,
META = 0x02,
STORAGE = 0x03,
AGENT = 0x04,
} (cpp.enum_strict)

struct ListHostsReq {
Expand Down
10 changes: 7 additions & 3 deletions src/meta/ActiveHostsMan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include "meta/processors/Common.h"

DECLARE_int32(heartbeat_interval_secs);
DECLARE_int32(agent_heartbeat_interval_secs);
DECLARE_uint32(expired_time_factor);

namespace nebula {
Expand Down Expand Up @@ -144,9 +145,12 @@ ErrorOr<nebula::cpp2::ErrorCode, std::vector<HostAddr>> ActiveHostsMan::getActiv
}

std::vector<HostAddr> hosts;
int64_t threshold =
(expiredTTL == 0 ? FLAGS_heartbeat_interval_secs * FLAGS_expired_time_factor : expiredTTL) *
1000;
int64_t expiredTime =
FLAGS_heartbeat_interval_secs * FLAGS_expired_time_factor; // meta/storage/graph
if (role == cpp2::HostRole::AGENT) {
expiredTime = FLAGS_agent_heartbeat_interval_secs * FLAGS_expired_time_factor;
}
int64_t threshold = (expiredTTL == 0 ? expiredTime : expiredTTL) * 1000;
auto now = time::WallClock::fastNowInMilliSec();
while (iter->valid()) {
auto host = MetaKeyUtils::parseHostKey(iter->key());
Expand Down
4 changes: 2 additions & 2 deletions src/meta/processors/BaseProcessor-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ void BaseProcessor<RESP>::doPut(std::vector<kvstore::KV> data) {

template <typename RESP>
ErrorOr<nebula::cpp2::ErrorCode, std::unique_ptr<kvstore::KVIterator>>
BaseProcessor<RESP>::doPrefix(const std::string& key) {
BaseProcessor<RESP>::doPrefix(const std::string& key, bool canReadFromFollower) {
std::unique_ptr<kvstore::KVIterator> iter;
auto code = kvstore_->prefix(kDefaultSpaceId, kDefaultPartId, key, &iter);
auto code = kvstore_->prefix(kDefaultSpaceId, kDefaultPartId, key, &iter, canReadFromFollower);
if (code != nebula::cpp2::ErrorCode::SUCCEEDED) {
VLOG(2) << "Prefix Failed";
return code;
Expand Down
2 changes: 1 addition & 1 deletion src/meta/processors/BaseProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ class BaseProcessor {
void doPut(std::vector<kvstore::KV> data);

ErrorOr<nebula::cpp2::ErrorCode, std::unique_ptr<kvstore::KVIterator>> doPrefix(
const std::string& key);
const std::string& key, bool canReadFromFollower = false);

/**
* General get function.
Expand Down
4 changes: 2 additions & 2 deletions src/meta/processors/admin/ListClusterInfoProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,9 +97,9 @@ void ListClusterInfoProcessor::process(const cpp2::ListClusterInfoReq& req) {
agentCount++;
}
}
if (agentCount != 1) {
if (agentCount < 1) {
LOG(ERROR) << folly::sformat("There are {} agent count is host {}", agentCount, host);
handleErrorCode(nebula::cpp2::ErrorCode::E_LIST_CLUSTER_FAILURE);
handleErrorCode(nebula::cpp2::ErrorCode::E_LIST_CLUSTER_NO_AGENT_FAILURE);
onFinished();
return;
}
Expand Down
10 changes: 7 additions & 3 deletions src/meta/processors/admin/RestoreProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ nebula::cpp2::ErrorCode RestoreProcessor::replaceHostInPartition(const HostAddr&
folly::SharedMutex::WriteHolder wHolder(LockUtils::spaceLock());
auto retCode = nebula::cpp2::ErrorCode::SUCCEEDED;
const auto& spacePrefix = MetaKeyUtils::spacePrefix();
auto iterRet = doPrefix(spacePrefix);
auto iterRet = doPrefix(spacePrefix, direct);
if (!nebula::ok(iterRet)) {
retCode = nebula::error(iterRet);
LOG(ERROR) << "Space prefix failed, error: " << apache::thrift::util::enumNameSafe(retCode);
Expand All @@ -37,7 +37,7 @@ nebula::cpp2::ErrorCode RestoreProcessor::replaceHostInPartition(const HostAddr&

for (const auto& spaceId : allSpaceId) {
const auto& partPrefix = MetaKeyUtils::partPrefix(spaceId);
auto iterPartRet = doPrefix(partPrefix);
auto iterPartRet = doPrefix(partPrefix, direct);
if (!nebula::ok(iterPartRet)) {
retCode = nebula::error(iterPartRet);
LOG(ERROR) << "Part prefix failed, error: " << apache::thrift::util::enumNameSafe(retCode);
Expand Down Expand Up @@ -87,7 +87,7 @@ nebula::cpp2::ErrorCode RestoreProcessor::replaceHostInZone(const HostAddr& ipv4
folly::SharedMutex::WriteHolder wHolder(LockUtils::spaceLock());
auto retCode = nebula::cpp2::ErrorCode::SUCCEEDED;
const auto& zonePrefix = MetaKeyUtils::zonePrefix();
auto iterRet = doPrefix(zonePrefix);
auto iterRet = doPrefix(zonePrefix, direct);
if (!nebula::ok(iterRet)) {
retCode = nebula::error(iterRet);
LOG(ERROR) << "Zone prefix failed, error: " << apache::thrift::util::enumNameSafe(retCode);
Expand Down Expand Up @@ -153,6 +153,10 @@ void RestoreProcessor::process(const cpp2::RestoreMetaReq& req) {
auto replaceHosts = req.get_hosts();
if (!replaceHosts.empty()) {
for (auto h : replaceHosts) {
if (h.get_from_host() == h.get_to_host()) {
continue;
}

auto result = replaceHostInPartition(h.get_from_host(), h.get_to_host(), true);
if (result != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(ERROR) << "replaceHost in partition fails when recovered";
Expand Down
11 changes: 9 additions & 2 deletions src/meta/processors/parts/ListHostsProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include "version/Version.h"

DECLARE_int32(heartbeat_interval_secs);
DECLARE_int32(agent_heartbeat_interval_secs);
DECLARE_uint32(expired_time_factor);
DEFINE_int32(removed_threshold_sec,
24 * 60 * 60,
Expand All @@ -26,6 +27,8 @@ static cpp2::HostRole toHostRole(cpp2::ListHostType type) {
return cpp2::HostRole::META;
case cpp2::ListHostType::STORAGE:
return cpp2::HostRole::STORAGE;
case cpp2::ListHostType::AGENT:
return cpp2::HostRole::AGENT;
default:
return cpp2::HostRole::UNKNOWN;
}
Expand Down Expand Up @@ -133,10 +136,14 @@ nebula::cpp2::ErrorCode ListHostsProcessor::allHostsWithStatus(cpp2::HostRole ro
}

if (now - info.lastHBTimeInMilliSec_ < FLAGS_removed_threshold_sec * 1000) {
int64_t expiredTime =
FLAGS_heartbeat_interval_secs * FLAGS_expired_time_factor * 1000; // meta/storage/graph
if (info.role_ == cpp2::HostRole::AGENT) {
expiredTime = FLAGS_agent_heartbeat_interval_secs * FLAGS_expired_time_factor * 1000;
}
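// If meta has not received a heartbeat within the role's expiry window
// (heartbeat interval * expired_time_factor; agents use their own interval),
// regard the host as offline. Same as ActiveHostsMan::getActiveHosts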
if (now - info.lastHBTimeInMilliSec_ <
FLAGS_heartbeat_interval_secs * FLAGS_expired_time_factor * 1000) {
if (now - info.lastHBTimeInMilliSec_ < expiredTime) {
item.status_ref() = cpp2::HostStatus::ONLINE;
} else {
item.status_ref() = cpp2::HostStatus::OFFLINE;
Expand Down
4 changes: 3 additions & 1 deletion src/parser/parser.yy
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ static constexpr size_t kCommentLengthLimit = 256;
%token KW_DROP KW_REMOVE KW_SPACES KW_INGEST KW_INDEX KW_INDEXES
%token KW_IF KW_NOT KW_EXISTS KW_WITH
%token KW_BY KW_DOWNLOAD KW_HDFS KW_UUID KW_CONFIGS KW_FORCE
%token KW_GET KW_DECLARE KW_GRAPH KW_META KW_STORAGE
%token KW_GET KW_DECLARE KW_GRAPH KW_META KW_STORAGE KW_AGENT
%token KW_TTL KW_TTL_DURATION KW_TTL_COL KW_DATA KW_STOP
%token KW_FETCH KW_PROP KW_UPDATE KW_UPSERT KW_WHEN
%token KW_ORDER KW_ASC KW_LIMIT KW_SAMPLE KW_OFFSET KW_ASCENDING KW_DESCENDING
Expand Down Expand Up @@ -487,6 +487,7 @@ unreserved_keyword
| KW_GRAPH { $$ = new std::string("graph"); }
| KW_META { $$ = new std::string("meta"); }
| KW_STORAGE { $$ = new std::string("storage"); }
| KW_AGENT { $$ = new std::string("agent"); }
| KW_ALL { $$ = new std::string("all"); }
| KW_ANY { $$ = new std::string("any"); }
| KW_SINGLE { $$ = new std::string("single"); }
Expand Down Expand Up @@ -3443,6 +3444,7 @@ list_host_type
: KW_GRAPH { $$ = meta::cpp2::ListHostType::GRAPH; }
| KW_META { $$ = meta::cpp2::ListHostType::META; }
| KW_STORAGE { $$ = meta::cpp2::ListHostType::STORAGE; }
| KW_AGENT { $$ = meta::cpp2::ListHostType::AGENT; }
;

config_module_enum
Expand Down
1 change: 1 addition & 0 deletions src/parser/scanner.lex
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,7 @@ LABEL_FULL_WIDTH {CN_EN_FULL_WIDTH}{CN_EN_NUM_FULL_WIDTH}*
"TTL_COL" { return TokenType::KW_TTL_COL; }
"GRAPH" { return TokenType::KW_GRAPH; }
"META" { return TokenType::KW_META; }
"AGENT" { return TokenType::KW_AGENT; }
"STORAGE" { return TokenType::KW_STORAGE; }
"SHORTEST" { return TokenType::KW_SHORTEST; }
"NOLOOP" { return TokenType::KW_NOLOOP; }
Expand Down
3 changes: 3 additions & 0 deletions src/parser/test/ScannerTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,9 @@ TEST(Scanner, Basic) {
CHECK_SEMANTIC_TYPE("META", TokenType::KW_META),
CHECK_SEMANTIC_TYPE("Meta", TokenType::KW_META),
CHECK_SEMANTIC_TYPE("meta", TokenType::KW_META),
CHECK_SEMANTIC_TYPE("AGENT", TokenType::KW_AGENT),
CHECK_SEMANTIC_TYPE("Agent", TokenType::KW_AGENT),
CHECK_SEMANTIC_TYPE("agent", TokenType::KW_AGENT),
CHECK_SEMANTIC_TYPE("STORAGE", TokenType::KW_STORAGE),
CHECK_SEMANTIC_TYPE("Storage", TokenType::KW_STORAGE),
CHECK_SEMANTIC_TYPE("storage", TokenType::KW_STORAGE),
Expand Down

0 comments on commit 12c49b0

Please sign in to comment.