Skip to content

Commit

Permalink
Remove extra field (#3427)
Browse files Browse the repository at this point in the history
Co-authored-by: Sophie <84560950+Sophie-Xie@users.noreply.github.com>
  • Loading branch information
Shylock-Hg and Sophie-Xie authored Dec 15, 2021
1 parent 9fbe164 commit 99f1f7a
Show file tree
Hide file tree
Showing 7 changed files with 29 additions and 31 deletions.
1 change: 0 additions & 1 deletion src/clients/storage/StorageClientBase-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,6 @@ StorageClientBase<ClientType>::getHostPartsWithCursor(GraphSpaceID spaceId) cons

// TODO support cursor
cpp2::ScanCursor c;
c.set_has_next(false);
auto parts = status.value();
for (auto partId = 1; partId <= parts; partId++) {
auto leader = getLeader(spaceId, partId);
Expand Down
3 changes: 1 addition & 2 deletions src/interface/storage.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -561,9 +561,8 @@ struct LookupAndTraverseRequest {
*/

struct ScanCursor {
3: bool has_next,
// next start key of scan, only valid when has_next is true
4: optional binary next_cursor,
1: optional binary next_cursor,
}

struct ScanVertexRequest {
Expand Down
6 changes: 0 additions & 6 deletions src/storage/exec/ScanNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,10 +92,7 @@ class ScanVertexPropNode : public QueryNode<Cursor> {

cpp2::ScanCursor c;
if (iter->valid()) {
c.set_has_next(true);
c.set_next_cursor(iter->key().str());
} else {
c.set_has_next(false);
}
cursors_->emplace(partId, std::move(c));
return nebula::cpp2::ErrorCode::SUCCEEDED;
Expand Down Expand Up @@ -246,10 +243,7 @@ class ScanEdgePropNode : public QueryNode<Cursor> {

cpp2::ScanCursor c;
if (iter->valid()) {
c.set_has_next(true);
c.set_next_cursor(iter->key().str());
} else {
c.set_has_next(false);
}
cursors_->emplace(partId, std::move(c));
return nebula::cpp2::ErrorCode::SUCCEEDED;
Expand Down
16 changes: 9 additions & 7 deletions src/storage/query/ScanEdgeProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,8 @@ void ScanEdgeProcessor::runInSingleThread(const cpp2::ScanEdgeRequest& req) {
auto partId = partEntry.first;
auto cursor = partEntry.second;

auto ret = plan.go(partId, cursor.get_has_next() ? *cursor.get_next_cursor() : "");
auto ret = plan.go(
partId, cursor.next_cursor_ref().has_value() ? cursor.next_cursor_ref().value() : "");
if (ret != nebula::cpp2::ErrorCode::SUCCEEDED &&
failedParts.find(partId) == failedParts.end()) {
failedParts.emplace(partId);
Expand All @@ -158,12 +159,13 @@ void ScanEdgeProcessor::runInMultipleThread(const cpp2::ScanEdgeRequest& req) {
size_t i = 0;
std::vector<folly::Future<std::pair<nebula::cpp2::ErrorCode, PartitionID>>> futures;
for (const auto& [partId, cursor] : req.get_parts()) {
futures.emplace_back(runInExecutor(&contexts_[i],
&results_[i],
&cursorsOfPart_[i],
partId,
cursor.get_has_next() ? *cursor.get_next_cursor() : "",
&expCtxs_[i]));
futures.emplace_back(
runInExecutor(&contexts_[i],
&results_[i],
&cursorsOfPart_[i],
partId,
cursor.next_cursor_ref().has_value() ? cursor.next_cursor_ref().value() : "",
&expCtxs_[i]));
i++;
}

Expand Down
16 changes: 9 additions & 7 deletions src/storage/query/ScanVertexProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,8 @@ void ScanVertexProcessor::runInSingleThread(const cpp2::ScanVertexRequest& req)
auto partId = partEntry.first;
auto cursor = partEntry.second;

auto ret = plan.go(partId, cursor.get_has_next() ? *cursor.get_next_cursor() : "");
auto ret = plan.go(
partId, cursor.next_cursor_ref().has_value() ? cursor.next_cursor_ref().value() : "");
if (ret != nebula::cpp2::ErrorCode::SUCCEEDED &&
failedParts.find(partId) == failedParts.end()) {
failedParts.emplace(partId);
Expand All @@ -162,12 +163,13 @@ void ScanVertexProcessor::runInMultipleThread(const cpp2::ScanVertexRequest& req
size_t i = 0;
std::vector<folly::Future<std::pair<nebula::cpp2::ErrorCode, PartitionID>>> futures;
for (const auto& [partId, cursor] : req.get_parts()) {
futures.emplace_back(runInExecutor(&contexts_[i],
&results_[i],
&cursorsOfPart_[i],
partId,
cursor.get_has_next() ? *cursor.get_next_cursor() : "",
&expCtxs_[i]));
futures.emplace_back(
runInExecutor(&contexts_[i],
&results_[i],
&cursorsOfPart_[i],
partId,
cursor.next_cursor_ref().has_value() ? cursor.next_cursor_ref().value() : "",
&expCtxs_[i]));
i++;
}

Expand Down
9 changes: 5 additions & 4 deletions src/storage/test/ScanEdgeTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,9 @@ cpp2::ScanEdgeRequest buildRequest(
CHECK_EQ(partIds.size(), cursors.size());
std::unordered_map<PartitionID, cpp2::ScanCursor> parts;
for (std::size_t i = 0; i < partIds.size(); ++i) {
c.set_has_next(!cursors[i].empty());
c.set_next_cursor(cursors[i]);
if (!cursors[i].empty()) {
c.set_next_cursor(cursors[i]);
}
parts.emplace(partIds[i], c);
}
req.set_parts(std::move(parts));
Expand Down Expand Up @@ -168,7 +169,7 @@ TEST(ScanEdgeTest, CursorTest) {

ASSERT_EQ(0, resp.result.failed_parts.size());
checkResponse(*resp.props_ref(), edge, edge.second.size(), totalRowCount);
hasNext = resp.get_cursors().at(partId).get_has_next();
hasNext = resp.get_cursors().at(partId).next_cursor_ref().has_value();
if (hasNext) {
CHECK(resp.get_cursors().at(partId).next_cursor_ref().has_value());
cursor = *resp.get_cursors().at(partId).next_cursor_ref();
Expand All @@ -195,7 +196,7 @@ TEST(ScanEdgeTest, CursorTest) {

ASSERT_EQ(0, resp.result.failed_parts.size());
checkResponse(*resp.props_ref(), edge, edge.second.size(), totalRowCount);
hasNext = resp.get_cursors().at(partId).get_has_next();
hasNext = resp.get_cursors().at(partId).next_cursor_ref().has_value();
if (hasNext) {
CHECK(resp.get_cursors().at(partId).next_cursor_ref().has_value());
cursor = *resp.get_cursors().at(partId).next_cursor_ref();
Expand Down
9 changes: 5 additions & 4 deletions src/storage/test/ScanVertexTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,9 @@ cpp2::ScanVertexRequest buildRequest(
CHECK_EQ(partIds.size(), cursors.size());
std::unordered_map<PartitionID, cpp2::ScanCursor> parts;
for (std::size_t i = 0; i < partIds.size(); ++i) {
c.set_has_next(!cursors[i].empty());
c.set_next_cursor(cursors[i]);
if (!cursors[i].empty()) {
c.set_next_cursor(cursors[i]);
}
parts.emplace(partIds[i], c);
}
req.set_parts(std::move(parts));
Expand Down Expand Up @@ -183,7 +184,7 @@ TEST(ScanVertexTest, CursorTest) {

ASSERT_EQ(0, resp.result.failed_parts.size());
checkResponse(*resp.props_ref(), tag, tag.second.size() + 1 /* kVid */, totalRowCount);
hasNext = resp.get_cursors().at(partId).get_has_next();
hasNext = resp.get_cursors().at(partId).next_cursor_ref().has_value();
if (hasNext) {
CHECK(resp.get_cursors().at(partId).next_cursor_ref());
cursor = *resp.get_cursors().at(partId).next_cursor_ref();
Expand All @@ -209,7 +210,7 @@ TEST(ScanVertexTest, CursorTest) {

ASSERT_EQ(0, resp.result.failed_parts.size());
checkResponse(*resp.props_ref(), tag, tag.second.size() + 1 /* kVid */, totalRowCount);
hasNext = resp.get_cursors().at(partId).get_has_next();
hasNext = resp.get_cursors().at(partId).next_cursor_ref().has_value();
if (hasNext) {
CHECK(resp.get_cursors().at(partId).next_cursor_ref());
cursor = *resp.get_cursors().at(partId).next_cursor_ref();
Expand Down

0 comments on commit 99f1f7a

Please sign in to comment.