Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cherry pick v3.1.0 (0414-0420) #4183

Merged
merged 9 commits into from
Apr 20, 2022
53 changes: 47 additions & 6 deletions src/clients/meta/MetaClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -129,12 +129,21 @@ bool MetaClient::waitForMetadReady(int count, int retryIntervalSecs) {
LOG(ERROR) << "Connect to the MetaServer Failed";
return false;
}

// Verify the graph server version
auto status = verifyVersion();
if (!status.ok()) {
LOG(ERROR) << status;
return false;
}

// Save graph version to meta
status = saveVersionToMeta();
if (!status.ok()) {
LOG(ERROR) << status;
return false;
}

CHECK(bgThread_->start());
LOG(INFO) << "Register time task for heartbeat!";
size_t delayMS = FLAGS_heartbeat_interval_secs * 1000 + folly::Random::rand32(900);
Expand All @@ -161,10 +170,14 @@ void MetaClient::heartBeatThreadFunc() {
bgThread_->addDelayTask(
FLAGS_heartbeat_interval_secs * 1000, &MetaClient::heartBeatThreadFunc, this);
};
auto ret = heartbeat().get();
if (!ret.ok()) {
LOG(ERROR) << "Heartbeat failed, status:" << ret.status();
return;
// UNKNOWN is reserved for tools such as upgrader, in that case the ip/port is not set. We do
// not send heartbeat to meta to avoid writing error host info (e.g. Host("", 0))
if (options_.role_ != cpp2::HostRole::UNKNOWN) {
auto ret = heartbeat().get();
if (!ret.ok()) {
LOG(ERROR) << "Heartbeat failed, status:" << ret.status();
return;
}
}

// if MetaServer has some changes, refresh the localCache_
Expand Down Expand Up @@ -227,7 +240,9 @@ bool MetaClient::loadUsersAndRoles() {
}

bool MetaClient::loadData() {
if (localDataLastUpdateTime_ == metadLastUpdateTime_) {
// UNKNOWN role will skip heartbeat
if (options_.role_ != cpp2::HostRole::UNKNOWN &&
localDataLastUpdateTime_ == metadLastUpdateTime_) {
return true;
}

Expand Down Expand Up @@ -2949,7 +2964,9 @@ StatusOr<std::vector<RemoteListenerInfo>> MetaClient::getListenerHostTypeBySpace
}

bool MetaClient::loadCfg() {
if (options_.skipConfig_ || localCfgLastUpdateTime_ == metadLastUpdateTime_) {
// UNKNOWN role will skip heartbeat
if (options_.skipConfig_ || (options_.role_ != cpp2::HostRole::UNKNOWN &&
localCfgLastUpdateTime_ == metadLastUpdateTime_)) {
return true;
}
if (!configReady_ && !registerCfg()) {
Expand Down Expand Up @@ -3611,5 +3628,29 @@ Status MetaClient::verifyVersion() {
return Status::OK();
}

Status MetaClient::saveVersionToMeta() {
auto req = cpp2::SaveGraphVersionReq();
req.build_version_ref() = getOriginVersion();
req.host_ref() = options_.localHost_;
folly::Promise<StatusOr<cpp2::SaveGraphVersionResp>> promise;
auto future = promise.getFuture();
getResponse(
std::move(req),
[](auto client, auto request) { return client->future_saveGraphVersion(request); },
[](cpp2::SaveGraphVersionResp&& resp) { return std::move(resp); },
std::move(promise));

auto respStatus = std::move(future).get();
if (!respStatus.ok()) {
return respStatus.status();
}
auto resp = std::move(respStatus).value();
if (resp.get_code() != nebula::cpp2::ErrorCode::SUCCEEDED) {
return Status::Error("Failed to save graph version into meta, error code: %s",
apache::thrift::util::enumNameSafe(resp.get_code()).c_str());
}
return Status::OK();
}

} // namespace meta
} // namespace nebula
6 changes: 6 additions & 0 deletions src/clients/meta/MetaClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -733,8 +733,14 @@ class MetaClient : public BaseMetaClient {

ListenersMap doGetListenersMap(const HostAddr& host, const LocalCache& localCache);

// Checks if the the client version is compatible with the server version by checking the
// whilelist in meta.
Status verifyVersion();

// Save the version of the graph service into meta so that it could be looked up.
// This method should be only called in the internal client.
Status saveVersionToMeta();

private:
std::shared_ptr<folly::IOThreadPoolExecutor> ioThreadPool_;
std::shared_ptr<thrift::ThriftClientManager<cpp2::MetaServiceAsyncClient>> clientsMan_;
Expand Down
1 change: 1 addition & 0 deletions src/common/utils/NebulaKeyUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,7 @@ std::vector<std::string> NebulaKeyUtils::snapshotPrefix(PartitionID partId) {
if (partId == 0) {
result.emplace_back("");
} else {
result.emplace_back(vertexPrefix(partId));
result.emplace_back(tagPrefix(partId));
result.emplace_back(edgePrefix(partId));
result.emplace_back(IndexKeyUtils::indexPrefix(partId));
Expand Down
3 changes: 1 addition & 2 deletions src/graph/executor/algo/BFSShortestPathExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -135,13 +135,12 @@ folly::Future<Status> BFSShortestPathExecutor::conjunctPath() {
std::vector<folly::Future<DataSet>> futures;
for (auto& vid : meetVids) {
batchVids.push_back(vid);
if (i == totalSize - 1 || batchVids.size() == batchSize) {
if (++i == totalSize || batchVids.size() == batchSize) {
auto future = folly::via(runner(), [this, vids = std::move(batchVids), oddStep]() {
return doConjunct(vids, oddStep);
});
futures.emplace_back(std::move(future));
}
i++;
}

return folly::collect(futures).via(runner()).thenValue([this](auto&& resps) {
Expand Down
Loading