Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

lookup support push down topK #2992

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/clients/storage/GraphStorageClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -488,6 +488,7 @@ StorageRpcRespFuture<cpp2::LookupIndexResp> GraphStorageClient::lookupIndex(
bool isEdge,
int32_t tagOrEdge,
const std::vector<std::string>& returnCols,
const std::vector<cpp2::OrderBy>& orderBy,
int64_t limit) {
// TODO(sky) : instead of isEdge and tagOrEdge to nebula::cpp2::SchemaID for graph layer.
auto space = param.space;
Expand Down Expand Up @@ -516,9 +517,13 @@ StorageRpcRespFuture<cpp2::LookupIndexResp> GraphStorageClient::lookupIndex(
cpp2::IndexSpec spec;
spec.set_contexts(contexts);
spec.set_schema_id(schemaId);

req.set_indices(spec);
req.set_common(common);
req.set_limit(limit);
if (!orderBy.empty()) {
req.set_order_by(orderBy);
}
}

return collectResponse(
Expand Down
3 changes: 2 additions & 1 deletion src/clients/storage/GraphStorageClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,8 @@ class GraphStorageClient : public StorageClientBase<cpp2::GraphStorageServiceAsy
bool isEdge,
int32_t tagOrEdge,
const std::vector<std::string>& returnCols,
int64_t limit);
const std::vector<cpp2::OrderBy>& orderBy = std::vector<cpp2::OrderBy>(),
int64_t limit = std::numeric_limits<int64_t>::max());

StorageRpcRespFuture<cpp2::GetNeighborsResponse> lookupAndTraverse(
const CommonRequestParam& param, cpp2::IndexSpec indexSpec, cpp2::TraverseSpec traverseSpec);
Expand Down
71 changes: 71 additions & 0 deletions src/common/utils/TopKHeap.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/* Copyright (c) 2021 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License,
* attached with Common Clause Condition 1.0, found in the LICENSES directory.
*/

#ifndef COMMON_UTILS_TOPKHEAP_H_
#define COMMON_UTILS_TOPKHEAP_H_

#include "common/base/Base.h"

namespace nebula {

template <class T>
class TopKHeap final {
public:
TopKHeap(int heapSize, std::function<bool(T, T)> comparator)
: heapSize_(heapSize), comparator_(std::move(comparator)) {
v_.reserve(heapSize);
}
~TopKHeap() = default;

void push(T data) {
if (v_.size() < static_cast<size_t>(heapSize_)) {
v_.push_back(data);
adjustUp(v_.size() - 1);
return;
}
if (comparator_(data, v_[0])) {
v_[0] = data;
adjustDown(0);
}
}

std::vector<T> moveTopK() { return std::move(v_); }

private:
void adjustDown(size_t parent) {
size_t child = parent * 2 + 1;
size_t size = v_.size();
while (child < size) {
if (child + 1 < size && comparator_(v_[child], v_[child + 1])) {
child += 1;
}
if (!comparator_(v_[parent], v_[child])) {
return;
}
std::swap(v_[parent], v_[child]);
parent = child;
child = parent * 2 + 1;
}
}
void adjustUp(size_t child) {
size_t parent = (child - 1) >> 1;
while (0 != child) {
if (!comparator_(v_[parent], v_[child])) {
return;
}
std::swap(v_[parent], v_[child]);
child = parent;
parent = (child - 1) >> 1;
}
}

private:
int heapSize_;
std::vector<T> v_;
std::function<bool(T, T)> comparator_;
};
} // namespace nebula
#endif // COMMON_UTILS_TOPKHEAP_H_
11 changes: 11 additions & 0 deletions src/common/utils/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -82,3 +82,14 @@ nebula_add_test(
${PROXYGEN_LIBRARIES}
gtest
)

nebula_add_test(
NAME
topk_heap_test
SOURCES
TopKHeapTest.cpp
OBJECTS
$<TARGET_OBJECTS:base_obj>
LIBRARIES
gtest
)
50 changes: 50 additions & 0 deletions src/common/utils/test/TopKHeapTest.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/* Copyright (c) 2020 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License,
* attached with Common Clause Condition 1.0, found in the LICENSES directory.
*/

#include <gtest/gtest.h>

#include "common/utils/TopKHeap.h"

namespace nebula {

std::vector<int> getVector(std::function<bool(int, int)> comparator) {
TopKHeap<int> topKHeap(3, comparator);
topKHeap.push(3);
topKHeap.push(6);
topKHeap.push(1);
topKHeap.push(9);
topKHeap.push(2);
topKHeap.push(7);
return topKHeap.moveTopK();
}

TEST(TopKHeapTest, minHeapTest) {
auto topK = getVector([](const int& lhs, const int& rhs) { return lhs < rhs; });
std::stringstream ss;
for (auto& item : topK) {
ss << item << " ";
}
ASSERT_EQ(ss.str(), "3 2 1 ");
}

TEST(TopKHeapTest, maxHeapTest) {
auto topK = getVector([](const int& lhs, const int& rhs) { return lhs > rhs; });
std::stringstream ss;
for (auto& item : topK) {
ss << item << " ";
}
ASSERT_EQ(ss.str(), "6 7 9 ");
}

} // namespace nebula

int main(int argc, char** argv) {
testing::InitGoogleTest(&argc, argv);
folly::init(&argc, &argv, true);
google::SetStderrLogging(google::INFO);

return RUN_ALL_TESTS();
}
1 change: 1 addition & 0 deletions src/graph/context/ast/QueryAstContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ struct LookupContext final : public AstContext {
YieldColumns* yieldExpr{nullptr};
std::vector<std::string> idxReturnCols;
std::vector<std::string> idxColNames;
std::unordered_map<std::string, std::string> idxOutColsToReturnColsMap_;
// order by
};

Expand Down
1 change: 1 addition & 0 deletions src/graph/executor/query/IndexScanExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ folly::Future<Status> IndexScanExecutor::indexScan() {
lookup->isEdge(),
lookup->schemaId(),
lookup->returnColumns(),
lookup->orderBy(),
lookup->limit())
.via(runner())
.thenValue([this](StorageRpcResponse<LookupIndexResp> &&rpcResp) {
Expand Down
22 changes: 9 additions & 13 deletions src/graph/executor/query/TopNExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include "graph/executor/query/TopNExecutor.h"

#include "common/time/ScopedTimer.h"
#include "common/utils/TopKHeap.h"
#include "graph/planner/plan/Query.h"

namespace nebula {
Expand All @@ -20,7 +21,7 @@ folly::Future<Status> TopNExecutor::execute() {
if (UNLIKELY(iter == nullptr)) {
return Status::Error("Internal error: nullptr iterator in topn executor");
}
if (UNLIKELY(!result.iter()->isSequentialIter())) {
if (UNLIKELY(!result.iter()->isSequentialIter() && !result.iter()->isPropIter())) {
std::stringstream ss;
ss << "Internal error: Sort executor does not supported " << iter->kind();
LOG(ERROR) << ss.str();
Expand Down Expand Up @@ -71,23 +72,18 @@ folly::Future<Status> TopNExecutor::execute() {
template <typename U>
void TopNExecutor::executeTopN(Iterator *iter) {
auto uIter = static_cast<U *>(iter);
std::vector<Row> heap(uIter->begin(), uIter->begin() + heapSize_);
std::make_heap(heap.begin(), heap.end(), comparator_);
auto it = uIter->begin() + heapSize_;
auto it = uIter->begin();
TopKHeap<Row> topKHeap(heapSize_, comparator_);
while (it != uIter->end()) {
if (comparator_(*it, heap[0])) {
std::pop_heap(heap.begin(), heap.end(), comparator_);
heap.pop_back();
heap.push_back(*it);
std::push_heap(heap.begin(), heap.end(), comparator_);
}
++it;
topKHeap.push(*it);
it++;
}
std::sort_heap(heap.begin(), heap.end(), comparator_);
std::vector<Row> topK = topKHeap.moveTopK();
std::sort_heap(topK.begin(), topK.end(), comparator_);

auto beg = uIter->begin();
for (int i = 0; i < maxCount_; ++i) {
beg[i] = heap[offset_ + i];
beg[i] = topK[offset_ + i];
}
}

Expand Down
10 changes: 3 additions & 7 deletions src/graph/optimizer/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,10 @@ nebula_add_library(
rule/IndexFullScanBaseRule.cpp
rule/TagIndexFullScanRule.cpp
rule/EdgeIndexFullScanRule.cpp
rule/PushLimitDownIndexScanRule.cpp
rule/PushLimitDownTagIndexFullScanRule.cpp
rule/PushLimitDownTagIndexPrefixScanRule.cpp
rule/PushLimitDownTagIndexRangeScanRule.cpp
rule/PushLimitDownEdgeIndexFullScanRule.cpp
rule/PushLimitDownEdgeIndexPrefixScanRule.cpp
rule/PushLimitDownEdgeIndexRangeScanRule.cpp
rule/PushLimitDownProjectRule.cpp
rule/PushLimitSortDownIndexScanRule.cpp
rule/PushTopNDownIndexRule.cpp
rule/PushLimitDownIndexRule.cpp
)

nebula_add_subdirectory(test)
1 change: 1 addition & 0 deletions src/graph/optimizer/OptimizerUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -953,6 +953,7 @@ void OptimizerUtils::copyIndexScanData(const nebula::graph::IndexScan* from,
to->setEmptyResultSet(from->isEmptyResultSet());
to->setSpace(from->space());
to->setReturnCols(from->returnColumns());
to->setOutPutColsToReturnCols(from->outPutColsToReturnCols());
to->setIsEdge(from->isEdge());
to->setSchemaId(from->schemaId());
to->setDedup(from->dedup());
Expand Down
80 changes: 0 additions & 80 deletions src/graph/optimizer/rule/PushLimitDownEdgeIndexFullScanRule.cpp

This file was deleted.

32 changes: 0 additions & 32 deletions src/graph/optimizer/rule/PushLimitDownEdgeIndexFullScanRule.h

This file was deleted.

Loading