Skip to content

Commit

Permalink
Cherry pick v3.1.0 (0414-0420) (#4183)
Browse files Browse the repository at this point in the history
* fix issue 4152 (#4158)

* Fix optional of multi-match (#4159)

* fix optional of multi-match

* format

Co-authored-by: Sophie <84560950+Sophie-Xie@users.noreply.github.com>

* Fix incompatibility imported by #4116 (#4165)

* Add SaveGraphVersionProcessor to separate client version check and version saving

* Update error code

* Update error code

* optimizer path (#4162)

* optimizer multi-shortest path

* new algorithm

* fix error

* skip heartbeat for tool (#4177)

Co-authored-by: Sophie <84560950+Sophie-Xie@users.noreply.github.com>

* Fix/null pattern expression input (#4180)

* Move input rows of Traverse and AppendVertices.

* Avoid skip validate pattern expression with aggregate.

* Fix case.

* Revert "Move input rows of Traverse and AppendVertices."

This reverts commit 7fd1d38.

Co-authored-by: Sophie <84560950+Sophie-Xie@users.noreply.github.com>

* fix wrong space key after dropping hosts (#4182)

Co-authored-by: Sophie <84560950+Sophie-Xie@users.noreply.github.com>

* fix vertex is missing from snapshot (#4189)

Co-authored-by: Sophie <84560950+Sophie-Xie@users.noreply.github.com>

* Expression is stateful to store the result of evaluation, so we can't… (#4190)

* Expression is stateful to store the result of evaluation, so we can't share it inter threads.

* Fix defef nullptr.

Co-authored-by: jie.wang <38901892+jievince@users.noreply.github.com>
Co-authored-by: kyle.cao <kyle.cao@vesoft.com>
Co-authored-by: Yichen Wang <18348405+Aiee@users.noreply.github.com>
Co-authored-by: jimingquan <mingquan.ji@vesoft.com>
Co-authored-by: Doodle <13706157+critical27@users.noreply.github.com>
Co-authored-by: shylock <33566796+Shylock-Hg@users.noreply.github.com>
Co-authored-by: liwenhui-soul <38217397+liwenhui-soul@users.noreply.github.com>
  • Loading branch information
8 people authored Apr 20, 2022
1 parent 85b77a5 commit f4a2299
Show file tree
Hide file tree
Showing 26 changed files with 428 additions and 146 deletions.
53 changes: 47 additions & 6 deletions src/clients/meta/MetaClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -129,12 +129,21 @@ bool MetaClient::waitForMetadReady(int count, int retryIntervalSecs) {
LOG(ERROR) << "Connect to the MetaServer Failed";
return false;
}

// Verify the graph server version
auto status = verifyVersion();
if (!status.ok()) {
LOG(ERROR) << status;
return false;
}

// Save graph version to meta
status = saveVersionToMeta();
if (!status.ok()) {
LOG(ERROR) << status;
return false;
}

CHECK(bgThread_->start());
LOG(INFO) << "Register time task for heartbeat!";
size_t delayMS = FLAGS_heartbeat_interval_secs * 1000 + folly::Random::rand32(900);
Expand All @@ -161,10 +170,14 @@ void MetaClient::heartBeatThreadFunc() {
bgThread_->addDelayTask(
FLAGS_heartbeat_interval_secs * 1000, &MetaClient::heartBeatThreadFunc, this);
};
auto ret = heartbeat().get();
if (!ret.ok()) {
LOG(ERROR) << "Heartbeat failed, status:" << ret.status();
return;
// UNKNOWN is reserved for tools such as upgrader, in that case the ip/port is not set. We do
// not send heartbeat to meta to avoid writing error host info (e.g. Host("", 0))
if (options_.role_ != cpp2::HostRole::UNKNOWN) {
auto ret = heartbeat().get();
if (!ret.ok()) {
LOG(ERROR) << "Heartbeat failed, status:" << ret.status();
return;
}
}

// if MetaServer has some changes, refresh the localCache_
Expand Down Expand Up @@ -227,7 +240,9 @@ bool MetaClient::loadUsersAndRoles() {
}

bool MetaClient::loadData() {
if (localDataLastUpdateTime_ == metadLastUpdateTime_) {
// UNKNOWN role will skip heartbeat
if (options_.role_ != cpp2::HostRole::UNKNOWN &&
localDataLastUpdateTime_ == metadLastUpdateTime_) {
return true;
}

Expand Down Expand Up @@ -2949,7 +2964,9 @@ StatusOr<std::vector<RemoteListenerInfo>> MetaClient::getListenerHostTypeBySpace
}

bool MetaClient::loadCfg() {
if (options_.skipConfig_ || localCfgLastUpdateTime_ == metadLastUpdateTime_) {
// UNKNOWN role will skip heartbeat
if (options_.skipConfig_ || (options_.role_ != cpp2::HostRole::UNKNOWN &&
localCfgLastUpdateTime_ == metadLastUpdateTime_)) {
return true;
}
if (!configReady_ && !registerCfg()) {
Expand Down Expand Up @@ -3611,5 +3628,29 @@ Status MetaClient::verifyVersion() {
return Status::OK();
}

Status MetaClient::saveVersionToMeta() {
auto req = cpp2::SaveGraphVersionReq();
req.build_version_ref() = getOriginVersion();
req.host_ref() = options_.localHost_;
folly::Promise<StatusOr<cpp2::SaveGraphVersionResp>> promise;
auto future = promise.getFuture();
getResponse(
std::move(req),
[](auto client, auto request) { return client->future_saveGraphVersion(request); },
[](cpp2::SaveGraphVersionResp&& resp) { return std::move(resp); },
std::move(promise));

auto respStatus = std::move(future).get();
if (!respStatus.ok()) {
return respStatus.status();
}
auto resp = std::move(respStatus).value();
if (resp.get_code() != nebula::cpp2::ErrorCode::SUCCEEDED) {
return Status::Error("Failed to save graph version into meta, error code: %s",
apache::thrift::util::enumNameSafe(resp.get_code()).c_str());
}
return Status::OK();
}

} // namespace meta
} // namespace nebula
6 changes: 6 additions & 0 deletions src/clients/meta/MetaClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -733,8 +733,14 @@ class MetaClient : public BaseMetaClient {

ListenersMap doGetListenersMap(const HostAddr& host, const LocalCache& localCache);

// Checks if the the client version is compatible with the server version by checking the
// whilelist in meta.
Status verifyVersion();

// Save the version of the graph service into meta so that it could be looked up.
// This method should be only called in the internal client.
Status saveVersionToMeta();

private:
std::shared_ptr<folly::IOThreadPoolExecutor> ioThreadPool_;
std::shared_ptr<thrift::ThriftClientManager<cpp2::MetaServiceAsyncClient>> clientsMan_;
Expand Down
1 change: 1 addition & 0 deletions src/common/utils/NebulaKeyUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,7 @@ std::vector<std::string> NebulaKeyUtils::snapshotPrefix(PartitionID partId) {
if (partId == 0) {
result.emplace_back("");
} else {
result.emplace_back(vertexPrefix(partId));
result.emplace_back(tagPrefix(partId));
result.emplace_back(edgePrefix(partId));
result.emplace_back(IndexKeyUtils::indexPrefix(partId));
Expand Down
3 changes: 1 addition & 2 deletions src/graph/executor/algo/BFSShortestPathExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -135,13 +135,12 @@ folly::Future<Status> BFSShortestPathExecutor::conjunctPath() {
std::vector<folly::Future<DataSet>> futures;
for (auto& vid : meetVids) {
batchVids.push_back(vid);
if (i == totalSize - 1 || batchVids.size() == batchSize) {
if (++i == totalSize || batchVids.size() == batchSize) {
auto future = folly::via(runner(), [this, vids = std::move(batchVids), oddStep]() {
return doConjunct(vids, oddStep);
});
futures.emplace_back(std::move(future));
}
i++;
}

return folly::collect(futures).via(runner()).thenValue([this](auto&& resps) {
Expand Down
Loading

0 comments on commit f4a2299

Please sign in to comment.