Skip to content

Commit

Permalink
make scan interface return error code
Browse files Browse the repository at this point in the history
  • Loading branch information
jievince committed Apr 10, 2023
1 parent 6189a41 commit 2d9602c
Show file tree
Hide file tree
Showing 9 changed files with 90 additions and 71 deletions.
11 changes: 7 additions & 4 deletions examples/StorageClientExample.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include <nebula/sclient/ScanEdgeIter.h>
#include <common/Init.h>
#include <nebula/sclient/StorageClient.h>
#include "common/response/Response.h"

int main(int argc, char* argv[]) {
nebula::init(&argc, &argv);
Expand All @@ -30,8 +31,9 @@ int main(int argc, char* argv[]) {
std::cout << "scan edge..." << std::endl;
while (scanEdgeIter.hasNext()) {
std::cout << "-------------------------" << std::endl;
nebula::DataSet ds = scanEdgeIter.next();
std::cout << ds << std::endl;
std::pair<nebula::ErrorCode, nebula::DataSet> res = scanEdgeIter.next();
std::cout << res.first << std::endl;
std::cout << res.second << std::endl;
std::cout << "+++++++++++++++++++++++++" << std::endl;
}

Expand All @@ -47,8 +49,9 @@ int main(int argc, char* argv[]) {
std::cout << "scan vertex..." << std::endl;
while (scanVertexIter.hasNext()) {
std::cout << "-------------------------" << std::endl;
nebula::DataSet ds = scanVertexIter.next();
std::cout << ds << std::endl;
std::pair<nebula::ErrorCode, nebula::DataSet> res = scanVertexIter.next();
std::cout << res.first << std::endl;
std::cout << res.second << std::endl;
std::cout << "+++++++++++++++++++++++++" << std::endl;
}

Expand Down
3 changes: 2 additions & 1 deletion include/nebula/sclient/ScanEdgeIter.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include <vector>

#include "common/datatypes/DataSet.h"
#include "common/graph/Response.h"

namespace nebula {
class StorageClient;
Expand All @@ -26,7 +27,7 @@ struct ScanEdgeIter {

bool hasNext();

DataSet next();
std::pair<::nebula::ErrorCode, DataSet> next();

StorageClient* client_;
storage::cpp2::ScanEdgeRequest* req_;
Expand Down
3 changes: 2 additions & 1 deletion include/nebula/sclient/ScanVertexIter.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include <vector>

#include "common/datatypes/DataSet.h"
#include "common/graph/Response.h"

namespace nebula {
class StorageClient;
Expand All @@ -26,7 +27,7 @@ struct ScanVertexIter {

bool hasNext();

DataSet next();
std::pair<::nebula::ErrorCode, DataSet> next();

StorageClient* client_;
storage::cpp2::ScanVertexRequest* req_;
Expand Down
9 changes: 5 additions & 4 deletions include/nebula/sclient/StorageClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "nebula/sclient/SConfig.h"
#include "nebula/sclient/ScanEdgeIter.h"
#include "nebula/sclient/ScanVertexIter.h"
#include "common/graph/Response.h"

namespace folly {
class IOThreadPoolExecutor;
Expand Down Expand Up @@ -98,16 +99,16 @@ class StorageClient {
}

private:
std::pair<bool, storage::cpp2::ScanResponse> doScanEdge(
std::pair<::nebula::ErrorCode, storage::cpp2::ScanResponse> doScanEdge(
const storage::cpp2::ScanEdgeRequest& req);

std::pair<bool, storage::cpp2::ScanResponse> doScanVertex(
std::pair<::nebula::ErrorCode, storage::cpp2::ScanResponse> doScanVertex(
const storage::cpp2::ScanVertexRequest& req);

template <typename Request, typename RemoteFunc, typename Response>
void getResponse(std::pair<HostAddr, Request>&& request,
void getResponse(std::pair<HostAddr, Request>&& request,
RemoteFunc&& remoteFunc,
folly::Promise<std::pair<bool, Response>> pro);
folly::Promise<std::pair<::nebula::ErrorCode, Response>> pro);

std::unique_ptr<MetaClient> mClient_;
SConfig sConfig_;
Expand Down
23 changes: 8 additions & 15 deletions src/sclient/ScanEdgeIter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,29 +21,22 @@ ScanEdgeIter::~ScanEdgeIter() {
delete req_;
}

DataSet ScanEdgeIter::next() {
if (!hasNext()) {
LOG(ERROR) << "hasNext() == false !";
return DataSet();
}
std::pair<::nebula::ErrorCode, DataSet> ScanEdgeIter::next() {
DCHECK(hasNext()) << "hasNext() == false !";
DCHECK(!!req_);
auto partCursorMapReq = req_->get_parts();
DCHECK_EQ(partCursorMapReq.size(), 1);
partCursorMapReq.begin()->second.set_next_cursor(nextCursor_);
req_->set_parts(partCursorMapReq);
auto r = client_->doScanEdge(*req_);
if (!r.first) {
LOG(ERROR) << "Scan edge failed";
auto code = r.first;
if (code != ::nebula::ErrorCode::SUCCEEDED) {
LOG(ERROR) << "Scan edge failed, error code: " << static_cast<int32_t>(code);
this->hasNext_ = false;
return DataSet();
return {code, DataSet()};
}
auto scanResponse = r.second;
if (!scanResponse.get_result().get_failed_parts().empty()) {
auto errorCode = scanResponse.get_result().get_failed_parts()[0].code;
LOG(ERROR) << "Scan edge failed, errorcode: " << static_cast<int32_t>(errorCode);
this->hasNext_ = false;
return DataSet();
}
DCHECK(scanResponse.get_result().get_failed_parts().empty());
auto partCursorMapResp = scanResponse.get_cursors();
DCHECK_EQ(partCursorMapResp.size(), 1);
auto scanCursor = partCursorMapResp.begin()->second;
Expand All @@ -52,7 +45,7 @@ DataSet ScanEdgeIter::next() {
nextCursor_ = scanCursor.next_cursor_ref().value();
}

return *scanResponse.get_props();
return {::nebula::ErrorCode::SUCCEEDED, *scanResponse.get_props()};
}

} // namespace nebula
23 changes: 8 additions & 15 deletions src/sclient/ScanVertexIter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,29 +23,22 @@ ScanVertexIter::~ScanVertexIter() {
delete req_;
}

DataSet ScanVertexIter::next() {
if (!hasNext()) {
LOG(ERROR) << "hasNext() == false !";
return DataSet();
}
std::pair<::nebula::ErrorCode, DataSet> ScanVertexIter::next() {
DCHECK(hasNext()) << "hasNext() == false !";
DCHECK(!!req_);
auto partCursorMapReq = req_->get_parts();
DCHECK_EQ(partCursorMapReq.size(), 1);
partCursorMapReq.begin()->second.set_next_cursor(nextCursor_);
req_->set_parts(partCursorMapReq);
auto r = client_->doScanVertex(*req_);
if (!r.first) {
LOG(ERROR) << "Scan vertex failed";
auto code = r.first;
if (code != ::nebula::ErrorCode::SUCCEEDED) {
LOG(ERROR) << "Scan vertex failed, error code: " << static_cast<int32_t>(code);
this->hasNext_ = false;
return DataSet();
return {code, DataSet()};
}
auto scanResponse = r.second;
if (!scanResponse.get_result().get_failed_parts().empty()) {
auto errorCode = scanResponse.get_result().get_failed_parts()[0].code;
LOG(ERROR) << "Scan vertex failed, errorcode: " << static_cast<int32_t>(errorCode);
this->hasNext_ = false;
return DataSet();
}
DCHECK(scanResponse.get_result().get_failed_parts().empty());
auto partCursorMapResp = scanResponse.get_cursors();
DCHECK_EQ(partCursorMapResp.size(), 1);
auto scanCursor = partCursorMapResp.begin()->second;
Expand All @@ -54,7 +47,7 @@ DataSet ScanVertexIter::next() {
nextCursor_ = scanCursor.next_cursor_ref().value();
}

return *scanResponse.get_props();
return {::nebula::ErrorCode::SUCCEEDED, *scanResponse.get_props()};
}

} // namespace nebula
50 changes: 35 additions & 15 deletions src/sclient/StorageClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,20 +85,20 @@ ScanEdgeIter StorageClient::scanEdgeWithPart(std::string spaceName,
return {this, req};
}

std::pair<bool, storage::cpp2::ScanResponse> StorageClient::doScanEdge(
std::pair<::nebula::ErrorCode, storage::cpp2::ScanResponse> StorageClient::doScanEdge(
const storage::cpp2::ScanEdgeRequest& req) {
std::pair<HostAddr, storage::cpp2::ScanEdgeRequest> request;
auto partCursorMap = req.get_parts();
DCHECK_EQ(partCursorMap.size(), 1);
PartitionID partId = partCursorMap.begin()->first;
auto host = mClient_->getPartLeaderFromCache(req.get_space_id(), partId);
if (!host.first) {
return {false, storage::cpp2::ScanResponse()};
return {::nebula::ErrorCode::E_UNKNOWN, storage::cpp2::ScanResponse()};
}
request.first = host.second;
request.second = req;

folly::Promise<std::pair<bool, storage::cpp2::ScanResponse>> promise;
folly::Promise<std::pair<::nebula::ErrorCode, storage::cpp2::ScanResponse>> promise;
auto future = promise.getFuture();
getResponse(
std::move(request),
Expand Down Expand Up @@ -159,20 +159,20 @@ ScanVertexIter StorageClient::scanVertexWithPart(
return {this, req};
}

std::pair<bool, storage::cpp2::ScanResponse> StorageClient::doScanVertex(
std::pair<::nebula::ErrorCode, storage::cpp2::ScanResponse> StorageClient::doScanVertex(
const storage::cpp2::ScanVertexRequest& req) {
std::pair<HostAddr, storage::cpp2::ScanVertexRequest> request;
auto partCursorMap = req.get_parts();
DCHECK_EQ(partCursorMap.size(), 1);
PartitionID partId = partCursorMap.begin()->first;
auto host = mClient_->getPartLeaderFromCache(req.get_space_id(), partId);
if (!host.first) {
return {false, storage::cpp2::ScanResponse()};
return {::nebula::ErrorCode::E_UNKNOWN, storage::cpp2::ScanResponse()};
}
request.first = host.second;
request.second = req;

folly::Promise<std::pair<bool, storage::cpp2::ScanResponse>> promise;
folly::Promise<std::pair<::nebula::ErrorCode, storage::cpp2::ScanResponse>> promise;
auto future = promise.getFuture();
getResponse(
std::move(request),
Expand All @@ -185,7 +185,7 @@ std::pair<bool, storage::cpp2::ScanResponse> StorageClient::doScanVertex(
template <typename Request, typename RemoteFunc, typename Response>
void StorageClient::getResponse(std::pair<HostAddr, Request>&& request,
RemoteFunc&& remoteFunc,
folly::Promise<std::pair<bool, Response>> pro) {
folly::Promise<std::pair<::nebula::ErrorCode, Response>> pro) {
auto* evb = DCHECK_NOTNULL(ioExecutor_)->getEventBase();
folly::via(evb,
[evb,
Expand All @@ -198,16 +198,36 @@ void StorageClient::getResponse(std::pair<HostAddr, Request>&& request,
LOG(INFO) << "Send request to storage " << host;
remoteFunc(client.get(), request.second)
.via(evb)
.then([pro = std::move(pro), host](folly::Try<Response>&& t) mutable {
// exception occurred during RPC
if (t.hasException()) {
LOG(ERROR) << "Send request to " << host << " failed";
LOG(ERROR) << "RpcResponse exception: " << t.exception().what().c_str();
pro.setValue(std::make_pair(false, Response()));
.thenValue([pro = std::move(pro), this](Response&& resp) mutable {
auto& result = resp.get_result();
for (auto& part : result.get_failed_parts()) {
auto partId = part.get_part_id();
auto code = part.get_code();

LOG(ERROR) << "Failure! Failed part: " << partId
<< ", error code: " << static_cast<int32_t>(code);
pro.setValue(std::make_pair(::nebula::ErrorCode(static_cast<int32_t>(code)),
Response()));
return;
}
auto&& resp = t.value();
pro.setValue(std::make_pair(true, std::move(resp)));
pro.setValue(std::make_pair(::nebula::ErrorCode::SUCCEEDED, std::move(resp)));
return;
})
.thenError([pro = std::move(pro), host, this](
folly::exception_wrapper&& exWrapper) mutable {
using TransportException = apache::thrift::transport::TTransportException;
auto ex = exWrapper.get_exception<TransportException>();
if (ex) {
if (ex->getType() == TransportException::TIMED_OUT) {
LOG(ERROR) << "Request to " << host << " time out: " << ex->what();
} else {
LOG(ERROR) << "Request to " << host << " failed: " << ex->what();
}
} else {
LOG(ERROR) << "Request to " << host << " failed.";
}
pro.setValue(std::make_pair(::nebula::ErrorCode::E_RPC_FAILURE, Response()));
return;
});
}); // via
}
Expand Down
19 changes: 11 additions & 8 deletions src/sclient/tests/StorageClientSSLTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,9 @@ class StorageClientTest : public SClientTest {
nebula::DataSet got;
int scanNum = 0;
while (goodScanIter.hasNext()) {
nebula::DataSet ds = goodScanIter.next();
got.append(std::move(ds));
std::pair<nebula::ErrorCode, nebula::DataSet> res = goodScanIter.next();
EXPECT_EQ(res.first, nebula::ErrorCode::SUCCEEDED);
got.append(std::move(res.second));
++scanNum;
}
EXPECT_EQ(scanNum, 3);
Expand All @@ -130,8 +131,9 @@ class StorageClientTest : public SClientTest {
true);
{
EXPECT_EQ(badScanIter.hasNext(), true);
nebula::DataSet ds = badScanIter.next();
EXPECT_EQ(ds, nebula::DataSet());
std::pair<nebula::ErrorCode, nebula::DataSet> res = badScanIter.next();
EXPECT_NE(res.first, nebula::ErrorCode::SUCCEEDED);
EXPECT_EQ(res.second, nebula::DataSet());
EXPECT_EQ(badScanIter.hasNext_, false);
EXPECT_EQ(badScanIter.nextCursor_, "");
}
Expand Down Expand Up @@ -162,8 +164,8 @@ class StorageClientTest : public SClientTest {
nebula::DataSet got;
int scanNum = 0;
while (goodScanIter.hasNext()) {
nebula::DataSet ds = goodScanIter.next();
got.append(std::move(ds));
std::pair<nebula::ErrorCode, nebula::DataSet> res = goodScanIter.next();
got.append(std::move(res.second));
++scanNum;
}
EXPECT_EQ(scanNum, 3);
Expand All @@ -183,8 +185,9 @@ class StorageClientTest : public SClientTest {
true);
{
EXPECT_EQ(badScanIter.hasNext(), true);
nebula::DataSet ds = badScanIter.next();
EXPECT_EQ(ds, nebula::DataSet());
std::pair<nebula::ErrorCode, nebula::DataSet> res = badScanIter.next();
EXPECT_NE(res.first, nebula::ErrorCode::SUCCEEDED);
EXPECT_EQ(res.second, nebula::DataSet());
EXPECT_EQ(badScanIter.hasNext_, false);
EXPECT_EQ(badScanIter.nextCursor_, "");
}
Expand Down
20 changes: 12 additions & 8 deletions src/sclient/tests/StorageClientTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,9 @@ class StorageClientTest : public SClientTest {
nebula::DataSet got;
int scanNum = 0;
while (goodScanIter.hasNext()) {
nebula::DataSet ds = goodScanIter.next();
got.append(std::move(ds));
std::pair<nebula::ErrorCode, nebula::DataSet> res = goodScanIter.next();
EXPECT_EQ(res.first, nebula::ErrorCode::SUCCEEDED);
got.append(std::move(res.second));
++scanNum;
}
EXPECT_EQ(scanNum, 3);
Expand All @@ -130,8 +131,9 @@ class StorageClientTest : public SClientTest {
true);
{
EXPECT_EQ(badScanIter.hasNext(), true);
nebula::DataSet ds = badScanIter.next();
EXPECT_EQ(ds, nebula::DataSet());
std::pair<nebula::ErrorCode, nebula::DataSet> res = badScanIter.next();
EXPECT_NE(res.first, nebula::ErrorCode::SUCCEEDED);
EXPECT_EQ(res.second, nebula::DataSet());
EXPECT_EQ(badScanIter.hasNext_, false);
EXPECT_EQ(badScanIter.nextCursor_, "");
}
Expand Down Expand Up @@ -162,8 +164,9 @@ class StorageClientTest : public SClientTest {
nebula::DataSet got;
int scanNum = 0;
while (goodScanIter.hasNext()) {
nebula::DataSet ds = goodScanIter.next();
got.append(std::move(ds));
std::pair<nebula::ErrorCode, nebula::DataSet> res = goodScanIter.next();
EXPECT_EQ(res.first, nebula::ErrorCode::SUCCEEDED);
got.append(std::move(res.second));
++scanNum;
}
EXPECT_EQ(scanNum, 3);
Expand All @@ -183,8 +186,9 @@ class StorageClientTest : public SClientTest {
true);
{
EXPECT_EQ(badScanIter.hasNext(), true);
nebula::DataSet ds = badScanIter.next();
EXPECT_EQ(ds, nebula::DataSet());
std::pair<nebula::ErrorCode, nebula::DataSet> res = badScanIter.next();
EXPECT_NE(res.first, nebula::ErrorCode::SUCCEEDED);
EXPECT_EQ(res.second, nebula::DataSet());
EXPECT_EQ(badScanIter.hasNext_, false);
EXPECT_EQ(badScanIter.nextCursor_, "");
}
Expand Down

0 comments on commit 2d9602c

Please sign in to comment.