Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

make scan interface return error code #117

Merged
merged 2 commits into from
Apr 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions examples/StorageClientExample.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include <nebula/sclient/ScanEdgeIter.h>
#include <common/Init.h>
#include <nebula/sclient/StorageClient.h>
#include "common/graph/Response.h"

int main(int argc, char* argv[]) {
nebula::init(&argc, &argv);
Expand All @@ -30,8 +31,9 @@ int main(int argc, char* argv[]) {
std::cout << "scan edge..." << std::endl;
while (scanEdgeIter.hasNext()) {
std::cout << "-------------------------" << std::endl;
nebula::DataSet ds = scanEdgeIter.next();
std::cout << ds << std::endl;
std::pair<nebula::ErrorCode, nebula::DataSet> res = scanEdgeIter.next();
std::cout << res.first << std::endl;
std::cout << res.second << std::endl;
std::cout << "+++++++++++++++++++++++++" << std::endl;
}

Expand All @@ -47,8 +49,9 @@ int main(int argc, char* argv[]) {
std::cout << "scan vertex..." << std::endl;
while (scanVertexIter.hasNext()) {
std::cout << "-------------------------" << std::endl;
nebula::DataSet ds = scanVertexIter.next();
std::cout << ds << std::endl;
std::pair<nebula::ErrorCode, nebula::DataSet> res = scanVertexIter.next();
std::cout << res.first << std::endl;
std::cout << res.second << std::endl;
std::cout << "+++++++++++++++++++++++++" << std::endl;
}

Expand Down
30 changes: 28 additions & 2 deletions include/common/graph/Response.h
Original file line number Diff line number Diff line change
Expand Up @@ -276,13 +276,22 @@ enum class ErrorCode { ErrorCodeEnums };

#undef X

const char *getErrorCode(ErrorCode code);
#define X(EnumName, EnumNumber) \
case ErrorCode::EnumName: \
return #EnumName;

static inline const char* getErrorCode(ErrorCode code) {
switch (code) { ErrorCodeEnums }
return "Unknown error";
}

static inline std::ostream &operator<<(std::ostream &os, ErrorCode code) {
os << getErrorCode(code);
return os;
}

#undef X

template <typename T>
bool inline checkPointer(const T *lhs, const T *rhs) {
if (lhs == rhs) {
Expand Down Expand Up @@ -419,7 +428,24 @@ struct PlanNodeDescription {
__clear();
}

bool operator==(const PlanNodeDescription &rhs) const;
bool operator==(const PlanNodeDescription &rhs) const {
if (name != rhs.name) {
return false;
}
if (id != rhs.id) {
return false;
}
if (!checkPointer(description.get(), rhs.description.get())) {
return false;
}
if (!checkPointer(profiles.get(), rhs.profiles.get())) {
return false;
}
if (!checkPointer(branchInfo.get(), rhs.branchInfo.get())) {
return false;
}
return checkPointer(dependencies.get(), rhs.dependencies.get());
}

std::string name;
int64_t id{-1};
Expand Down
3 changes: 2 additions & 1 deletion include/nebula/sclient/ScanEdgeIter.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include <vector>

#include "common/datatypes/DataSet.h"
#include "common/graph/Response.h"

namespace nebula {
class StorageClient;
Expand All @@ -26,7 +27,7 @@ struct ScanEdgeIter {

bool hasNext();

DataSet next();
std::pair<::nebula::ErrorCode, DataSet> next();

StorageClient* client_;
storage::cpp2::ScanEdgeRequest* req_;
Expand Down
3 changes: 2 additions & 1 deletion include/nebula/sclient/ScanVertexIter.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include <vector>

#include "common/datatypes/DataSet.h"
#include "common/graph/Response.h"

namespace nebula {
class StorageClient;
Expand All @@ -26,7 +27,7 @@ struct ScanVertexIter {

bool hasNext();

DataSet next();
std::pair<::nebula::ErrorCode, DataSet> next();

StorageClient* client_;
storage::cpp2::ScanVertexRequest* req_;
Expand Down
9 changes: 5 additions & 4 deletions include/nebula/sclient/StorageClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "nebula/sclient/SConfig.h"
#include "nebula/sclient/ScanEdgeIter.h"
#include "nebula/sclient/ScanVertexIter.h"
#include "common/graph/Response.h"

namespace folly {
class IOThreadPoolExecutor;
Expand Down Expand Up @@ -98,16 +99,16 @@ class StorageClient {
}

private:
std::pair<bool, storage::cpp2::ScanResponse> doScanEdge(
std::pair<::nebula::ErrorCode, storage::cpp2::ScanResponse> doScanEdge(
const storage::cpp2::ScanEdgeRequest& req);

std::pair<bool, storage::cpp2::ScanResponse> doScanVertex(
std::pair<::nebula::ErrorCode, storage::cpp2::ScanResponse> doScanVertex(
const storage::cpp2::ScanVertexRequest& req);

template <typename Request, typename RemoteFunc, typename Response>
void getResponse(std::pair<HostAddr, Request>&& request,
void getResponse(std::pair<HostAddr, Request>&& request,
RemoteFunc&& remoteFunc,
folly::Promise<std::pair<bool, Response>> pro);
folly::Promise<std::pair<::nebula::ErrorCode, Response>> pro);

std::unique_ptr<MetaClient> mClient_;
SConfig sConfig_;
Expand Down
1 change: 0 additions & 1 deletion src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ set(NEBULA_COMMON_SOURCES
datatypes/Value.cpp
datatypes/Vertex.cpp
datatypes/Duration.cpp
graph/Response.cpp
time/TimeConversion.cpp
geo/io/wkt/WKTWriter.cpp
geo/io/wkb/WKBWriter.cpp
Expand Down
40 changes: 0 additions & 40 deletions src/graph/Response.cpp

This file was deleted.

23 changes: 8 additions & 15 deletions src/sclient/ScanEdgeIter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,29 +21,22 @@ ScanEdgeIter::~ScanEdgeIter() {
delete req_;
}

DataSet ScanEdgeIter::next() {
if (!hasNext()) {
LOG(ERROR) << "hasNext() == false !";
return DataSet();
}
std::pair<::nebula::ErrorCode, DataSet> ScanEdgeIter::next() {
DCHECK(hasNext()) << "hasNext() == false !";
DCHECK(!!req_);
auto partCursorMapReq = req_->get_parts();
DCHECK_EQ(partCursorMapReq.size(), 1);
partCursorMapReq.begin()->second.set_next_cursor(nextCursor_);
req_->set_parts(partCursorMapReq);
auto r = client_->doScanEdge(*req_);
if (!r.first) {
LOG(ERROR) << "Scan edge failed";
auto code = r.first;
if (code != ::nebula::ErrorCode::SUCCEEDED) {
LOG(ERROR) << "Scan edge failed, error code: " << static_cast<int32_t>(code);
this->hasNext_ = false;
return DataSet();
return {code, DataSet()};
}
auto scanResponse = r.second;
if (!scanResponse.get_result().get_failed_parts().empty()) {
auto errorCode = scanResponse.get_result().get_failed_parts()[0].code;
LOG(ERROR) << "Scan edge failed, errorcode: " << static_cast<int32_t>(errorCode);
this->hasNext_ = false;
return DataSet();
}
DCHECK(scanResponse.get_result().get_failed_parts().empty());
auto partCursorMapResp = scanResponse.get_cursors();
DCHECK_EQ(partCursorMapResp.size(), 1);
auto scanCursor = partCursorMapResp.begin()->second;
Expand All @@ -52,7 +45,7 @@ DataSet ScanEdgeIter::next() {
nextCursor_ = scanCursor.next_cursor_ref().value();
}

return *scanResponse.get_props();
return {::nebula::ErrorCode::SUCCEEDED, *scanResponse.get_props()};
}

} // namespace nebula
23 changes: 8 additions & 15 deletions src/sclient/ScanVertexIter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,29 +23,22 @@ ScanVertexIter::~ScanVertexIter() {
delete req_;
}

DataSet ScanVertexIter::next() {
if (!hasNext()) {
LOG(ERROR) << "hasNext() == false !";
return DataSet();
}
std::pair<::nebula::ErrorCode, DataSet> ScanVertexIter::next() {
DCHECK(hasNext()) << "hasNext() == false !";
DCHECK(!!req_);
auto partCursorMapReq = req_->get_parts();
DCHECK_EQ(partCursorMapReq.size(), 1);
partCursorMapReq.begin()->second.set_next_cursor(nextCursor_);
req_->set_parts(partCursorMapReq);
auto r = client_->doScanVertex(*req_);
if (!r.first) {
LOG(ERROR) << "Scan vertex failed";
auto code = r.first;
if (code != ::nebula::ErrorCode::SUCCEEDED) {
LOG(ERROR) << "Scan vertex failed, error code: " << static_cast<int32_t>(code);
this->hasNext_ = false;
return DataSet();
return {code, DataSet()};
}
auto scanResponse = r.second;
if (!scanResponse.get_result().get_failed_parts().empty()) {
auto errorCode = scanResponse.get_result().get_failed_parts()[0].code;
LOG(ERROR) << "Scan vertex failed, errorcode: " << static_cast<int32_t>(errorCode);
this->hasNext_ = false;
return DataSet();
}
DCHECK(scanResponse.get_result().get_failed_parts().empty());
auto partCursorMapResp = scanResponse.get_cursors();
DCHECK_EQ(partCursorMapResp.size(), 1);
auto scanCursor = partCursorMapResp.begin()->second;
Expand All @@ -54,7 +47,7 @@ DataSet ScanVertexIter::next() {
nextCursor_ = scanCursor.next_cursor_ref().value();
}

return *scanResponse.get_props();
return {::nebula::ErrorCode::SUCCEEDED, *scanResponse.get_props()};
}

} // namespace nebula
50 changes: 35 additions & 15 deletions src/sclient/StorageClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,20 +85,20 @@ ScanEdgeIter StorageClient::scanEdgeWithPart(std::string spaceName,
return {this, req};
}

std::pair<bool, storage::cpp2::ScanResponse> StorageClient::doScanEdge(
std::pair<::nebula::ErrorCode, storage::cpp2::ScanResponse> StorageClient::doScanEdge(
const storage::cpp2::ScanEdgeRequest& req) {
std::pair<HostAddr, storage::cpp2::ScanEdgeRequest> request;
auto partCursorMap = req.get_parts();
DCHECK_EQ(partCursorMap.size(), 1);
PartitionID partId = partCursorMap.begin()->first;
auto host = mClient_->getPartLeaderFromCache(req.get_space_id(), partId);
if (!host.first) {
return {false, storage::cpp2::ScanResponse()};
return {::nebula::ErrorCode::E_UNKNOWN, storage::cpp2::ScanResponse()};
}
request.first = host.second;
request.second = req;

folly::Promise<std::pair<bool, storage::cpp2::ScanResponse>> promise;
folly::Promise<std::pair<::nebula::ErrorCode, storage::cpp2::ScanResponse>> promise;
auto future = promise.getFuture();
getResponse(
std::move(request),
Expand Down Expand Up @@ -159,20 +159,20 @@ ScanVertexIter StorageClient::scanVertexWithPart(
return {this, req};
}

std::pair<bool, storage::cpp2::ScanResponse> StorageClient::doScanVertex(
std::pair<::nebula::ErrorCode, storage::cpp2::ScanResponse> StorageClient::doScanVertex(
const storage::cpp2::ScanVertexRequest& req) {
std::pair<HostAddr, storage::cpp2::ScanVertexRequest> request;
auto partCursorMap = req.get_parts();
DCHECK_EQ(partCursorMap.size(), 1);
PartitionID partId = partCursorMap.begin()->first;
auto host = mClient_->getPartLeaderFromCache(req.get_space_id(), partId);
if (!host.first) {
return {false, storage::cpp2::ScanResponse()};
return {::nebula::ErrorCode::E_UNKNOWN, storage::cpp2::ScanResponse()};
}
request.first = host.second;
request.second = req;

folly::Promise<std::pair<bool, storage::cpp2::ScanResponse>> promise;
folly::Promise<std::pair<::nebula::ErrorCode, storage::cpp2::ScanResponse>> promise;
auto future = promise.getFuture();
getResponse(
std::move(request),
Expand All @@ -185,7 +185,7 @@ std::pair<bool, storage::cpp2::ScanResponse> StorageClient::doScanVertex(
template <typename Request, typename RemoteFunc, typename Response>
void StorageClient::getResponse(std::pair<HostAddr, Request>&& request,
RemoteFunc&& remoteFunc,
folly::Promise<std::pair<bool, Response>> pro) {
folly::Promise<std::pair<::nebula::ErrorCode, Response>> pro) {
auto* evb = DCHECK_NOTNULL(ioExecutor_)->getEventBase();
folly::via(evb,
[evb,
Expand All @@ -198,16 +198,36 @@ void StorageClient::getResponse(std::pair<HostAddr, Request>&& request,
LOG(INFO) << "Send request to storage " << host;
remoteFunc(client.get(), request.second)
.via(evb)
.then([pro = std::move(pro), host](folly::Try<Response>&& t) mutable {
// exception occurred during RPC
if (t.hasException()) {
LOG(ERROR) << "Send request to " << host << " failed";
LOG(ERROR) << "RpcResponse exception: " << t.exception().what().c_str();
pro.setValue(std::make_pair(false, Response()));
.thenValue([pro = std::move(pro), this](Response&& resp) mutable {
auto& result = resp.get_result();
for (auto& part : result.get_failed_parts()) {
auto partId = part.get_part_id();
auto code = part.get_code();

LOG(ERROR) << "Failure! Failed part: " << partId
<< ", error code: " << static_cast<int32_t>(code);
pro.setValue(std::make_pair(::nebula::ErrorCode(static_cast<int32_t>(code)),
Response()));
return;
}
auto&& resp = t.value();
pro.setValue(std::make_pair(true, std::move(resp)));
pro.setValue(std::make_pair(::nebula::ErrorCode::SUCCEEDED, std::move(resp)));
return;
})
.thenError([pro = std::move(pro), host, this](
folly::exception_wrapper&& exWrapper) mutable {
using TransportException = apache::thrift::transport::TTransportException;
auto ex = exWrapper.get_exception<TransportException>();
if (ex) {
if (ex->getType() == TransportException::TIMED_OUT) {
LOG(ERROR) << "Request to " << host << " time out: " << ex->what();
} else {
LOG(ERROR) << "Request to " << host << " failed: " << ex->what();
}
} else {
LOG(ERROR) << "Request to " << host << " failed.";
}
pro.setValue(std::make_pair(::nebula::ErrorCode::E_RPC_FAILURE, Response()));
return;
});
}); // via
}
Expand Down
Loading