Skip to content

Commit

Permalink
lookup support push down topK
Browse files Browse the repository at this point in the history
  • Loading branch information
MMyheart committed Oct 22, 2021
1 parent db6da98 commit 115434d
Show file tree
Hide file tree
Showing 48 changed files with 1,223 additions and 809 deletions.
7 changes: 6 additions & 1 deletion src/clients/storage/GraphStorageClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -488,6 +488,7 @@ StorageRpcRespFuture<cpp2::LookupIndexResp> GraphStorageClient::lookupIndex(
bool isEdge,
int32_t tagOrEdge,
const std::vector<std::string>& returnCols,
const std::vector<cpp2::OrderBy>& orderBy,
int64_t limit) {
// TODO(sky) : instead of isEdge and tagOrEdge to nebula::cpp2::SchemaID for graph layer.
auto space = param.space;
Expand Down Expand Up @@ -516,9 +517,13 @@ StorageRpcRespFuture<cpp2::LookupIndexResp> GraphStorageClient::lookupIndex(
cpp2::IndexSpec spec;
spec.set_contexts(contexts);
spec.set_schema_id(schemaId);
if (!orderBy.empty()) {
spec.set_order_by(orderBy);
}
spec.set_limit(limit);

req.set_indices(spec);
req.set_common(common);
req.set_limit(limit);
}

return collectResponse(
Expand Down
3 changes: 2 additions & 1 deletion src/clients/storage/GraphStorageClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,8 @@ class GraphStorageClient : public StorageClientBase<cpp2::GraphStorageServiceAsy
bool isEdge,
int32_t tagOrEdge,
const std::vector<std::string>& returnCols,
int64_t limit);
const std::vector<cpp2::OrderBy>& orderBy = std::vector<cpp2::OrderBy>(),
int64_t limit = std::numeric_limits<int64_t>::max());

StorageRpcRespFuture<cpp2::GetNeighborsResponse> lookupAndTraverse(
const CommonRequestParam& param, cpp2::IndexSpec indexSpec, cpp2::TraverseSpec traverseSpec);
Expand Down
71 changes: 71 additions & 0 deletions src/common/utils/TopKHeap.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/* Copyright (c) 2021 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License,
* attached with Common Clause Condition 1.0, found in the LICENSES directory.
*/

#ifndef COMMON_UTILS_TOPKHEAP_H_
#define COMMON_UTILS_TOPKHEAP_H_

#include "common/base/Base.h"

namespace nebula {

// Bounded binary heap that retains at most `heapSize` of the elements pushed
// into it.
//
// `comparator(a, b)` returning true means `a` is ordered before `b`. The heap
// keeps the retained element that is ordered last at the root (v_[0]); once the
// heap is full, a new element only gets in when it is ordered before the root,
// in which case it replaces the root. The net effect is that the `heapSize`
// elements ordered first by `comparator` survive.
//
// A `heapSize` of zero (or a negative value, which is treated as zero) yields a
// heap that retains nothing.
template <class T>
class TopKHeap final {
 public:
  // @param heapSize    maximum number of elements to retain; values <= 0 mean
  //                    "retain nothing"
  // @param comparator  ordering predicate, see the class comment
  TopKHeap(int heapSize, std::function<bool(T, T)> comparator)
      : heapSize_(heapSize > 0 ? static_cast<size_t>(heapSize) : 0),
        comparator_(std::move(comparator)) {
    v_.reserve(heapSize_);
  }
  ~TopKHeap() = default;

  // Offers `data` to the heap. It is stored while the heap still has room;
  // afterwards it is stored only if it is ordered before the current root.
  void push(T data) {
    if (heapSize_ == 0) {
      // Nothing can be retained and there is no root to compare against.
      return;
    }
    if (v_.size() < heapSize_) {
      v_.push_back(std::move(data));
      adjustUp(v_.size() - 1);
      return;
    }
    if (comparator_(data, v_[0])) {
      v_[0] = std::move(data);
      adjustDown(0);
    }
  }

  // Hands the retained elements over to the caller and leaves this heap empty.
  // The elements come back in heap layout (root first), not sorted.
  std::vector<T> moveTopK() {
    std::vector<T> topK = std::move(v_);
    // A moved-from vector is only guaranteed to be valid; make it empty
    // explicitly so the heap can be reused.
    v_.clear();
    return topK;
  }

 private:
  // Sifts the element at `parent` down until neither child is ordered after it.
  void adjustDown(size_t parent) {
    const size_t size = v_.size();
    size_t child = parent * 2 + 1;
    while (child < size) {
      // Pick whichever child is ordered last; that one must sit above the other.
      if (child + 1 < size && comparator_(v_[child], v_[child + 1])) {
        child += 1;
      }
      if (!comparator_(v_[parent], v_[child])) {
        return;
      }
      std::swap(v_[parent], v_[child]);
      parent = child;
      child = parent * 2 + 1;
    }
  }

  // Sifts the element at `child` up while its parent is ordered before it.
  void adjustUp(size_t child) {
    while (child != 0) {
      // Computed inside the loop so `child - 1` is never evaluated for the root.
      const size_t parent = (child - 1) / 2;
      if (!comparator_(v_[parent], v_[child])) {
        return;
      }
      std::swap(v_[parent], v_[child]);
      child = parent;
    }
  }

 private:
  size_t heapSize_;
  std::vector<T> v_;
  std::function<bool(T, T)> comparator_;
};
} // namespace nebula
#endif // COMMON_UTILS_TOPKHEAP_H_
10 changes: 10 additions & 0 deletions src/common/utils/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -80,5 +80,15 @@ nebula_add_test(
LIBRARIES
${THRIFT_LIBRARIES}
${PROXYGEN_LIBRARIES}
)

# Registers the topk_heap_test target, built from TopKHeapTest.cpp.
# It is given the base_obj object files and the gtest library.
nebula_add_test(
NAME
topk_heap_test
SOURCES
TopKHeapTest.cpp
OBJECTS
$<TARGET_OBJECTS:base_obj>
LIBRARIES
gtest
)
50 changes: 50 additions & 0 deletions src/common/utils/test/TopKHeapTest.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/* Copyright (c) 2020 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License,
* attached with Common Clause Condition 1.0, found in the LICENSES directory.
*/

#include <gtest/gtest.h>

#include "common/utils/TopKHeap.h"

namespace nebula {

std::vector<int> getVector(std::function<bool(int, int)> comparator) {
TopKHeap<int> topKHeap(3, comparator);
topKHeap.push(3);
topKHeap.push(6);
topKHeap.push(1);
topKHeap.push(9);
topKHeap.push(2);
topKHeap.push(7);
return topKHeap.moveTopK();
}

// With a "less than" comparator the heap must retain the three smallest inputs.
TEST(TopKHeapTest, minHeapTest) {
  const auto ascending = [](const int& lhs, const int& rhs) { return lhs < rhs; };
  const std::vector<int> retained = getVector(ascending);

  // Render the retained values in heap layout, each followed by a space.
  std::stringstream rendered;
  for (size_t i = 0; i < retained.size(); ++i) {
    rendered << retained[i] << " ";
  }
  ASSERT_EQ(rendered.str(), "3 2 1 ");
}

// With a "greater than" comparator the heap must retain the three largest inputs.
TEST(TopKHeapTest, maxHeapTest) {
  const auto descending = [](const int& lhs, const int& rhs) { return lhs > rhs; };
  const std::vector<int> retained = getVector(descending);

  // Render the retained values in heap layout, each followed by a space.
  std::stringstream rendered;
  for (size_t i = 0; i < retained.size(); ++i) {
    rendered << retained[i] << " ";
  }
  ASSERT_EQ(rendered.str(), "6 7 9 ");
}

} // namespace nebula

// Test binary entry point: initialises gtest, folly and logging, then runs
// every registered test and returns gtest's result as the exit code.
int main(int argc, char** argv) {
// gtest gets the command line first, before folly::init sees it.
testing::InitGoogleTest(&argc, argv);
// NOTE(review): the trailing `true` presumably asks folly to strip the flags
// it consumes from argv — confirm against folly::init.
folly::init(&argc, &argv, true);
// Send log output at INFO level and above to stderr as well.
google::SetStderrLogging(google::INFO);

return RUN_ALL_TESTS();
}
1 change: 1 addition & 0 deletions src/graph/context/ast/QueryAstContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ struct LookupContext final : public AstContext {
YieldColumns* yieldExpr{nullptr};
std::vector<std::string> idxReturnCols;
std::vector<std::string> idxColNames;
std::unordered_map<std::string, std::string> idxOutColsToReturnColsMap_;
// order by
};

Expand Down
1 change: 1 addition & 0 deletions src/graph/executor/query/IndexScanExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ folly::Future<Status> IndexScanExecutor::indexScan() {
lookup->isEdge(),
lookup->schemaId(),
lookup->returnColumns(),
lookup->orderBy(),
lookup->limit())
.via(runner())
.thenValue([this](StorageRpcResponse<LookupIndexResp> &&rpcResp) {
Expand Down
22 changes: 9 additions & 13 deletions src/graph/executor/query/TopNExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include "graph/executor/query/TopNExecutor.h"

#include "common/time/ScopedTimer.h"
#include "common/utils/TopKHeap.h"
#include "graph/planner/plan/Query.h"

namespace nebula {
Expand All @@ -20,7 +21,7 @@ folly::Future<Status> TopNExecutor::execute() {
if (UNLIKELY(iter == nullptr)) {
return Status::Error("Internal error: nullptr iterator in topn executor");
}
if (UNLIKELY(!result.iter()->isSequentialIter())) {
if (UNLIKELY(!result.iter()->isSequentialIter() && !result.iter()->isPropIter())) {
std::stringstream ss;
ss << "Internal error: Sort executor does not supported " << iter->kind();
LOG(ERROR) << ss.str();
Expand Down Expand Up @@ -71,23 +72,18 @@ folly::Future<Status> TopNExecutor::execute() {
template <typename U>
void TopNExecutor::executeTopN(Iterator *iter) {
auto uIter = static_cast<U *>(iter);
std::vector<Row> heap(uIter->begin(), uIter->begin() + heapSize_);
std::make_heap(heap.begin(), heap.end(), comparator_);
auto it = uIter->begin() + heapSize_;
auto it = uIter->begin();
TopKHeap<Row> topKHeap(heapSize_, comparator_);
while (it != uIter->end()) {
if (comparator_(*it, heap[0])) {
std::pop_heap(heap.begin(), heap.end(), comparator_);
heap.pop_back();
heap.push_back(*it);
std::push_heap(heap.begin(), heap.end(), comparator_);
}
++it;
topKHeap.push(*it);
it++;
}
std::sort_heap(heap.begin(), heap.end(), comparator_);
std::vector<Row> topK = topKHeap.moveTopK();
std::sort_heap(topK.begin(), topK.end(), comparator_);

auto beg = uIter->begin();
for (int i = 0; i < maxCount_; ++i) {
beg[i] = heap[offset_ + i];
beg[i] = topK[offset_ + i];
}
}

Expand Down
11 changes: 4 additions & 7 deletions src/graph/optimizer/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,11 @@ nebula_add_library(
rule/IndexFullScanBaseRule.cpp
rule/TagIndexFullScanRule.cpp
rule/EdgeIndexFullScanRule.cpp
rule/PushLimitDownIndexScanRule.cpp
rule/PushLimitDownTagIndexFullScanRule.cpp
rule/PushLimitDownTagIndexPrefixScanRule.cpp
rule/PushLimitDownTagIndexRangeScanRule.cpp
rule/PushLimitDownEdgeIndexFullScanRule.cpp
rule/PushLimitDownEdgeIndexPrefixScanRule.cpp
rule/PushLimitDownEdgeIndexRangeScanRule.cpp
rule/PushLimitDownProjectRule.cpp
rule/PushTopNDownProjectRule.cpp
rule/PushLimitSortDownIndexScanRule.cpp
rule/PushTopNDownIndexRule.cpp
rule/PushLimitDownIndexRule.cpp
)

nebula_add_subdirectory(test)
1 change: 1 addition & 0 deletions src/graph/optimizer/OptimizerUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -953,6 +953,7 @@ void OptimizerUtils::copyIndexScanData(const nebula::graph::IndexScan* from,
to->setEmptyResultSet(from->isEmptyResultSet());
to->setSpace(from->space());
to->setReturnCols(from->returnColumns());
to->setOutPutColsToReturnCols(from->outPutColsToReturnCols());
to->setIsEdge(from->isEdge());
to->setSchemaId(from->schemaId());
to->setDedup(from->dedup());
Expand Down
80 changes: 0 additions & 80 deletions src/graph/optimizer/rule/PushLimitDownEdgeIndexFullScanRule.cpp

This file was deleted.

32 changes: 0 additions & 32 deletions src/graph/optimizer/rule/PushLimitDownEdgeIndexFullScanRule.h

This file was deleted.

Loading

0 comments on commit 115434d

Please sign in to comment.