Skip to content

Commit

Permalink
make scan interface return error code (#117)
Browse files Browse the repository at this point in the history
* make scan interface return error code

* remove Response.cpp
  • Loading branch information
jievince authored Apr 10, 2023
1 parent 6189a41 commit 4fd300b
Show file tree
Hide file tree
Showing 12 changed files with 118 additions and 114 deletions.
11 changes: 7 additions & 4 deletions examples/StorageClientExample.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include <nebula/sclient/ScanEdgeIter.h>
#include <common/Init.h>
#include <nebula/sclient/StorageClient.h>
#include "common/graph/Response.h"

int main(int argc, char* argv[]) {
nebula::init(&argc, &argv);
Expand All @@ -30,8 +31,9 @@ int main(int argc, char* argv[]) {
std::cout << "scan edge..." << std::endl;
while (scanEdgeIter.hasNext()) {
std::cout << "-------------------------" << std::endl;
nebula::DataSet ds = scanEdgeIter.next();
std::cout << ds << std::endl;
std::pair<nebula::ErrorCode, nebula::DataSet> res = scanEdgeIter.next();
std::cout << res.first << std::endl;
std::cout << res.second << std::endl;
std::cout << "+++++++++++++++++++++++++" << std::endl;
}

Expand All @@ -47,8 +49,9 @@ int main(int argc, char* argv[]) {
std::cout << "scan vertex..." << std::endl;
while (scanVertexIter.hasNext()) {
std::cout << "-------------------------" << std::endl;
nebula::DataSet ds = scanVertexIter.next();
std::cout << ds << std::endl;
std::pair<nebula::ErrorCode, nebula::DataSet> res = scanVertexIter.next();
std::cout << res.first << std::endl;
std::cout << res.second << std::endl;
std::cout << "+++++++++++++++++++++++++" << std::endl;
}

Expand Down
30 changes: 28 additions & 2 deletions include/common/graph/Response.h
Original file line number Diff line number Diff line change
Expand Up @@ -276,13 +276,22 @@ enum class ErrorCode { ErrorCodeEnums };

#undef X

const char *getErrorCode(ErrorCode code);
#define X(EnumName, EnumNumber) \
case ErrorCode::EnumName: \
return #EnumName;

static inline const char* getErrorCode(ErrorCode code) {
switch (code) { ErrorCodeEnums }
return "Unknown error";
}

static inline std::ostream &operator<<(std::ostream &os, ErrorCode code) {
os << getErrorCode(code);
return os;
}

#undef X

template <typename T>
bool inline checkPointer(const T *lhs, const T *rhs) {
if (lhs == rhs) {
Expand Down Expand Up @@ -419,7 +428,24 @@ struct PlanNodeDescription {
__clear();
}

bool operator==(const PlanNodeDescription &rhs) const;
bool operator==(const PlanNodeDescription &rhs) const {
if (name != rhs.name) {
return false;
}
if (id != rhs.id) {
return false;
}
if (!checkPointer(description.get(), rhs.description.get())) {
return false;
}
if (!checkPointer(profiles.get(), rhs.profiles.get())) {
return false;
}
if (!checkPointer(branchInfo.get(), rhs.branchInfo.get())) {
return false;
}
return checkPointer(dependencies.get(), rhs.dependencies.get());
}

std::string name;
int64_t id{-1};
Expand Down
3 changes: 2 additions & 1 deletion include/nebula/sclient/ScanEdgeIter.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include <vector>

#include "common/datatypes/DataSet.h"
#include "common/graph/Response.h"

namespace nebula {
class StorageClient;
Expand All @@ -26,7 +27,7 @@ struct ScanEdgeIter {

bool hasNext();

DataSet next();
std::pair<::nebula::ErrorCode, DataSet> next();

StorageClient* client_;
storage::cpp2::ScanEdgeRequest* req_;
Expand Down
3 changes: 2 additions & 1 deletion include/nebula/sclient/ScanVertexIter.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include <vector>

#include "common/datatypes/DataSet.h"
#include "common/graph/Response.h"

namespace nebula {
class StorageClient;
Expand All @@ -26,7 +27,7 @@ struct ScanVertexIter {

bool hasNext();

DataSet next();
std::pair<::nebula::ErrorCode, DataSet> next();

StorageClient* client_;
storage::cpp2::ScanVertexRequest* req_;
Expand Down
9 changes: 5 additions & 4 deletions include/nebula/sclient/StorageClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "nebula/sclient/SConfig.h"
#include "nebula/sclient/ScanEdgeIter.h"
#include "nebula/sclient/ScanVertexIter.h"
#include "common/graph/Response.h"

namespace folly {
class IOThreadPoolExecutor;
Expand Down Expand Up @@ -98,16 +99,16 @@ class StorageClient {
}

private:
std::pair<bool, storage::cpp2::ScanResponse> doScanEdge(
std::pair<::nebula::ErrorCode, storage::cpp2::ScanResponse> doScanEdge(
const storage::cpp2::ScanEdgeRequest& req);

std::pair<bool, storage::cpp2::ScanResponse> doScanVertex(
std::pair<::nebula::ErrorCode, storage::cpp2::ScanResponse> doScanVertex(
const storage::cpp2::ScanVertexRequest& req);

template <typename Request, typename RemoteFunc, typename Response>
void getResponse(std::pair<HostAddr, Request>&& request,
void getResponse(std::pair<HostAddr, Request>&& request,
RemoteFunc&& remoteFunc,
folly::Promise<std::pair<bool, Response>> pro);
folly::Promise<std::pair<::nebula::ErrorCode, Response>> pro);

std::unique_ptr<MetaClient> mClient_;
SConfig sConfig_;
Expand Down
1 change: 0 additions & 1 deletion src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ set(NEBULA_COMMON_SOURCES
datatypes/Value.cpp
datatypes/Vertex.cpp
datatypes/Duration.cpp
graph/Response.cpp
time/TimeConversion.cpp
geo/io/wkt/WKTWriter.cpp
geo/io/wkb/WKBWriter.cpp
Expand Down
40 changes: 0 additions & 40 deletions src/graph/Response.cpp

This file was deleted.

23 changes: 8 additions & 15 deletions src/sclient/ScanEdgeIter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,29 +21,22 @@ ScanEdgeIter::~ScanEdgeIter() {
delete req_;
}

DataSet ScanEdgeIter::next() {
if (!hasNext()) {
LOG(ERROR) << "hasNext() == false !";
return DataSet();
}
std::pair<::nebula::ErrorCode, DataSet> ScanEdgeIter::next() {
DCHECK(hasNext()) << "hasNext() == false !";
DCHECK(!!req_);
auto partCursorMapReq = req_->get_parts();
DCHECK_EQ(partCursorMapReq.size(), 1);
partCursorMapReq.begin()->second.set_next_cursor(nextCursor_);
req_->set_parts(partCursorMapReq);
auto r = client_->doScanEdge(*req_);
if (!r.first) {
LOG(ERROR) << "Scan edge failed";
auto code = r.first;
if (code != ::nebula::ErrorCode::SUCCEEDED) {
LOG(ERROR) << "Scan edge failed, error code: " << static_cast<int32_t>(code);
this->hasNext_ = false;
return DataSet();
return {code, DataSet()};
}
auto scanResponse = r.second;
if (!scanResponse.get_result().get_failed_parts().empty()) {
auto errorCode = scanResponse.get_result().get_failed_parts()[0].code;
LOG(ERROR) << "Scan edge failed, errorcode: " << static_cast<int32_t>(errorCode);
this->hasNext_ = false;
return DataSet();
}
DCHECK(scanResponse.get_result().get_failed_parts().empty());
auto partCursorMapResp = scanResponse.get_cursors();
DCHECK_EQ(partCursorMapResp.size(), 1);
auto scanCursor = partCursorMapResp.begin()->second;
Expand All @@ -52,7 +45,7 @@ DataSet ScanEdgeIter::next() {
nextCursor_ = scanCursor.next_cursor_ref().value();
}

return *scanResponse.get_props();
return {::nebula::ErrorCode::SUCCEEDED, *scanResponse.get_props()};
}

} // namespace nebula
23 changes: 8 additions & 15 deletions src/sclient/ScanVertexIter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,29 +23,22 @@ ScanVertexIter::~ScanVertexIter() {
delete req_;
}

DataSet ScanVertexIter::next() {
if (!hasNext()) {
LOG(ERROR) << "hasNext() == false !";
return DataSet();
}
std::pair<::nebula::ErrorCode, DataSet> ScanVertexIter::next() {
DCHECK(hasNext()) << "hasNext() == false !";
DCHECK(!!req_);
auto partCursorMapReq = req_->get_parts();
DCHECK_EQ(partCursorMapReq.size(), 1);
partCursorMapReq.begin()->second.set_next_cursor(nextCursor_);
req_->set_parts(partCursorMapReq);
auto r = client_->doScanVertex(*req_);
if (!r.first) {
LOG(ERROR) << "Scan vertex failed";
auto code = r.first;
if (code != ::nebula::ErrorCode::SUCCEEDED) {
LOG(ERROR) << "Scan vertex failed, error code: " << static_cast<int32_t>(code);
this->hasNext_ = false;
return DataSet();
return {code, DataSet()};
}
auto scanResponse = r.second;
if (!scanResponse.get_result().get_failed_parts().empty()) {
auto errorCode = scanResponse.get_result().get_failed_parts()[0].code;
LOG(ERROR) << "Scan vertex failed, errorcode: " << static_cast<int32_t>(errorCode);
this->hasNext_ = false;
return DataSet();
}
DCHECK(scanResponse.get_result().get_failed_parts().empty());
auto partCursorMapResp = scanResponse.get_cursors();
DCHECK_EQ(partCursorMapResp.size(), 1);
auto scanCursor = partCursorMapResp.begin()->second;
Expand All @@ -54,7 +47,7 @@ DataSet ScanVertexIter::next() {
nextCursor_ = scanCursor.next_cursor_ref().value();
}

return *scanResponse.get_props();
return {::nebula::ErrorCode::SUCCEEDED, *scanResponse.get_props()};
}

} // namespace nebula
50 changes: 35 additions & 15 deletions src/sclient/StorageClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,20 +85,20 @@ ScanEdgeIter StorageClient::scanEdgeWithPart(std::string spaceName,
return {this, req};
}

std::pair<bool, storage::cpp2::ScanResponse> StorageClient::doScanEdge(
std::pair<::nebula::ErrorCode, storage::cpp2::ScanResponse> StorageClient::doScanEdge(
const storage::cpp2::ScanEdgeRequest& req) {
std::pair<HostAddr, storage::cpp2::ScanEdgeRequest> request;
auto partCursorMap = req.get_parts();
DCHECK_EQ(partCursorMap.size(), 1);
PartitionID partId = partCursorMap.begin()->first;
auto host = mClient_->getPartLeaderFromCache(req.get_space_id(), partId);
if (!host.first) {
return {false, storage::cpp2::ScanResponse()};
return {::nebula::ErrorCode::E_UNKNOWN, storage::cpp2::ScanResponse()};
}
request.first = host.second;
request.second = req;

folly::Promise<std::pair<bool, storage::cpp2::ScanResponse>> promise;
folly::Promise<std::pair<::nebula::ErrorCode, storage::cpp2::ScanResponse>> promise;
auto future = promise.getFuture();
getResponse(
std::move(request),
Expand Down Expand Up @@ -159,20 +159,20 @@ ScanVertexIter StorageClient::scanVertexWithPart(
return {this, req};
}

std::pair<bool, storage::cpp2::ScanResponse> StorageClient::doScanVertex(
std::pair<::nebula::ErrorCode, storage::cpp2::ScanResponse> StorageClient::doScanVertex(
const storage::cpp2::ScanVertexRequest& req) {
std::pair<HostAddr, storage::cpp2::ScanVertexRequest> request;
auto partCursorMap = req.get_parts();
DCHECK_EQ(partCursorMap.size(), 1);
PartitionID partId = partCursorMap.begin()->first;
auto host = mClient_->getPartLeaderFromCache(req.get_space_id(), partId);
if (!host.first) {
return {false, storage::cpp2::ScanResponse()};
return {::nebula::ErrorCode::E_UNKNOWN, storage::cpp2::ScanResponse()};
}
request.first = host.second;
request.second = req;

folly::Promise<std::pair<bool, storage::cpp2::ScanResponse>> promise;
folly::Promise<std::pair<::nebula::ErrorCode, storage::cpp2::ScanResponse>> promise;
auto future = promise.getFuture();
getResponse(
std::move(request),
Expand All @@ -185,7 +185,7 @@ std::pair<bool, storage::cpp2::ScanResponse> StorageClient::doScanVertex(
template <typename Request, typename RemoteFunc, typename Response>
void StorageClient::getResponse(std::pair<HostAddr, Request>&& request,
RemoteFunc&& remoteFunc,
folly::Promise<std::pair<bool, Response>> pro) {
folly::Promise<std::pair<::nebula::ErrorCode, Response>> pro) {
auto* evb = DCHECK_NOTNULL(ioExecutor_)->getEventBase();
folly::via(evb,
[evb,
Expand All @@ -198,16 +198,36 @@ void StorageClient::getResponse(std::pair<HostAddr, Request>&& request,
LOG(INFO) << "Send request to storage " << host;
remoteFunc(client.get(), request.second)
.via(evb)
.then([pro = std::move(pro), host](folly::Try<Response>&& t) mutable {
// exception occurred during RPC
if (t.hasException()) {
LOG(ERROR) << "Send request to " << host << " failed";
LOG(ERROR) << "RpcResponse exception: " << t.exception().what().c_str();
pro.setValue(std::make_pair(false, Response()));
.thenValue([pro = std::move(pro), this](Response&& resp) mutable {
auto& result = resp.get_result();
for (auto& part : result.get_failed_parts()) {
auto partId = part.get_part_id();
auto code = part.get_code();

LOG(ERROR) << "Failure! Failed part: " << partId
<< ", error code: " << static_cast<int32_t>(code);
pro.setValue(std::make_pair(::nebula::ErrorCode(static_cast<int32_t>(code)),
Response()));
return;
}
auto&& resp = t.value();
pro.setValue(std::make_pair(true, std::move(resp)));
pro.setValue(std::make_pair(::nebula::ErrorCode::SUCCEEDED, std::move(resp)));
return;
})
.thenError([pro = std::move(pro), host, this](
folly::exception_wrapper&& exWrapper) mutable {
using TransportException = apache::thrift::transport::TTransportException;
auto ex = exWrapper.get_exception<TransportException>();
if (ex) {
if (ex->getType() == TransportException::TIMED_OUT) {
LOG(ERROR) << "Request to " << host << " time out: " << ex->what();
} else {
LOG(ERROR) << "Request to " << host << " failed: " << ex->what();
}
} else {
LOG(ERROR) << "Request to " << host << " failed.";
}
pro.setValue(std::make_pair(::nebula::ErrorCode::E_RPC_FAILURE, Response()));
return;
});
}); // via
}
Expand Down
Loading

0 comments on commit 4fd300b

Please sign in to comment.