Skip to content

Commit

Permalink
unify interface
Browse files Browse the repository at this point in the history
  • Loading branch information
critical27 committed Sep 16, 2019
1 parent d10ce9d commit 12726f5
Show file tree
Hide file tree
Showing 19 changed files with 74 additions and 71 deletions.
6 changes: 2 additions & 4 deletions src/interface/storage.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -83,10 +83,8 @@ struct VertexData {
}

struct ResponseCommon {
// For those operations that affect multiple partitions, partition_codes only contains result of the partitions
// which return error. As for operations that only affect single partition (e.g. AdminExecResp), partition_codes
// only contains one result of the target partition
1: required list<ResultCode> partition_codes,
// Only contains the partition that returns error
1: required list<ResultCode> failed_codes,
// Query latency from storage service
2: required i32 latency_in_us,
}
Expand Down
37 changes: 20 additions & 17 deletions src/meta/processors/admin/AdminClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -193,12 +193,13 @@ folly::Future<Status> AdminClient::getResponse(
folly::Promise<Status> pro;
auto f = pro.getFuture();
auto* evb = ioThreadPool_->getEventBase();
folly::via(evb, [evb, pro = std::move(pro), host, req = std::move(req),
auto partId = req.get_part_id();
folly::via(evb, [evb, pro = std::move(pro), host, req = std::move(req), partId,
remoteFunc = std::move(remoteFunc), respGen = std::move(respGen),
this] () mutable {
auto client = clientsMan_->client(host, evb);
remoteFunc(client, std::move(req))
.then(evb, [p = std::move(pro), respGen = std::move(respGen)](
.then(evb, [p = std::move(pro), partId, respGen = std::move(respGen)](
folly::Try<storage::cpp2::AdminExecResp>&& t) mutable {
// exception occurred during RPC
if (t.hasException()) {
Expand All @@ -207,11 +208,15 @@ folly::Future<Status> AdminClient::getResponse(
return;
}
auto&& result = std::move(t).value().get_result();
if (result.get_partition_codes().empty()) {
p.setValue(Status::Error("Illegal response"));
if (result.get_failed_codes().empty()) {
storage::cpp2::ResultCode resultCode;
resultCode.set_code(storage::cpp2::ErrorCode::SUCCEEDED);
resultCode.set_part_id(partId);
p.setValue(respGen(resultCode));
} else {
auto resp = result.get_failed_codes().front();
p.setValue(respGen(std::move(resp)));
}
auto resp = result.get_partition_codes().front();
p.setValue(respGen(std::move(resp)));
});
});
return f;
Expand All @@ -229,13 +234,14 @@ void AdminClient::getResponse(
auto* evb = ioThreadPool_->getEventBase();
CHECK_GE(index, 0);
CHECK_LT(index, hosts.size());
folly::via(evb, [evb, hosts = std::move(hosts), index, req = std::move(req),
auto partId = req.get_part_id();
folly::via(evb, [evb, hosts = std::move(hosts), index, req = std::move(req), partId,
remoteFunc = std::move(remoteFunc), retry, pro = std::move(pro),
retryLimit, this] () mutable {
auto client = clientsMan_->client(hosts[index], evb);
remoteFunc(client, req)
.then(evb, [p = std::move(pro), hosts = std::move(hosts), index, req = std::move(req),
remoteFunc = std::move(remoteFunc), retry, retryLimit,
partId, remoteFunc = std::move(remoteFunc), retry, retryLimit,
this] (folly::Try<storage::cpp2::AdminExecResp>&& t) mutable {
// exception occurred during RPC
if (t.hasException()) {
Expand All @@ -244,7 +250,7 @@ void AdminClient::getResponse(
<< ", retry " << retry
<< ", limit " << retryLimit;
getResponse(std::move(hosts),
index + 1,
(index + 1) % hosts.size(),
std::move(req),
remoteFunc,
retry + 1,
Expand All @@ -257,15 +263,12 @@ void AdminClient::getResponse(
return;
}
auto&& result = std::move(t).value().get_result();
if (result.get_partition_codes().empty()) {
p.setValue(Status::Error("Illegal response"));
if (result.get_failed_codes().empty()) {
p.setValue(Status::OK());
return;
}
auto resp = result.get_partition_codes().front();
auto resp = result.get_failed_codes().front();
switch (resp.get_code()) {
case storage::cpp2::ErrorCode::SUCCEEDED: {
p.setValue(Status::OK());
return;
}
case storage::cpp2::ErrorCode::E_LEADER_CHANGED: {
if (retry < retryLimit && resp.get_leader() != nullptr) {
HostAddr leader(resp.get_leader()->get_ip(), resp.get_leader()->get_port());
Expand Down Expand Up @@ -299,7 +302,7 @@ void AdminClient::getResponse(
<< ", retry " << retry
<< ", limit " << retryLimit;
getResponse(std::move(hosts),
index + 1,
(index + 1) % hosts.size(),
std::move(req),
std::move(remoteFunc),
retry + 1,
Expand Down
7 changes: 2 additions & 5 deletions src/meta/test/AdminClientTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,7 @@
storage::cpp2::AdminExecResp resp; \
storage::cpp2::ResponseCommon result; \
std::vector<storage::cpp2::ResultCode> partRetCode; \
storage::cpp2::ResultCode thriftRet; \
thriftRet.set_code(storage::cpp2::ErrorCode::SUCCEEDED); \
partRetCode.emplace_back(std::move(thriftRet)); \
result.set_partition_codes(partRetCode); \
result.set_failed_codes(partRetCode); \
resp.set_result(result); \
pro.setValue(std::move(resp)); \
return f; \
Expand All @@ -42,7 +39,7 @@
thriftRet.set_code(storage::cpp2::ErrorCode::E_LEADER_CHANGED); \
thriftRet.set_leader(leader); \
partRetCode.emplace_back(std::move(thriftRet)); \
result.set_partition_codes(partRetCode); \
result.set_failed_codes(partRetCode); \
resp.set_result(result); \
pro.setValue(std::move(resp)); \
return f; \
Expand Down
26 changes: 15 additions & 11 deletions src/storage/BaseProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,14 @@ class BaseProcessor {
if (this->stats_ != nullptr) {
stats::StatsManager::addValue(this->stats_->latencyStatId_,
this->duration_.elapsedInUSec());
if (this->result_.get_partition_codes().empty()) {
if (this->result_.get_failed_codes().empty()) {
stats::StatsManager::addValue(this->stats_->qpsStatId_, 1);
} else {
stats::StatsManager::addValue(this->stats_->errorQpsStatId_, 1);
}
}
this->result_.set_latency_in_us(this->duration_.elapsedInUSec());
this->result_.set_partition_codes(this->codes_);
this->result_.set_failed_codes(this->codes_);
this->resp_.set_result(std::move(this->result_));
this->promise_.setValue(std::move(this->resp_));
delete this;
Expand All @@ -78,18 +78,22 @@ class BaseProcessor {
cpp2::ErrorCode to(kvstore::ResultCode code);

void pushResultCode(cpp2::ErrorCode code, PartitionID partId) {
cpp2::ResultCode thriftRet;
thriftRet.set_code(code);
thriftRet.set_part_id(partId);
codes_.emplace_back(std::move(thriftRet));
if (code != cpp2::ErrorCode::SUCCEEDED) {
cpp2::ResultCode thriftRet;
thriftRet.set_code(code);
thriftRet.set_part_id(partId);
codes_.emplace_back(std::move(thriftRet));
}
}

void pushResultCode(cpp2::ErrorCode code, PartitionID partId, HostAddr leader) {
cpp2::ResultCode thriftRet;
thriftRet.set_code(code);
thriftRet.set_part_id(partId);
thriftRet.set_leader(toThriftHost(leader));
codes_.emplace_back(std::move(thriftRet));
if (code != cpp2::ErrorCode::SUCCEEDED) {
cpp2::ResultCode thriftRet;
thriftRet.set_code(code);
thriftRet.set_part_id(partId);
thriftRet.set_leader(toThriftHost(leader));
codes_.emplace_back(std::move(thriftRet));
}
}

nebula::cpp2::HostAddr toThriftHost(const HostAddr& host) {
Expand Down
6 changes: 3 additions & 3 deletions src/storage/BaseProcessor.inl
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ void BaseProcessor<RESP>::doPut(GraphSpaceID spaceId,
}
this->callingNum_--;
if (this->callingNum_ == 0) {
result_.set_partition_codes(std::move(this->codes_));
result_.set_failed_codes(std::move(this->codes_));
finished = true;
}
}
Expand Down Expand Up @@ -97,7 +97,7 @@ void BaseProcessor<RESP>::doRemove(GraphSpaceID spaceId,
}
this->callingNum_--;
if (this->callingNum_ == 0) {
result_.set_partition_codes(std::move(this->codes_));
result_.set_failed_codes(std::move(this->codes_));
finished = true;
}
}
Expand Down Expand Up @@ -136,7 +136,7 @@ void BaseProcessor<RESP>::doRemoveRange(GraphSpaceID spaceId,
}
this->callingNum_--;
if (this->callingNum_ == 0) {
// qwer: set_partition_codes?
// qwer: set_failed_codes?
finished = true;
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/storage/DeleteVertexProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ void DeleteVertexProcessor::process(const cpp2::DeleteVertexRequest& req) {
if (thriftResult.code != cpp2::ErrorCode::SUCCEEDED) {
this->codes_.emplace_back(std::move(thriftResult));
}
result_.set_partition_codes(std::move(this->codes_));
result_.set_failed_codes(std::move(this->codes_));
}
this->onFinished();
});
Expand Down
4 changes: 1 addition & 3 deletions src/storage/QueryEdgePropsProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,7 @@ void QueryEdgePropsProcessor::process(const cpp2::EdgePropRequest& req) {
}
}
// TODO handle failures
if (ret != kvstore::ResultCode::SUCCEEDED) {
this->pushResultCode(this->to(ret), partId);
}
this->pushResultCode(this->to(ret), partId);
});
resp_.set_data(std::move(rsWriter.data()));

Expand Down
4 changes: 2 additions & 2 deletions src/storage/client/StorageClient.inl
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ folly::SemiFuture<StorageRpcResponse<Response>> StorageClient::collectResponse(
auto resp = std::move(val.value());
auto& result = resp.get_result();
bool hasFailure{false};
for (auto& code : result.get_partition_codes()) {
for (auto& code : result.get_failed_codes()) {
VLOG(3) << "Failure! Failed part " << code.get_part_id()
<< ", failed code " << static_cast<int32_t>(code.get_code());
hasFailure = true;
Expand Down Expand Up @@ -187,7 +187,7 @@ folly::Future<StatusOr<Response>> StorageClient::getResponse(
auto&& resp = std::move(t.value());
// leader changed
auto& result = resp.get_result();
for (auto& code : result.get_partition_codes()) {
for (auto& code : result.get_failed_codes()) {
VLOG(3) << "Failure! Failed part " << code.get_part_id()
<< ", failed code " << static_cast<int32_t>(code.get_code());
if (code.get_code() == storage::cpp2::ErrorCode::E_LEADER_CHANGED) {
Expand Down
2 changes: 1 addition & 1 deletion src/storage/test/AddEdgesTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ TEST(AddEdgesTest, SimpleTest) {
auto fut = processor->getFuture();
processor->process(req);
auto resp = std::move(fut).get();
EXPECT_EQ(0, resp.result.partition_codes.size());
EXPECT_EQ(0, resp.result.failed_codes.size());

LOG(INFO) << "Check data in kv store...";
for (auto partId = 0; partId < 3; partId++) {
Expand Down
2 changes: 1 addition & 1 deletion src/storage/test/AddVerticesTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ TEST(AddVerticesTest, SimpleTest) {
auto fut = processor->getFuture();
processor->process(req);
auto resp = std::move(fut).get();
EXPECT_EQ(0, resp.result.partition_codes.size());
EXPECT_EQ(0, resp.result.failed_codes.size());

LOG(INFO) << "Check data in kv store...";
for (auto partId = 0; partId < 3; partId++) {
Expand Down
6 changes: 3 additions & 3 deletions src/storage/test/DeleteEdgesTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ TEST(DeleteEdgesTest, SimpleTest) {
auto fut = processor->getFuture();
processor->process(req);
auto resp = std::move(fut).get();
EXPECT_EQ(0, resp.result.partition_codes.size());
EXPECT_EQ(0, resp.result.failed_codes.size());
}
// Add multi version edges
{
Expand Down Expand Up @@ -76,7 +76,7 @@ TEST(DeleteEdgesTest, SimpleTest) {
auto fut = processor->getFuture();
processor->process(req);
auto resp = std::move(fut).get();
EXPECT_EQ(0, resp.result.partition_codes.size());
EXPECT_EQ(0, resp.result.failed_codes.size());
}

LOG(INFO) << "Check data in kv store...";
Expand Down Expand Up @@ -119,7 +119,7 @@ TEST(DeleteEdgesTest, SimpleTest) {
auto fut = processor->getFuture();
processor->process(req);
auto resp = std::move(fut).get();
EXPECT_EQ(0, resp.result.partition_codes.size());
EXPECT_EQ(0, resp.result.failed_codes.size());
}

LOG(INFO) << "Check data again in kv store...";
Expand Down
4 changes: 2 additions & 2 deletions src/storage/test/DeleteVertexTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ TEST(DeleteVertexTest, SimpleTest) {
auto fut = processor->getFuture();
processor->process(req);
auto resp = std::move(fut).get();
EXPECT_EQ(0, resp.result.partition_codes.size());
EXPECT_EQ(0, resp.result.failed_codes.size());

LOG(INFO) << "Check data in kv store...";
for (auto partId = 0; partId < 3; partId++) {
Expand Down Expand Up @@ -83,7 +83,7 @@ TEST(DeleteVertexTest, SimpleTest) {
auto fut = processor->getFuture();
processor->process(req);
auto resp = std::move(fut).get();
EXPECT_EQ(0, resp.result.partition_codes.size());
EXPECT_EQ(0, resp.result.failed_codes.size());
}
}
}
Expand Down
11 changes: 7 additions & 4 deletions src/storage/test/QueryBoundTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ void checkResponse(cpp2::QueryResponse& resp,
int32_t dstIdFrom,
int32_t edgeNum,
bool outBound) {
EXPECT_EQ(0, resp.result.partition_codes.size());
EXPECT_EQ(0, resp.result.failed_codes.size());

EXPECT_EQ(vertexNum, resp.vertices.size());

Expand Down Expand Up @@ -445,9 +445,9 @@ TEST(QueryBoundTest, FilterTest_InvalidFilter) {
auto resp = std::move(f).get();

LOG(INFO) << "Check the results...";
EXPECT_EQ(3, resp.result.partition_codes.size());
EXPECT_EQ(3, resp.result.failed_codes.size());
EXPECT_TRUE(nebula::storage::cpp2::ErrorCode::E_INVALID_FILTER
== resp.result.partition_codes[0].code);
== resp.result.failed_codes[0].code);
}

TEST(QueryBoundTest, MultiEdgeQueryTest) {
Expand All @@ -464,7 +464,10 @@ TEST(QueryBoundTest, MultiEdgeQueryTest) {

LOG(INFO) << "Test QueryOutBoundRequest...";
auto executor = std::make_unique<folly::CPUThreadPoolExecutor>(3);
auto* processor = QueryBoundProcessor::instance(kv.get(), schemaMan.get(), nullptr, executor.get());
auto* processor = QueryBoundProcessor::instance(kv.get(),
schemaMan.get(),
nullptr,
executor.get());
auto f = processor->getFuture();
processor->process(req);
auto resp = std::move(f).get();
Expand Down
4 changes: 2 additions & 2 deletions src/storage/test/QueryEdgeKeysTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ TEST(QueryEdgeKeysTest, SimpleTest) {
auto fut = processor->getFuture();
processor->process(req);
auto resp = std::move(fut).get();
EXPECT_EQ(0, resp.result.partition_codes.size());
EXPECT_EQ(0, resp.result.failed_codes.size());
}

LOG(INFO) << "Check data in kv store...";
Expand Down Expand Up @@ -80,7 +80,7 @@ TEST(QueryEdgeKeysTest, SimpleTest) {
auto fut = processor->getFuture();
processor->process(req);
auto resp = std::move(fut).get();
EXPECT_EQ(0, resp.result.partition_codes.size());
EXPECT_EQ(0, resp.result.failed_codes.size());
CHECK_EQ(1, resp.edge_keys.size());
auto edge = resp.edge_keys[0];
CHECK_EQ(srcId, edge.get_src());
Expand Down
2 changes: 1 addition & 1 deletion src/storage/test/QueryEdgePropsTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ void buildRequest(cpp2::EdgePropRequest& req) {


void checkResponse(cpp2::EdgePropResponse& resp) {
EXPECT_EQ(0, resp.result.partition_codes.size());
EXPECT_EQ(0, resp.result.failed_codes.size());
EXPECT_EQ(13, resp.schema.columns.size());
auto provider = std::make_shared<ResultSchemaProvider>(resp.schema);
LOG(INFO) << "Check edge props...";
Expand Down
2 changes: 1 addition & 1 deletion src/storage/test/QueryStatsTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ void buildRequest(cpp2::GetNeighborsRequest& req) {


void checkResponse(const cpp2::QueryStatsResponse& resp) {
EXPECT_EQ(0, resp.result.partition_codes.size());
EXPECT_EQ(0, resp.result.failed_codes.size());

EXPECT_EQ(7, resp.schema.columns.size());
CHECK_GT(resp.data.size(), 0);
Expand Down
2 changes: 1 addition & 1 deletion src/storage/test/QueryVertexPropsTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ TEST(QueryVertexPropsTest, SimpleTest) {
auto resp = std::move(f).get();

LOG(INFO) << "Check the results...";
EXPECT_EQ(0, resp.result.partition_codes.size());
EXPECT_EQ(0, resp.result.failed_codes.size());

EXPECT_EQ(30, resp.vertices.size());

Expand Down
Loading

0 comments on commit 12726f5

Please sign in to comment.