Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add scan vertices to storage client. #113

Merged
merged 2 commits into from
Mar 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions include/nebula/mclient/MetaClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ class ListSpacesReq;
class ListSpacesResp;
class IdName;
class EdgeItem;
class TagItem;
class ListEdgesReq;
class ListEdgesResp;

Expand All @@ -61,6 +62,9 @@ using SpaceNameIdMap = std::unordered_map<std::string, GraphSpaceID>;
using SpaceEdgeNameTypeMap =
std::unordered_map<std::pair<GraphSpaceID, std::string>, EdgeType, pair_hash>;

using SpaceTagNameTypeMap =
std::unordered_map<std::pair<GraphSpaceID, std::string>, TagID, pair_hash>;

class MetaClient {
public:
explicit MetaClient(const std::vector<std::string> &metaAddrs,
Expand All @@ -73,6 +77,8 @@ class MetaClient {
std::pair<bool, EdgeType> getEdgeTypeByNameFromCache(GraphSpaceID spaceId,
const std::string &name);

std::pair<bool, TagID> getTagIdByNameFromCache(GraphSpaceID spaceId, const std::string &name);

std::pair<bool, std::vector<PartitionID>> getPartsFromCache(GraphSpaceID spaceId);

std::pair<bool, HostAddr> getPartLeaderFromCache(GraphSpaceID spaceId, PartitionID partId);
Expand All @@ -86,6 +92,8 @@ class MetaClient {

std::pair<bool, std::vector<meta::cpp2::EdgeItem>> listEdgeSchemas(GraphSpaceID spaceId);

std::pair<bool, std::vector<meta::cpp2::TagItem>> listTagSchemas(GraphSpaceID spaceId);

void loadLeader(const std::vector<nebula::meta::cpp2::HostItem> &hostItems,
const SpaceNameIdMap &spaceIndexByName);

Expand All @@ -107,6 +115,7 @@ class MetaClient {
MConfig mConfig_;
SpaceNameIdMap spaceIndexByName_;
SpaceEdgeNameTypeMap spaceEdgeIndexByName_;
SpaceTagNameTypeMap spaceTagIndexByName_;
std::unordered_map<std::pair<GraphSpaceID, PartitionID>, HostAddr, pair_hash> spacePartLeaderMap_;
std::unordered_map<GraphSpaceID, std::vector<PartitionID>> spacePartsMap_;
std::shared_ptr<folly::IOThreadPoolExecutor> ioExecutor_;
Expand Down
37 changes: 37 additions & 0 deletions include/nebula/sclient/ScanVertexIter.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/* Copyright (c) 2023 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License.
*/

#pragma once

#include <string>
#include <vector>

#include "common/datatypes/DataSet.h"

namespace nebula {
class StorageClient;

namespace storage {
namespace cpp2 {
class ScanVertexRequest;
} // namespace cpp2
} // namespace storage

struct ScanVertexIter {
ScanVertexIter(StorageClient* client, storage::cpp2::ScanVertexRequest* req, bool hasNext = true);

~ScanVertexIter();

bool hasNext();

DataSet next();

StorageClient* client_;
storage::cpp2::ScanVertexRequest* req_;
bool hasNext_;
std::string nextCursor_;
};

} // namespace nebula
17 changes: 17 additions & 0 deletions include/nebula/sclient/StorageClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include "nebula/mclient/MetaClient.h"
#include "nebula/sclient/SConfig.h"
#include "nebula/sclient/ScanEdgeIter.h"
#include "nebula/sclient/ScanVertexIter.h"

namespace folly {
class IOThreadPoolExecutor;
Expand Down Expand Up @@ -58,6 +59,7 @@ class ScanResponse;

class StorageClient {
friend struct ScanEdgeIter;
friend struct ScanVertexIter;

public:
explicit StorageClient(const std::vector<std::string>& metaAddrs,
Expand All @@ -79,6 +81,18 @@ class StorageClient {
bool onlyLatestVersion = false,
bool enableReadFromFollower = true); // plato needed

ScanVertexIter scanVertexWithPart(
std::string spaceName,
int32_t partID,
// tag name -> prop names
std::unordered_map<std::string, std::vector<std::string>> tagProps,
int64_t limit = DEFAULT_LIMIT,
int64_t startTime = DEFAULT_START_TIME,
int64_t endTime = DEFAULT_END_TIME,
std::string filter = "",
bool onlyLatestVersion = false,
bool enableReadFromFollower = true); // plato needed

MetaClient* getMetaClient() {
return mClient_.get();
}
Expand All @@ -87,6 +101,9 @@ class StorageClient {
std::pair<bool, storage::cpp2::ScanResponse> doScanEdge(
const storage::cpp2::ScanEdgeRequest& req);

std::pair<bool, storage::cpp2::ScanResponse> doScanVertex(
const storage::cpp2::ScanVertexRequest& req);

template <typename Request, typename RemoteFunc, typename Response>
void getResponse(std::pair<HostAddr, Request>&& request,
RemoteFunc&& remoteFunc,
Expand Down
1 change: 1 addition & 0 deletions src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ set(NEBULA_SCLIENT_SOURCES
${NEBULA_MCLIENT_SOURCES}
sclient/StorageClient.cpp
sclient/ScanEdgeIter.cpp
sclient/ScanVertexIter.cpp
)

set(NEBULA_THIRD_PARTY_LIBRARIES
Expand Down
36 changes: 36 additions & 0 deletions src/mclient/MetaClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,16 @@ std::pair<bool, EdgeType> MetaClient::getEdgeTypeByNameFromCache(GraphSpaceID sp
return {true, it->second};
}

std::pair<bool, TagID> MetaClient::getTagIdByNameFromCache(GraphSpaceID spaceId,
const std::string& name) {
auto it = spaceTagIndexByName_.find(std::make_pair(spaceId, name));
if (it == spaceTagIndexByName_.end()) {
LOG(ERROR) << "getTagIdByNameFromCache(" << spaceId << ", " << name << ") failed";
return {false, -1};
}
return {true, it->second};
}

std::pair<bool, std::vector<PartitionID>> MetaClient::getPartsFromCache(GraphSpaceID spaceId) {
auto iter = spacePartsMap_.find(spaceId);
if (iter == spacePartsMap_.end()) {
Expand Down Expand Up @@ -94,6 +104,17 @@ bool MetaClient::loadData() {
for (auto& edgeItem : edgeItems) {
spaceEdgeIndexByName_[{spaceId, edgeItem.get_edge_name()}] = edgeItem.get_edge_type();
}

// tags
auto tagRet = listTagSchemas(spaceId);
if (!tagRet.first) {
LOG(ERROR) << "Get tag schemas failed for spaceId " << spaceId;
return false;
}
auto& tagItems = tagRet.second;
for (auto& tagItem : tagItems) {
spaceTagIndexByName_[{spaceId, tagItem.get_tag_name()}] = tagItem.get_tag_id();
}
}
auto hostsRet = listHosts(meta::cpp2::ListHostType::ALLOC);
if (!hostsRet.first) {
Expand Down Expand Up @@ -153,6 +174,21 @@ std::pair<bool, std::vector<meta::cpp2::EdgeItem>> MetaClient::listEdgeSchemas(
return std::move(future).get();
}

std::pair<bool, std::vector<meta::cpp2::TagItem>> MetaClient::listTagSchemas(GraphSpaceID spaceId) {
meta::cpp2::ListTagsReq req;
req.space_id_ref() = spaceId;
folly::Promise<std::pair<bool, std::vector<meta::cpp2::TagItem>>> promise;
auto future = promise.getFuture();
getResponse(
std::move(req),
[](auto client, auto request) { return client->future_listTags(request); },
[](meta::cpp2::ListTagsResp&& resp) -> decltype(auto) {
return std::make_pair(true, std::move(resp).get_tags());
},
std::move(promise));
return std::move(future).get();
}

void MetaClient::loadLeader(const std::vector<meta::cpp2::HostItem>& hostItems,
const SpaceNameIdMap& spaceIndexByName) {
for (auto& item : hostItems) {
Expand Down
60 changes: 60 additions & 0 deletions src/sclient/ScanVertexIter.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/* Copyright (c) 2023 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License.
*/

#include "nebula/sclient/ScanVertexIter.h"

#include "../interface/gen-cpp2/storage_types.h"
#include "nebula/sclient/StorageClient.h"

namespace nebula {

ScanVertexIter::ScanVertexIter(StorageClient* client,
storage::cpp2::ScanVertexRequest* req,
bool hasNext)
: client_(client), req_(req), hasNext_(hasNext) {}

bool ScanVertexIter::hasNext() {
return hasNext_;
}

ScanVertexIter::~ScanVertexIter() {
delete req_;
}

DataSet ScanVertexIter::next() {
if (!hasNext()) {
LOG(ERROR) << "hasNext() == false !";
return DataSet();
}
DCHECK(!!req_);
auto partCursorMapReq = req_->get_parts();
DCHECK_EQ(partCursorMapReq.size(), 1);
partCursorMapReq.begin()->second.set_next_cursor(nextCursor_);
req_->set_parts(partCursorMapReq);
auto r = client_->doScanVertex(*req_);
if (!r.first) {
LOG(ERROR) << "Scan vertex failed";
this->hasNext_ = false;
return DataSet();
}
auto scanResponse = r.second;
if (!scanResponse.get_result().get_failed_parts().empty()) {
auto errorCode = scanResponse.get_result().get_failed_parts()[0].code;
LOG(ERROR) << "Scan vertex failed, errorcode: " << static_cast<int32_t>(errorCode);
this->hasNext_ = false;
return DataSet();
}
auto partCursorMapResp = scanResponse.get_cursors();
DCHECK_EQ(partCursorMapResp.size(), 1);
auto scanCursor = partCursorMapResp.begin()->second;
hasNext_ = scanCursor.next_cursor_ref().has_value();
if (hasNext_) {
nextCursor_ = scanCursor.next_cursor_ref().value();
}

return *scanResponse.get_props();
}

} // namespace nebula
74 changes: 74 additions & 0 deletions src/sclient/StorageClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,80 @@ std::pair<bool, storage::cpp2::ScanResponse> StorageClient::doScanEdge(
return std::move(future).get();
}

ScanVertexIter StorageClient::scanVertexWithPart(
std::string spaceName,
int32_t partId,
// tag name -> prop names
std::unordered_map<std::string, std::vector<std::string>> tagProps,
int64_t limit,
int64_t startTime,
int64_t endTime,
std::string filter,
bool onlyLatestVersion,
bool enableReadFromFollower) {
auto spaceIdResult = mClient_->getSpaceIdByNameFromCache(spaceName);
if (!spaceIdResult.first) {
return {nullptr, nullptr, false};
}
int32_t spaceId = spaceIdResult.second;

std::vector<storage::cpp2::VertexProp> returnCols;

for (const auto& tagProp : tagProps) {
auto tagTypeResult = mClient_->getTagIdByNameFromCache(spaceId, tagProp.first);
if (!tagTypeResult.first) {
return {nullptr, nullptr, false};
}
int32_t tagType = tagTypeResult.second;

storage::cpp2::VertexProp returnCol;
returnCol.set_tag(tagType);
returnCol.set_props(tagProp.second);
returnCols.emplace_back(returnCol);
}

auto* req = new storage::cpp2::ScanVertexRequest;
req->set_space_id(spaceId);
// old interface
// req->set_part_id(partId);
// req->set_cursor("");
// new interface
storage::cpp2::ScanCursor scanCursor;
req->set_parts(std::unordered_map<PartitionID, storage::cpp2::ScanCursor>{{partId, scanCursor}});
req->set_return_columns(std::move(returnCols));
req->set_limit(limit);
req->set_start_time(startTime);
req->set_end_time(endTime);
req->set_filter(filter);
req->set_only_latest_version(onlyLatestVersion);
req->set_enable_read_from_follower(enableReadFromFollower);

return {this, req};
}

std::pair<bool, storage::cpp2::ScanResponse> StorageClient::doScanVertex(
const storage::cpp2::ScanVertexRequest& req) {
std::pair<HostAddr, storage::cpp2::ScanVertexRequest> request;
auto partCursorMap = req.get_parts();
DCHECK_EQ(partCursorMap.size(), 1);
PartitionID partId = partCursorMap.begin()->first;
auto host = mClient_->getPartLeaderFromCache(req.get_space_id(), partId);
if (!host.first) {
return {false, storage::cpp2::ScanResponse()};
}
request.first = host.second;
request.second = req;

folly::Promise<std::pair<bool, storage::cpp2::ScanResponse>> promise;
auto future = promise.getFuture();
getResponse(
std::move(request),
[](storage::cpp2::GraphStorageServiceAsyncClient* client,
const storage::cpp2::ScanVertexRequest& r) { return client->future_scanVertex(r); },
std::move(promise));
return std::move(future).get();
}

template <typename Request, typename RemoteFunc, typename Response>
void StorageClient::getResponse(std::pair<HostAddr, Request>&& request,
RemoteFunc&& remoteFunc,
Expand Down
Loading