Skip to content

Commit

Permalink
fix scan vertex/edge do not handle ttl (#4578)
Browse files Browse the repository at this point in the history
* fix scan vertex/edge do not handle ttl

* use ErrorCode to unify community version and end version
  • Loading branch information
critical27 authored Aug 26, 2022
1 parent ae817c0 commit d575380
Show file tree
Hide file tree
Showing 5 changed files with 130 additions and 5 deletions.
30 changes: 25 additions & 5 deletions src/storage/exec/ScanNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,10 @@ class ScanVertexPropNode : public QueryNode<Cursor> {
}
auto vertexId = NebulaKeyUtils::getVertexId(vIdLen, key);
if (vertexId != currentVertexId && !currentVertexId.empty()) {
collectOneRow(isIntId, vIdLen, currentVertexId);
ret = collectOneRow(isIntId, vIdLen, currentVertexId);
if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) {
return ret;
}
} // collect vertex row
currentVertexId = vertexId;
if (static_cast<int64_t>(resultDataSet_->rowSize()) >= rowLimit) {
Expand All @@ -100,7 +103,10 @@ class ScanVertexPropNode : public QueryNode<Cursor> {
tagNodes_[tagIdIndex->second]->doExecute(key.toString(), value.toString());
} // iterate key
if (static_cast<int64_t>(resultDataSet_->rowSize()) < rowLimit) {
collectOneRow(isIntId, vIdLen, currentVertexId);
ret = collectOneRow(isIntId, vIdLen, currentVertexId);
if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) {
return ret;
}
}

cpp2::ScanCursor c;
Expand All @@ -111,7 +117,9 @@ class ScanVertexPropNode : public QueryNode<Cursor> {
return nebula::cpp2::ErrorCode::SUCCEEDED;
}

void collectOneRow(bool isIntId, std::size_t vIdLen, const std::string& currentVertexId) {
nebula::cpp2::ErrorCode collectOneRow(bool isIntId,
std::size_t vIdLen,
const std::string& currentVertexId) {
List row;
nebula::cpp2::ErrorCode ret = nebula::cpp2::ErrorCode::SUCCEEDED;
// vertexId is the first column
Expand Down Expand Up @@ -172,6 +180,7 @@ class ScanVertexPropNode : public QueryNode<Cursor> {
tagNode->clear();
}
}
return ret;
}

private:
Expand Down Expand Up @@ -251,7 +260,10 @@ class ScanEdgePropNode : public QueryNode<Cursor> {
}
auto value = iter->val();
edgeNodes_[edgeNodeIndex->second]->doExecute(key.toString(), value.toString());
collectOneRow(isIntId, vIdLen);
ret = collectOneRow(isIntId, vIdLen);
if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) {
return ret;
}
}

cpp2::ScanCursor c;
Expand All @@ -262,9 +274,16 @@ class ScanEdgePropNode : public QueryNode<Cursor> {
return nebula::cpp2::ErrorCode::SUCCEEDED;
}

void collectOneRow(bool isIntId, std::size_t vIdLen) {
nebula::cpp2::ErrorCode collectOneRow(bool isIntId, std::size_t vIdLen) {
List row;
nebula::cpp2::ErrorCode ret = nebula::cpp2::ErrorCode::SUCCEEDED;
// Usually there is only one edge node, when all of the egdeNodes are invalid (e.g. ttl
// expired), just skip the row. If we don't skip it, there will be a whole line of empty value.
if (!std::any_of(edgeNodes_.begin(), edgeNodes_.end(), [](const auto& edgeNode) {
return edgeNode->valid();
})) {
return ret;
}
for (auto& edgeNode : edgeNodes_) {
ret = edgeNode->collectEdgePropsIfValid(
[&row, edgeNode = edgeNode.get(), this](
Expand Down Expand Up @@ -312,6 +331,7 @@ class ScanEdgePropNode : public QueryNode<Cursor> {
for (auto& edgeNode : edgeNodes_) {
edgeNode->clear();
}
return ret;
}

private:
Expand Down
1 change: 1 addition & 0 deletions src/storage/query/ScanEdgeProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ nebula::cpp2::ErrorCode ScanEdgeProcessor::checkAndBuildContexts(const cpp2::Sca
return nullptr;
}
});
buildEdgeTTLInfo();
return ret;
}

Expand Down
1 change: 1 addition & 0 deletions src/storage/query/ScanVertexProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ nebula::cpp2::ErrorCode ScanVertexProcessor::checkAndBuildContexts(
return nullptr;
}
});
buildTagTTLInfo();
return ret;
}

Expand Down
53 changes: 53 additions & 0 deletions src/storage/test/ScanEdgeTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,59 @@ TEST(ScanEdgeTest, FilterTest) {
}
}

TEST(ScanEdgeTest, TtlTest) {
FLAGS_mock_ttl_col = true;

fs::TempDir rootPath("/tmp/GetNeighborsTest.XXXXXX");
mock::MockCluster cluster;
cluster.initStorageKV(rootPath.path());
auto* env = cluster.storageEnv_.get();
auto totalParts = cluster.getTotalParts();
ASSERT_EQ(true, QueryTestUtils::mockVertexData(env, totalParts));
ASSERT_EQ(true, QueryTestUtils::mockEdgeData(env, totalParts));

EdgeType serve = 101;

{
LOG(INFO) << "Scan one edge with some properties in one batch";
size_t totalRowCount = 0;
auto edge = std::make_pair(
serve,
std::vector<std::string>{kSrc, kType, kRank, kDst, "teamName", "startYear", "endYear"});
for (PartitionID partId = 1; partId <= totalParts; partId++) {
auto req = buildRequest({partId}, {""}, {edge});
auto* processor = ScanEdgeProcessor::instance(env, nullptr);
auto f = processor->getFuture();
processor->process(req);
auto resp = std::move(f).get();

ASSERT_EQ(0, resp.result.failed_parts.size());
ASSERT_FALSE(resp.get_props()->rows.empty());
checkResponse(*resp.props_ref(), edge, edge.second.size(), totalRowCount);
}
CHECK_EQ(mock::MockData::serves_.size(), totalRowCount);
}
sleep(FLAGS_mock_ttl_duration + 1);
{
LOG(INFO) << "TTL expired, same request but no data returned";
auto edge = std::make_pair(
serve,
std::vector<std::string>{kSrc, kType, kRank, kDst, "teamName", "startYear", "endYear"});
for (PartitionID partId = 1; partId <= totalParts; partId++) {
auto req = buildRequest({partId}, {""}, {edge});
auto* processor = ScanEdgeProcessor::instance(env, nullptr);
auto f = processor->getFuture();
processor->process(req);
auto resp = std::move(f).get();

ASSERT_EQ(0, resp.result.failed_parts.size());
ASSERT_TRUE(resp.get_props()->rows.empty());
}
}

FLAGS_mock_ttl_col = false;
}

} // namespace storage
} // namespace nebula

Expand Down
50 changes: 50 additions & 0 deletions src/storage/test/ScanVertexTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -501,6 +501,56 @@ TEST(ScanVertexTest, FilterTest) {
}
}

TEST(ScanVertexTest, TtlTest) {
FLAGS_mock_ttl_col = true;

fs::TempDir rootPath("/tmp/ScanVertexTest.XXXXXX");
mock::MockCluster cluster;
cluster.initStorageKV(rootPath.path());
auto* env = cluster.storageEnv_.get();
auto totalParts = cluster.getTotalParts();
ASSERT_EQ(true, QueryTestUtils::mockVertexData(env, totalParts));
ASSERT_EQ(true, QueryTestUtils::mockEdgeData(env, totalParts));

TagID player = 1;

{
LOG(INFO) << "Scan one tag with some properties in one batch";
size_t totalRowCount = 0;
auto tag =
std::make_pair(player, std::vector<std::string>{kVid, kTag, "name", "age", "avgScore"});
for (PartitionID partId = 1; partId <= totalParts; partId++) {
auto req = buildRequest({partId}, {""}, {tag});
auto* processor = ScanVertexProcessor::instance(env, nullptr);
auto f = processor->getFuture();
processor->process(req);
auto resp = std::move(f).get();

ASSERT_EQ(0, resp.result.failed_parts.size());
checkResponse(*resp.props_ref(), tag, tag.second.size() + 1 /* kVid */, totalRowCount);
}
CHECK_EQ(mock::MockData::players_.size(), totalRowCount);
}
sleep(FLAGS_mock_ttl_duration + 1);
{
LOG(INFO) << "TTL expired, same request but no data returned";
auto tag =
std::make_pair(player, std::vector<std::string>{kVid, kTag, "name", "age", "avgScore"});
for (PartitionID partId = 1; partId <= totalParts; partId++) {
auto req = buildRequest({partId}, {""}, {tag});
auto* processor = ScanVertexProcessor::instance(env, nullptr);
auto f = processor->getFuture();
processor->process(req);
auto resp = std::move(f).get();

ASSERT_EQ(0, resp.result.failed_parts.size());
ASSERT_TRUE(resp.get_props()->rows.empty());
}
}

FLAGS_mock_ttl_col = false;
}

} // namespace storage
} // namespace nebula

Expand Down

0 comments on commit d575380

Please sign in to comment.