diff --git a/.github/workflows/nightly.yml b/.github/workflows/nightly.yml index f2081951a4d..e12427f1ece 100644 --- a/.github/workflows/nightly.yml +++ b/.github/workflows/nightly.yml @@ -2,7 +2,7 @@ name: nightly on: schedule: - - cron: '0 18 * * *' + - cron: "0 18 * * *" concurrency: group: nightly @@ -35,7 +35,7 @@ jobs: - uses: actions/checkout@v2 - uses: actions/setup-go@v2 with: - go-version: '^1.16.7' + go-version: "^1.16.7" - name: package run: ./package/package.sh -t Release -r ON -p OFF -s FALSE -k OFF - name: output some vars @@ -199,4 +199,99 @@ jobs: if: ${{ failure() }} with: name: ${{ matrix.os }}-${{ matrix.compiler }}-nebula-test-logs - path: ./build/server_*/logs/ + path: ./build/server_*/logs/ + + test: + name: Tck test + needs: package + runs-on: [self-hosted, nebula] + strategy: + fail-fast: false + matrix: + os: + - ubuntu1804 + - ubuntu2004 + - centos7 + - centos8 + extra_config: + - "ENABLE_SSL=true CA_SIGNED=true QUERY_CONCURRENTLY=false" + - "ENABLE_SSL=false CA_SIGNED=false QUERY_CONCURRENTLY=true" + env: + OSS_DIR: nebula-graph/package/nightly + container: + image: vesoft/nebula-dev:${{ matrix.os }} + steps: + - uses: webiny/action-post-run@2.0.1 + with: + run: sh -c "find . -mindepth 1 -delete" + - uses: actions/checkout@v2 + - name: output some vars + id: vars + env: + SHA_EXT: sha256sum.txt + run: | + subdir=$(date -u +%Y.%m.%d -d "1 days ago") + echo "::set-output name=subdir::$subdir" + + - id: oss_package + run: | + case ${{ matrix.os }} in + ubuntu1804) + echo "::set-output name=p::nebula-graph-${{ steps.vars.outputs.subdir }}-nightly.ubuntu1804.amd64.tar.gz" + ;; + ubuntu2004) + echo "::set-output name=p::nebula-graph-${{ steps.vars.outputs.subdir }}-nightly.ubuntu2004.amd64.tar.gz" + ;; + centos7) + echo "::set-output name=p::nebula-graph-${{ steps.vars.outputs.subdir }}-nightly.el7.x86_64.tar.gz" + ;; + centos8) + echo "::set-output name=p::nebula-graph-${{ steps.vars.outputs.subdir }}-nightly.el8.x86_64.tar.gz" + ;; + esac + - name: Prepare environment + id: prepare + run: | + [ -d build/ ] && rm -rf build/* || mkdir -p build + make init -C tests + - name: CMake + id: cmake + run: | + echo "::set-output name=j::8" + - name: download from oss + run: | + ossutil64 -e ${{ secrets.OSS_ENDPOINT }} \ + -i ${{ secrets.OSS_ID }} \ + -k ${{ secrets.OSS_SECRET }} \ + -f cp oss://${{ env.OSS_DIR }}/${{ steps.vars.outputs.subdir }}/${{ steps.oss_package.outputs.p }} \ + build/. + tar zxvf build/${{ steps.oss_package.outputs.p }} -C build + d=`echo ${{ steps.oss_package.outputs.p }} | sed 's/.tar.gz//'` + mv build/${d}/* build/. 
+
+      - name: Setup cluster
+        run: |
+          make CONTAINERIZED=true ${{ matrix.extra_config }} up
+        working-directory: tests/
+        timeout-minutes: 2
+      - name: Pytest
+        run: |
+          make RM_DIR=false DEBUG=false J=${{ steps.cmake.outputs.j }} test
+        working-directory: tests/
+        timeout-minutes: 15
+      - name: TCK
+        run: |
+          make RM_DIR=false DEBUG=false J=${{ steps.cmake.outputs.j }} tck
+        working-directory: tests/
+        timeout-minutes: 60
+      - name: Down cluster
+        run: |
+          make RM_DIR=false down
+        working-directory: tests/
+        timeout-minutes: 2
+      - name: Upload logs
+        uses: actions/upload-artifact@v2
+        if: ${{ failure() }}
+        with:
+          name: ${{ matrix.os }}-${{ matrix.compiler }}-nebula-test-logs
+          path: ./build/server_*/logs*/
diff --git a/.github/workflows/rc.yml b/.github/workflows/rc.yml
index 01a82d273ea..d76a5971e79 100644
--- a/.github/workflows/rc.yml
+++ b/.github/workflows/rc.yml
@@ -185,6 +185,9 @@ jobs:
           - ubuntu2004
           - centos7
           - centos8
+        extra_config:
+          - "ENABLE_SSL=true CA_SIGNED=true QUERY_CONCURRENTLY=false"
+          - "ENABLE_SSL=false CA_SIGNED=false QUERY_CONCURRENTLY=true"
     env:
       OSS_DIR: nebula-graph/rc
     container:
@@ -235,7 +238,7 @@ jobs:
       - name: Setup cluster
         run: |
-          make CONTAINERIZED=true ENABLE_SSL=true CA_SIGNED=true up
+          make CONTAINERIZED=true ${{ matrix.extra_config }} up
         working-directory: tests/
         timeout-minutes: 2
       - name: Pytest
@@ -258,4 +261,4 @@ jobs:
         if: ${{ failure() }}
         with:
           name: ${{ matrix.os }}-${{ matrix.compiler }}-nebula-test-logs
-          path: ./build/server_*/logs/
+          path: ./build/server_*/logs*/
diff --git a/src/common/expression/test/CMakeLists.txt b/src/common/expression/test/CMakeLists.txt
index f0fc3cb1dfb..7c721741338 100644
--- a/src/common/expression/test/CMakeLists.txt
+++ b/src/common/expression/test/CMakeLists.txt
@@ -47,6 +47,7 @@ set(expression_test_common_libs
    $
    $
    $
+   $<TARGET_OBJECTS:gc_obj>
 )
diff --git a/src/common/thread/GenericThreadPool.h b/src/common/thread/GenericThreadPool.h
index 5f30aa207a1..f3a5696e3f8 100644
--- a/src/common/thread/GenericThreadPool.h
+++ b/src/common/thread/GenericThreadPool.h
@@ -113,6 +113,16 @@ class GenericThreadPool final : public boost::noncopyable, public nebula::cpp::NonMovable
    */
   void purgeTimerTask(uint64_t id);

+  /**
+   * Add a repeated timer task for all workers; each worker executes it once per period.
+   * @ms interval in milliseconds
+   * @task a callable object
+   * @args variadic arguments
+   */
+  template <class F, class... Args>
+  void addRepeatTaskForAll(size_t ms, F &&f, Args &&... args);
+
  private:
   size_t nrThreads_{0};
   std::atomic<size_t> nextThread_{0};
@@ -156,6 +166,12 @@ uint64_t GenericThreadPool::addRepeatTask(size_t ms, F &&f, Args &&... args) {
   return ((idx << GenericWorker::TIMER_ID_BITS) | id);
 }

+template <class F, class... Args>
+void GenericThreadPool::addRepeatTaskForAll(size_t ms, F &&f, Args &&... args) {
+  for (auto idx = 0UL; idx < nrThreads_; ++idx) {
+    pool_[idx]->addRepeatTask(ms, std::forward<F>(f), std::forward<Args>(args)...);
+  }
+}
 }  // namespace thread
 }  // namespace nebula
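A quick orientation before the next files: a minimal, hypothetical standalone sketch of the new call (the pool size, thread-name prefix, and callback are made up; GC.cpp later in this patch wires its periodic task the same way). Note that, unlike addRepeatTask, which schedules on a single worker and returns a task id, this variant schedules the callable on every worker and returns nothing.

#include <chrono>
#include <thread>

#include "common/thread/GenericThreadPool.h"

int main() {
  nebula::thread::GenericThreadPool pool;
  pool.start(4, "demo");  // four workers, thread names prefixed "demo" (assumed setup)
  // Every worker fires the lambda once per 50ms period.
  pool.addRepeatTaskForAll(50, [] { /* e.g. drain a per-worker queue */ });
  std::this_thread::sleep_for(std::chrono::seconds(1));
  pool.stop();
}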
diff --git a/src/daemons/CMakeLists.txt b/src/daemons/CMakeLists.txt
index 9f4ba7f1a29..18fcc6f56ac 100644
--- a/src/daemons/CMakeLists.txt
+++ b/src/daemons/CMakeLists.txt
@@ -146,6 +146,7 @@ nebula_add_executable(
        $
        $
        $
+       $<TARGET_OBJECTS:gc_obj>
        ${common_deps}
    LIBRARIES
        ${PROXYGEN_LIBRARIES}
@@ -245,6 +246,7 @@ nebula_add_executable(
        $
        $
        $
+       $<TARGET_OBJECTS:gc_obj>
        ${storage_meta_deps}
        ${common_deps}
    LIBRARIES
diff --git a/src/graph/CMakeLists.txt b/src/graph/CMakeLists.txt
index 83dc5c9f6e8..45a0bdbc33e 100644
--- a/src/graph/CMakeLists.txt
+++ b/src/graph/CMakeLists.txt
@@ -13,3 +13,4 @@ nebula_add_subdirectory(stats)
 nebula_add_subdirectory(util)
 nebula_add_subdirectory(validator)
 nebula_add_subdirectory(visitor)
+nebula_add_subdirectory(gc)
diff --git a/src/graph/context/CMakeLists.txt b/src/graph/context/CMakeLists.txt
index 74d9cb784e0..e08caca7b59 100644
--- a/src/graph/context/CMakeLists.txt
+++ b/src/graph/context/CMakeLists.txt
@@ -1,7 +1,3 @@
-# Copyright (c) 2020 vesoft inc. All rights reserved.
-#
-# This source code is licensed under Apache 2.0 License.
-
 nebula_add_library(
     graph_context_obj OBJECT
     QueryContext.cpp
diff --git a/src/graph/context/ExecutionContext.cpp b/src/graph/context/ExecutionContext.cpp
index f63db2ec90b..282bb24a8e7 100644
--- a/src/graph/context/ExecutionContext.cpp
+++ b/src/graph/context/ExecutionContext.cpp
@@ -5,6 +5,9 @@

 #include "graph/context/ExecutionContext.h"

+#include "graph/gc/GC.h"
+#include "graph/service/GraphFlags.h"
+
 namespace nebula {
 namespace graph {
 constexpr int64_t ExecutionContext::kLatestVersion;
@@ -23,7 +26,12 @@ void ExecutionContext::setResult(const std::string& name, Result&& result) {
 }

 void ExecutionContext::dropResult(const std::string& name) {
-  valueMap_[name].clear();
+  auto& val = valueMap_[name];
+  if (FLAGS_enable_async_gc) {
+    GC::instance().clear(std::move(val));
+  } else {
+    val.clear();
+  }
 }

 size_t ExecutionContext::numVersions(const std::string& name) const {
diff --git a/src/graph/context/Iterator.cpp b/src/graph/context/Iterator.cpp
index 472e6235e02..80f4f7d1845 100644
--- a/src/graph/context/Iterator.cpp
+++ b/src/graph/context/Iterator.cpp
@@ -602,6 +602,15 @@ void GetNeighborsIter::clearEdges() {
   }
 }

+SequentialIter::SequentialIter(const SequentialIter& iter)
+    : Iterator(iter.valuePtr(), Kind::kSequential) {
+  auto valuePtr = iter.valuePtr();
+  auto& ds = valuePtr->mutableDataSet();
+  iter_ = ds.rows.begin();
+  rows_ = &ds.rows;
+  colIndices_ = iter.getColIndices();
+}
+
 SequentialIter::SequentialIter(std::shared_ptr<Value> value, bool checkMemory)
     : Iterator(value, Kind::kSequential, checkMemory) {
   DCHECK(value->isDataSet());
@@ -715,6 +724,11 @@ Value SequentialIter::getEdge() const {
   return getColumn("EDGE");
 }

+PropIter::PropIter(const PropIter& iter) : SequentialIter(iter) {
+  dsIndex_ = iter.dsIndex_;
+  kind_ = Kind::kProp;
+}
+
 PropIter::PropIter(std::shared_ptr<Value> value, bool checkMemory)
     : SequentialIter(value, checkMemory) {
   DCHECK(value->isDataSet());
diff --git a/src/graph/context/Iterator.h b/src/graph/context/Iterator.h
index e01cacba19e..ba67f79ba2a 100644
--- a/src/graph/context/Iterator.h
+++ b/src/graph/context/Iterator.h
@@ -422,10 +422,10 @@ class GetNeighborsIter final : public Iterator {
 class SequentialIter : public Iterator {
  public:
   explicit SequentialIter(std::shared_ptr<Value> value, bool checkMemory = false);
+  explicit SequentialIter(const SequentialIter& iter);

   // Union multiple sequential iterators
   explicit SequentialIter(std::vector<std::unique_ptr<Iterator>> inputList);

-  // Union two sequential iterators.
   SequentialIter(std::unique_ptr<Iterator> left, std::unique_ptr<Iterator> right);

@@ -527,6 +527,7 @@ class SequentialIter : public Iterator {
 class PropIter final : public SequentialIter {
  public:
   explicit PropIter(std::shared_ptr<Value> value, bool checkMemory = false);
+  explicit PropIter(const PropIter& iter);

   std::unique_ptr<Iterator> copy() const override {
     auto copy = std::make_unique<PropIter>(*this);
diff --git a/src/graph/context/test/CMakeLists.txt b/src/graph/context/test/CMakeLists.txt
index d29855fd21e..47680624730 100644
--- a/src/graph/context/test/CMakeLists.txt
+++ b/src/graph/context/test/CMakeLists.txt
@@ -47,6 +47,7 @@ SET(CONTEXT_TEST_LIBS
    $
    $
    $
+   $<TARGET_OBJECTS:gc_obj>
 )

 if(ENABLE_STANDALONE_VERSION)
diff --git a/src/graph/executor/Executor.cpp b/src/graph/executor/Executor.cpp
index 292fd0842f2..70290e4b5b4 100644
--- a/src/graph/executor/Executor.cpp
+++ b/src/graph/executor/Executor.cpp
@@ -7,6 +7,7 @@
 #include
 #include
+#include <cmath>

 #include "common/base/ObjectPool.h"
@@ -665,7 +666,6 @@ void Executor::drop(const PlanNode *node) {
       // Make sure drop happened-after count decrement
       CHECK_EQ(inputVar->userCount.load(std::memory_order_acquire), 0);
       ectx_->dropResult(inputVar->name);
-      VLOG(1) << node->kind() << " Drop variable " << inputVar->name;
     }
   }
 }
@@ -734,5 +734,14 @@ folly::Executor *Executor::runner() const {
   return qctx()->rctx()->runner();
 }

+size_t Executor::getBatchSize(size_t totalSize) const {
+  // The batch size is the greater of (totalSize / FLAGS_max_job_size) and FLAGS_min_batch_size
+  size_t jobSize = FLAGS_max_job_size;
+  size_t minBatchSize = FLAGS_min_batch_size;
+  size_t batchSizeTmp = std::ceil(static_cast<double>(totalSize) / jobSize);
+  size_t batchSize = batchSizeTmp > minBatchSize ? batchSizeTmp : minBatchSize;
+  return batchSize;
+}
+
 }  // namespace graph
 }  // namespace nebula
diff --git a/src/graph/executor/Executor.h b/src/graph/executor/Executor.h
index 04c71db40b6..db6b54df3d0 100644
--- a/src/graph/executor/Executor.h
+++ b/src/graph/executor/Executor.h
@@ -106,6 +106,17 @@ class Executor : private boost::noncopyable, private cpp::NonMovable {
   // Store the default result, which is not used by later executors
   Status finish(Value &&value);

+  size_t getBatchSize(size_t totalSize) const;
+
+  // ScatterFunc: a callback that handles one slice of records of a dataset.
+  // GatherFunc: a callback that gathers all ScatterFunc results and does the post work.
+  // Iterator: an iterator over a dataset.
+  template <class ScatterFunc,
+            class ScatterResult = typename std::result_of<ScatterFunc(size_t, size_t, Iterator *)>::type,
+            class GatherFunc>
+  auto runMultiJobs(ScatterFunc &&scatter, GatherFunc &&gather, Iterator *iter);
+
   int64_t id_;

   // Executor name
@@ -129,6 +140,34 @@ class Executor : private boost::noncopyable, private cpp::NonMovable {
   std::unordered_map<std::string, std::string> otherStats_;
 };

+template <class ScatterFunc, class ScatterResult, class GatherFunc>
+auto Executor::runMultiJobs(ScatterFunc &&scatter, GatherFunc &&gather, Iterator *iter) {
+  size_t totalSize = iter->size();
+  size_t batchSize = getBatchSize(totalSize);
+
+  // Start multiple jobs to handle the slices
+  std::vector<folly::Future<ScatterResult>> futures;
+  size_t begin = 0, end = 0, dispatchedCnt = 0;
+  while (dispatchedCnt < totalSize) {
+    end = begin + batchSize > totalSize ? totalSize : begin + batchSize;
+    futures.emplace_back(folly::via(
+        runner(),
+        [begin, end, tmpIter = iter->copy(), f = std::move(scatter)]() mutable -> ScatterResult {
+          // Not all iterators are random access, so step the copy forward to the begin position first
+          size_t tmp = 0;
+          for (; tmpIter->valid() && tmp < begin; ++tmp) {
+            tmpIter->next();
+          }
+
+          return f(begin, end, tmpIter.get());
+        }));
+    begin = end;
+    dispatchedCnt += batchSize;
+  }
+
+  // Gather all results and do the post work
+  return folly::collect(futures).via(runner()).thenValue(std::move(gather));
+}
 }  // namespace graph
 }  // namespace nebula
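For orientation, a hedged sketch of the calling convention (MyExecutor and its input variable name are hypothetical; the scatter/gather shapes follow the template above). On the sizing arithmetic: with FLAGS_max_job_size = 8 and the default FLAGS_min_batch_size = 8192, a 100,000-row input splits into ceil(100000/8) = 12,500-row batches (eight jobs), while a 10,000-row input keeps the 8,192 minimum and yields just two jobs.

// Hypothetical executor method; not part of the patch.
folly::Future<Status> MyExecutor::execute() {
  auto iter = ectx_->getResult("input").iter();  // "input" is a made-up variable name

  // Scatter: runs on the query runner; tmpIter is a private copy already
  // advanced to `begin', so each job only touches its [begin, end) slice.
  auto scatter = [](size_t begin, size_t end, Iterator *tmpIter) -> StatusOr<DataSet> {
    DataSet ds;
    ds.rows.reserve(end - begin);
    for (; tmpIter->valid() && begin++ < end; tmpIter->next()) {
      ds.rows.emplace_back(*tmpIter->row());  // e.g. pass rows through unchanged
    }
    return ds;
  };

  // Gather: runs once folly::collect resolves; since collect preserves the
  // order of the futures, the partial DataSets are concatenated in job order.
  auto gather = [this](std::vector<StatusOr<DataSet>> &&results) -> Status {
    DataSet result;
    for (auto &r : results) {
      if (!r.ok()) {
        return r.status();
      }
      auto ds = std::move(r).value();
      result.rows.insert(result.rows.end(),
                         std::make_move_iterator(ds.rows.begin()),
                         std::make_move_iterator(ds.rows.end()));
    }
    return finish(ResultBuilder().value(Value(std::move(result))).build());
  };

  return runMultiJobs(std::move(scatter), std::move(gather), iter.get());
}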
diff --git a/src/graph/executor/algo/ShortestPathBase.cpp b/src/graph/executor/algo/ShortestPathBase.cpp
index ea4b7645eff..f1af092e34c 100644
--- a/src/graph/executor/algo/ShortestPathBase.cpp
+++ b/src/graph/executor/algo/ShortestPathBase.cpp
@@ -218,7 +218,9 @@ void ShortestPathBase::addStats(PropRpcResponse& resp, int64_t timeInUSec) const
     ss << "\n}";
   }
   ss << "\n}";
+  statsLock_.lock();
   stats_->emplace(folly::sformat("get_prop "), ss.str());
+  statsLock_.unlock();
 }

 }  // namespace graph
diff --git a/src/graph/executor/query/AppendVerticesExecutor.cpp b/src/graph/executor/query/AppendVerticesExecutor.cpp
index d30c82eabe7..d64a470cbe8 100644
--- a/src/graph/executor/query/AppendVerticesExecutor.cpp
+++ b/src/graph/executor/query/AppendVerticesExecutor.cpp
@@ -4,6 +4,8 @@

 #include "graph/executor/query/AppendVerticesExecutor.h"

+#include
+
 using nebula::storage::StorageClient;
 using nebula::storage::StorageRpcResponse;
 using nebula::storage::cpp2::GetPropResponse;
@@ -37,6 +39,7 @@ folly::Future<Status> AppendVerticesExecutor::appendVertices() {
       qctx()->rctx()->session()->id(),
       qctx()->plan()->id(),
       qctx()->plan()->isProfileEnabled());
+  time::Duration getPropsTime;

   return DCHECK_NOTNULL(storageClient)
       ->getProps(param,
@@ -56,7 +59,11 @@ folly::Future<Status> AppendVerticesExecutor::appendVertices() {
       .thenValue([this](StorageRpcResponse<GetPropResponse> &&rpcResp) {
         SCOPED_TIMER(&execTime_);
         addStats(rpcResp, otherStats_);
-        return handleResp(std::move(rpcResp));
+        if (FLAGS_max_job_size <= 1) {
+          return folly::makeFuture(handleResp(std::move(rpcResp)));
+        } else {
+          return handleRespMultiJobs(std::move(rpcResp));
+        }
       });
 }
@@ -114,5 +121,133 @@ Status AppendVerticesExecutor::handleResp(
   return finish(ResultBuilder().value(Value(std::move(ds))).state(state).build());
 }

+folly::Future<Status> AppendVerticesExecutor::handleRespMultiJobs(
+    storage::StorageRpcResponse<storage::cpp2::GetPropResponse> &&rpcResp) {
+  auto result = handleCompleteness(rpcResp, FLAGS_accept_partial_success);
+  NG_RETURN_IF_ERROR(result);
+  auto *av = asNode<AppendVertices>(node());
+
+  auto inputIter = qctx()->ectx()->getResult(av->inputVar()).iter();
+  result_.colNames = av->colNames();
+  result_.rows.reserve(inputIter->size());
+
+  nebula::DataSet v;
+  for (auto &resp : rpcResp.responses()) {
+    if (resp.props_ref().has_value()) {
+      auto &&respV = std::move(*resp.props_ref());
+      v.colNames = respV.colNames;
+      v.rows.insert(v.rows.end(),
+                    std::make_move_iterator(respV.begin()),
+                    std::make_move_iterator(respV.end()));
+    }
+  }
+  auto propIter = PropIter(std::make_shared<Value>(std::move(v)));
+
+  if (!av->trackPrevPath()) {
+    auto scatter = [this](
+        size_t begin, size_t end, Iterator *tmpIter) mutable -> StatusOr<DataSet> {
+      return buildVerticesResult(begin, end, tmpIter);
+    };
+
+    auto gather = [this](auto &&results) -> Status {
+      for (auto &r : results) {
+        auto &&rows = std::move(r).value();
+        result_.rows.insert(result_.rows.end(),
+                            std::make_move_iterator(rows.begin()),
+                            std::make_move_iterator(rows.end()));
+      }
+      return
finish(ResultBuilder().value(Value(std::move(result_))).build()); + }; + + return runMultiJobs(std::move(scatter), std::move(gather), &propIter); + } else { + auto scatter = [this](size_t begin, size_t end, Iterator *tmpIter) mutable -> folly::Unit { + buildMap(begin, end, tmpIter); + return folly::unit; + }; + + auto gather = + [this, inputIterNew = std::move(inputIter)](auto &&prepareResult) -> folly::Future { + UNUSED(prepareResult); + + auto scatterInput = + [this](size_t begin, size_t end, Iterator *tmpIter) mutable -> StatusOr { + return handleJob(begin, end, tmpIter); + }; + + auto gatherFinal = [this](auto &&results) -> Status { + for (auto &r : results) { + auto &&rows = std::move(r).value(); + result_.rows.insert(result_.rows.end(), + std::make_move_iterator(rows.begin()), + std::make_move_iterator(rows.end())); + } + return finish(ResultBuilder().value(Value(std::move(result_))).build()); + }; + + return runMultiJobs(std::move(scatterInput), std::move(gatherFinal), inputIterNew.get()); + }; + + return runMultiJobs(std::move(scatter), std::move(gather), &propIter); + } +} + +DataSet AppendVerticesExecutor::buildVerticesResult(size_t begin, size_t end, Iterator *iter) { + auto *av = asNode(node()); + auto vFilter = av->vFilter() ? av->vFilter()->clone() : nullptr; + DataSet ds; + ds.colNames = av->colNames(); + ds.rows.reserve(end - begin); + QueryExpressionContext ctx(qctx()->ectx()); + for (; iter->valid() && begin++ < end; iter->next()) { + if (vFilter != nullptr) { + auto &vFilterVal = vFilter->eval(ctx(iter)); + if (!vFilterVal.isBool() || !vFilterVal.getBool()) { + continue; + } + } + Row row; + row.values.emplace_back(iter->getVertex()); + ds.rows.emplace_back(std::move(row)); + } + + return ds; +} + +void AppendVerticesExecutor::buildMap(size_t begin, size_t end, Iterator *iter) { + auto *av = asNode(node()); + auto vFilter = av->vFilter() ? 
av->vFilter()->clone() : nullptr;
+  QueryExpressionContext ctx(qctx()->ectx());
+  for (; iter->valid() && begin++ < end; iter->next()) {
+    if (vFilter != nullptr) {
+      auto &vFilterVal = vFilter->eval(ctx(iter));
+      if (!vFilterVal.isBool() || !vFilterVal.getBool()) {
+        continue;
+      }
+    }
+    dsts_.emplace(iter->getColumn(kVid), iter->getVertex());
+  }
+}
+
+DataSet AppendVerticesExecutor::handleJob(size_t begin, size_t end, Iterator *iter) {
+  auto *av = asNode<AppendVertices>(node());
+  DataSet ds;
+  ds.colNames = av->colNames();
+  ds.rows.reserve(end - begin);
+  auto src = av->src()->clone();
+  QueryExpressionContext ctx(qctx()->ectx());
+  for (; iter->valid() && begin++ < end; iter->next()) {
+    auto dstFound = dsts_.find(src->eval(ctx(iter)));
+    if (dstFound == dsts_.end()) {
+      continue;
+    }
+    Row row = *iter->row();
+    row.values.emplace_back(dstFound->second);
+    ds.rows.emplace_back(std::move(row));
+  }
+
+  return ds;
+}
+
 }  // namespace graph
 }  // namespace nebula
diff --git a/src/graph/executor/query/AppendVerticesExecutor.h b/src/graph/executor/query/AppendVerticesExecutor.h
index b95cbd7671f..bbe3d697d54 100644
--- a/src/graph/executor/query/AppendVerticesExecutor.h
+++ b/src/graph/executor/query/AppendVerticesExecutor.h
@@ -27,6 +27,20 @@ class AppendVerticesExecutor final : public GetPropExecutor {
   folly::Future<Status> appendVertices();

   Status handleResp(storage::StorageRpcResponse<storage::cpp2::GetPropResponse> &&rpcResp);
+
+  folly::Future<Status> handleRespMultiJobs(
+      storage::StorageRpcResponse<storage::cpp2::GetPropResponse> &&rpcResp);
+
+  DataSet handleJob(size_t begin, size_t end, Iterator *iter);
+
+  DataSet buildVerticesResult(size_t begin, size_t end, Iterator *iter);
+
+  void buildMap(size_t begin, size_t end, Iterator *iter);
+
+  // dsts_ and result_ are used when the response is handled by multiple jobs
+  // DstId -> Vertex
+  folly::ConcurrentHashMap<Value, Value> dsts_;
+  DataSet result_;
 };

 }  // namespace graph
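One design note on the header above: when trackPrevPath() is set, handleRespMultiJobs() runs two nested scatter/gather rounds, and the first round's jobs insert into dsts_ concurrently; that is why it is a folly::ConcurrentHashMap rather than a std::unordered_map. A toy sketch of that build-then-probe discipline (types simplified, not nebula code):

#include <cassert>
#include <string>

#include <folly/concurrency/ConcurrentHashMap.h>

int main() {
  // Phase 1 (first scatter): many jobs may emplace concurrently;
  // ConcurrentHashMap makes the insertions safe without external locking.
  folly::ConcurrentHashMap<int, std::string> dsts;
  dsts.emplace(1, "vertex-1");

  // Phase 2 (second scatter): all writers have finished, jobs only read.
  auto found = dsts.find(1);
  assert(found != dsts.end() && found->second == "vertex-1");
  return 0;
}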
diff --git a/src/graph/executor/query/FilterExecutor.cpp b/src/graph/executor/query/FilterExecutor.cpp
index 4c9482a7061..b824472a4a2 100644
--- a/src/graph/executor/query/FilterExecutor.cpp
+++ b/src/graph/executor/query/FilterExecutor.cpp
@@ -5,21 +5,82 @@
 #include "graph/executor/query/FilterExecutor.h"

 #include "graph/planner/plan/Query.h"
+#include "graph/service/GraphFlags.h"

 namespace nebula {
 namespace graph {

 folly::Future<Status> FilterExecutor::execute() {
   SCOPED_TIMER(&execTime_);
-  auto* filter = asNode<Filter>(node());
-  Result result = ectx_->getResult(filter->inputVar());
-  auto* iter = result.iterRef();
+  auto *filter = asNode<Filter>(node());
+  auto iter = ectx_->getResult(filter->inputVar()).iter();
   if (iter == nullptr || iter->isDefaultIter()) {
     auto status = Status::Error("iterator is nullptr or DefaultIter");
     LOG(ERROR) << status;
     return status;
   }
+  if (FLAGS_max_job_size == 1 || iter->isGetNeighborsIter()) {
+    // TODO: GetNeighborsIter is not a thread-safe implementation.
+    return handleSingleJobFilter();
+  } else {
+    DataSet ds;
+    ds.colNames = iter->valuePtr()->getDataSet().colNames;
+    ds.rows.reserve(iter->size());
+    auto scatter = [this](
+        size_t begin, size_t end, Iterator *tmpIter) mutable -> StatusOr<DataSet> {
+      return handleJob(begin, end, tmpIter);
+    };
+
+    auto gather =
+        [this, result = std::move(ds), kind = iter->kind()](auto &&results) mutable -> Status {
+      for (auto &r : results) {
+        if (!r.ok()) {
+          return r.status();
+        }
+        auto &&rows = std::move(r).value();
+        result.rows.insert(result.rows.end(),
+                           std::make_move_iterator(rows.begin()),
+                           std::make_move_iterator(rows.end()));
+      }
+      if (kind == Iterator::Kind::kSequential) {
+        finish(ResultBuilder().value(Value(std::move(result))).build());
+      } else if (kind == Iterator::Kind::kProp) {
+        finish(ResultBuilder().value(Value(std::move(result))).iter(Iterator::Kind::kProp).build());
+      } else {
+        LOG(ERROR) << "Default and GetNeighbors iterators do not support the multi-job filter.";
+      }
+      return Status::OK();
+    };
+
+    return runMultiJobs(std::move(scatter), std::move(gather), iter.get());
+  }
+}
+
+StatusOr<DataSet> FilterExecutor::handleJob(size_t begin, size_t end, Iterator *iter) {
+  auto *filter = asNode<Filter>(node());
+  QueryExpressionContext ctx(ectx_);
+  auto condition = filter->condition()->clone();
+  DataSet ds;
+  for (; iter->valid() && begin++ < end; iter->next()) {
+    auto val = condition->eval(ctx(iter));
+    if (val.isBadNull() || (!val.empty() && !val.isImplicitBool() && !val.isNull())) {
+      return Status::Error("Wrong type result, the type should be NULL, EMPTY, BOOL");
+    }
+    if (!(val.empty() || val.isNull() || (val.isImplicitBool() && !val.implicitBool()))) {
+      // TODO: Maybe we can move.
+      auto row = *iter->row();
+      ds.rows.emplace_back(std::move(row));
+    }
+  }
+  return ds;
+}
+
+Status FilterExecutor::handleSingleJobFilter() {
+  auto *filter = asNode<Filter>(node());
+  Result result = ectx_->getResult(filter->inputVar());
+  auto *iter = result.iterRef();
+
   ResultBuilder builder;
   builder.value(result.valuePtr());
   QueryExpressionContext ctx(ectx_);
diff --git a/src/graph/executor/query/FilterExecutor.h b/src/graph/executor/query/FilterExecutor.h
index 9772e07d5aa..2af33183587 100644
--- a/src/graph/executor/query/FilterExecutor.h
+++ b/src/graph/executor/query/FilterExecutor.h
@@ -18,6 +18,10 @@ class FilterExecutor final : public Executor {
       : Executor("FilterExecutor", node, qctx) {}

   folly::Future<Status> execute() override;
+
+  StatusOr<DataSet> handleJob(size_t begin, size_t end, Iterator *iter);
+
+  Status handleSingleJobFilter();
 };

 }  // namespace graph
diff --git a/src/graph/executor/query/InnerJoinExecutor.cpp b/src/graph/executor/query/InnerJoinExecutor.cpp
index f04ea6877e9..ee835b20478 100644
--- a/src/graph/executor/query/InnerJoinExecutor.cpp
+++ b/src/graph/executor/query/InnerJoinExecutor.cpp
@@ -5,6 +5,7 @@
 #include "graph/executor/query/InnerJoinExecutor.h"

 #include "graph/planner/plan/Query.h"
+#include "graph/service/GraphFlags.h"

 namespace nebula {
 namespace graph {
@@ -12,7 +13,11 @@ folly::Future<Status> InnerJoinExecutor::execute() {
   SCOPED_TIMER(&execTime_);
   auto* joinNode = asNode<Join>(node());
   NG_RETURN_IF_ERROR(checkInputDataSets());
-  return join(joinNode->hashKeys(), joinNode->probeKeys(), joinNode->colNames());
+  if (FLAGS_max_job_size <= 1) {
+    return join(joinNode->hashKeys(), joinNode->probeKeys(), joinNode->colNames());
+  } else {
+    return joinMultiJobs(joinNode->hashKeys(), joinNode->probeKeys(), joinNode->colNames());
+  }
 }

 Status InnerJoinExecutor::close() {
@@ -108,6 +113,113 @@ DataSet
InnerJoinExecutor::singleKeyProbe( return ds; } +folly::Future InnerJoinExecutor::joinMultiJobs(const std::vector& hashKeys, + const std::vector& probeKeys, + const std::vector& colNames) { + auto bucketSize = lhsIter_->size() > rhsIter_->size() ? rhsIter_->size() : lhsIter_->size(); + + DCHECK_EQ(hashKeys.size(), probeKeys.size()); + + if (lhsIter_->empty() || rhsIter_->empty()) { + DataSet result; + result.colNames = colNames; + return finish(ResultBuilder().value(Value(std::move(result))).build()); + } + + if (hashKeys.size() == 1 && probeKeys.size() == 1) { + hashTable_.reserve(bucketSize); + if (lhsIter_->size() < rhsIter_->size()) { + buildSingleKeyHashTable(hashKeys.front(), lhsIter_.get(), hashTable_); + return singleKeyProbe(probeKeys.front(), rhsIter_.get()); + } else { + exchange_ = true; + buildSingleKeyHashTable(probeKeys.front(), rhsIter_.get(), hashTable_); + return singleKeyProbe(hashKeys.front(), lhsIter_.get()); + } + } else { + listHashTable_.reserve(bucketSize); + if (lhsIter_->size() < rhsIter_->size()) { + buildHashTable(hashKeys, lhsIter_.get(), listHashTable_); + return probe(probeKeys, rhsIter_.get()); + } else { + exchange_ = true; + buildHashTable(probeKeys, rhsIter_.get(), listHashTable_); + return probe(hashKeys, lhsIter_.get()); + } + } +} + +folly::Future InnerJoinExecutor::probe(const std::vector& probeKeys, + Iterator* probeIter) { + std::vector tmpProbeKeys; + std::for_each(probeKeys.begin(), probeKeys.end(), [&tmpProbeKeys](auto& e) { + tmpProbeKeys.emplace_back(e->clone()); + }); + auto scatter = [this, tmpProbeKeys = std::move(tmpProbeKeys)]( + size_t begin, size_t end, Iterator* tmpIter) -> StatusOr { + DataSet ds; + QueryExpressionContext ctx(ectx_); + ds.rows.reserve(end - begin); + for (; tmpIter->valid() && begin++ < end; tmpIter->next()) { + List list; + list.values.reserve(tmpProbeKeys.size()); + for (auto& col : tmpProbeKeys) { + Value val = col->eval(ctx(tmpIter)); + list.values.emplace_back(std::move(val)); + } + buildNewRow(listHashTable_, list, *tmpIter->row(), ds); + } + return ds; + }; + + auto gather = [this](auto&& results) mutable -> Status { + DataSet result; + auto* joinNode = asNode(node()); + result.colNames = joinNode->colNames(); + for (auto& r : results) { + auto&& rows = std::move(r).value(); + result.rows.insert(result.rows.end(), + std::make_move_iterator(rows.begin()), + std::make_move_iterator(rows.end())); + } + finish(ResultBuilder().value(Value(std::move(result))).build()); + return Status::OK(); + }; + + return runMultiJobs(std::move(scatter), std::move(gather), probeIter); +} + +folly::Future InnerJoinExecutor::singleKeyProbe(Expression* probeKey, Iterator* probeIter) { + auto scatter = [this, probeKey]( + size_t begin, size_t end, Iterator* tmpIter) -> StatusOr { + auto tmpProbeKey = probeKey->clone(); + DataSet ds; + QueryExpressionContext ctx(ectx_); + ds.rows.reserve(end - begin); + for (; tmpIter->valid() && begin++ < end; tmpIter->next()) { + auto& val = tmpProbeKey->eval(ctx(tmpIter)); + buildNewRow(hashTable_, val, *tmpIter->row(), ds); + } + return ds; + }; + + auto gather = [this](auto&& results) mutable -> Status { + DataSet result; + auto* joinNode = asNode(node()); + result.colNames = joinNode->colNames(); + for (auto& r : results) { + auto&& rows = std::move(r).value(); + result.rows.insert(result.rows.end(), + std::make_move_iterator(rows.begin()), + std::make_move_iterator(rows.end())); + } + finish(ResultBuilder().value(Value(std::move(result))).build()); + return Status::OK(); + }; + + return 
runMultiJobs(std::move(scatter), std::move(gather), probeIter);
+}
+
 template <class T>
 void InnerJoinExecutor::buildNewRow(const std::unordered_map<T, std::vector<const Row*>>& hashTable,
                                     const T& val,
diff --git a/src/graph/executor/query/InnerJoinExecutor.h b/src/graph/executor/query/InnerJoinExecutor.h
index a5cae724321..f7deb1162f6 100644
--- a/src/graph/executor/query/InnerJoinExecutor.h
+++ b/src/graph/executor/query/InnerJoinExecutor.h
@@ -19,6 +19,7 @@ class InnerJoinExecutor : public JoinExecutor {
   Status close() override;

  protected:
+  // join/probe/singleKeyProbe: the single-job implementations.
   folly::Future<Status> join(const std::vector<Expression*>& hashKeys,
                              const std::vector<Expression*>& probeKeys,
                              const std::vector<std::string>& colNames);
@@ -31,6 +32,16 @@ class InnerJoinExecutor : public JoinExecutor {
                           Iterator* probeIter,
                           const std::unordered_map<Value, std::vector<const Row*>>& hashTable) const;

+  // joinMultiJobs/probe/singleKeyProbe: the multi-job implementations.
+  // For now, InnerJoin only parallelizes the probe side.
+  folly::Future<Status> joinMultiJobs(const std::vector<Expression*>& hashKeys,
+                                      const std::vector<Expression*>& probeKeys,
+                                      const std::vector<std::string>& colNames);
+
+  folly::Future<Status> probe(const std::vector<Expression*>& probeKeys, Iterator* probeIter);
+
+  folly::Future<Status> singleKeyProbe(Expression* probeKey, Iterator* probeIter);
+
   template <class T>
   void buildNewRow(const std::unordered_map<T, std::vector<const Row*>>& hashTable,
                    const T& val,
diff --git a/src/graph/executor/query/JoinExecutor.cpp b/src/graph/executor/query/JoinExecutor.cpp
index 2336425f7e3..b35d03cae8e 100644
--- a/src/graph/executor/query/JoinExecutor.cpp
+++ b/src/graph/executor/query/JoinExecutor.cpp
@@ -10,6 +10,9 @@ namespace nebula {
 namespace graph {

 Status JoinExecutor::checkInputDataSets() {
+  // Executors may be reused in loops, so manually clear the hash tables here.
+  hashTable_.clear();
+  listHashTable_.clear();
   auto* join = asNode<Join>(node());
   lhsIter_ = ectx_->getVersionedResult(join->leftVar().first, join->leftVar().second).iter();
   DCHECK(!!lhsIter_);
@@ -49,10 +52,9 @@ Status JoinExecutor::checkBiInputDataSets() {
   return Status::OK();
 }

-void JoinExecutor::buildHashTable(
-    const std::vector<Expression*>& hashKeys,
-    Iterator* iter,
-    std::unordered_map<List, std::vector<const Row*>>& hashTable) const {
+void JoinExecutor::buildHashTable(const std::vector<Expression*>& hashKeys,
+                                  Iterator* iter,
+                                  std::unordered_map<List, std::vector<const Row*>>& hashTable) {
   QueryExpressionContext ctx(ectx_);
   for (; iter->valid(); iter->next()) {
     List list;
@@ -70,7 +72,7 @@ void JoinExecutor::buildHashTable(
 void JoinExecutor::buildSingleKeyHashTable(
     Expression* hashKey,
     Iterator* iter,
-    std::unordered_map<Value, std::vector<const Row*>>& hashTable) const {
+    std::unordered_map<Value, std::vector<const Row*>>& hashTable) {
   QueryExpressionContext ctx(ectx_);
   for (; iter->valid(); iter->next()) {
     auto& val = hashKey->eval(ctx(iter));
diff --git a/src/graph/executor/query/JoinExecutor.h b/src/graph/executor/query/JoinExecutor.h
index 63f09374b20..e759565b649 100644
--- a/src/graph/executor/query/JoinExecutor.h
+++ b/src/graph/executor/query/JoinExecutor.h
@@ -18,17 +18,15 @@ class JoinExecutor : public Executor {
  protected:
   Status checkInputDataSets();

-  void buildHashTable(const std::vector<Expression*>& hashKeys, Iterator* iter);
-
   Status checkBiInputDataSets();

   void buildHashTable(const std::vector<Expression*>& hashKeys,
                       Iterator* iter,
-                      std::unordered_map<List, std::vector<const Row*>>& hashTable) const;
+                      std::unordered_map<List, std::vector<const Row*>>& hashTable);

   void buildSingleKeyHashTable(Expression* hashKey,
                                Iterator* iter,
-                               std::unordered_map<Value, std::vector<const Row*>>& hashTable) const;
+                               std::unordered_map<Value, std::vector<const Row*>>& hashTable);

   // concat rows
   Row newRow(Row left, Row right) const;
@@ -36,6 +34,8 @@ class JoinExecutor : public Executor {
   std::unique_ptr<Iterator> lhsIter_;
   std::unique_ptr<Iterator> rhsIter_;
   size_t
colSize_{0}; + std::unordered_map> hashTable_; + std::unordered_map> listHashTable_; }; } // namespace graph } // namespace nebula diff --git a/src/graph/executor/query/LeftJoinExecutor.cpp b/src/graph/executor/query/LeftJoinExecutor.cpp index 24d7ed513af..855290bab7d 100644 --- a/src/graph/executor/query/LeftJoinExecutor.cpp +++ b/src/graph/executor/query/LeftJoinExecutor.cpp @@ -4,7 +4,10 @@ #include "graph/executor/query/LeftJoinExecutor.h" +#include + #include "graph/planner/plan/Query.h" +#include "graph/service/GraphFlags.h" namespace nebula { namespace graph { @@ -15,7 +18,11 @@ folly::Future LeftJoinExecutor::execute() { ectx_->getVersionedResult(joinNode->rightVar().first, joinNode->rightVar().second); rightColSize_ = rhsResult.valuePtr()->getDataSet().colNames.size(); NG_RETURN_IF_ERROR(checkInputDataSets()); - return join(joinNode->hashKeys(), joinNode->probeKeys(), joinNode->colNames()); + if (FLAGS_max_job_size <= 1) { + return join(joinNode->hashKeys(), joinNode->probeKeys(), joinNode->colNames()); + } else { + return joinMultiJobs(joinNode->hashKeys(), joinNode->probeKeys(), joinNode->colNames()); + } } Status LeftJoinExecutor::close() { @@ -95,6 +102,100 @@ DataSet LeftJoinExecutor::singleKeyProbe( return ds; } +folly::Future LeftJoinExecutor::joinMultiJobs(const std::vector& hashKeys, + const std::vector& probeKeys, + const std::vector& colNames) { + DCHECK_EQ(hashKeys.size(), probeKeys.size()); + DataSet result; + if (hashKeys.size() == 1 && probeKeys.size() == 1) { + hashTable_.reserve(rhsIter_->empty() ? 1 : rhsIter_->size()); + if (!lhsIter_->empty()) { + buildSingleKeyHashTable(probeKeys.front(), rhsIter_.get(), hashTable_); + return singleKeyProbe(hashKeys.front(), lhsIter_.get()); + } + } else { + listHashTable_.reserve(rhsIter_->empty() ? 
1 : rhsIter_->size()); + if (!lhsIter_->empty()) { + buildHashTable(probeKeys, rhsIter_.get(), listHashTable_); + return probe(hashKeys, lhsIter_.get()); + } + } + + result.colNames = colNames; + return finish(ResultBuilder().value(Value(std::move(result))).build()); +} + +folly::Future LeftJoinExecutor::probe(const std::vector& probeKeys, + Iterator* probeIter) { + auto scatter = [this, probeKeys = probeKeys]( + size_t begin, size_t end, Iterator* tmpIter) -> StatusOr { + std::vector tmpProbeKeys; + std::for_each(probeKeys.begin(), probeKeys.end(), [&tmpProbeKeys](auto& e) { + tmpProbeKeys.emplace_back(e->clone()); + }); + DataSet ds; + QueryExpressionContext ctx(ectx_); + ds.rows.reserve(end - begin); + for (; tmpIter->valid() && begin++ < end; tmpIter->next()) { + List list; + list.values.reserve(tmpProbeKeys.size()); + for (auto& col : tmpProbeKeys) { + Value val = col->eval(ctx(tmpIter)); + list.values.emplace_back(std::move(val)); + } + + buildNewRow(listHashTable_, list, *tmpIter->row(), ds); + } + return ds; + }; + + auto gather = [this](auto&& results) mutable -> Status { + DataSet result; + auto* joinNode = asNode(node()); + result.colNames = joinNode->colNames(); + for (auto& r : results) { + auto&& rows = std::move(r).value(); + result.rows.insert(result.rows.end(), + std::make_move_iterator(rows.begin()), + std::make_move_iterator(rows.end())); + } + return finish(ResultBuilder().value(Value(std::move(result))).build()); + }; + + return runMultiJobs(std::move(scatter), std::move(gather), probeIter); +} + +folly::Future LeftJoinExecutor::singleKeyProbe(Expression* probeKey, Iterator* probeIter) { + auto scatter = [this, probeKey]( + size_t begin, size_t end, Iterator* tmpIter) -> StatusOr { + auto tmpProbeKey = probeKey->clone(); + DataSet ds; + QueryExpressionContext ctx(ectx_); + ds.rows.reserve(end - begin); + for (; tmpIter->valid() && begin++ < end; tmpIter->next()) { + auto& val = tmpProbeKey->eval(ctx(tmpIter)); + buildNewRow(hashTable_, val, *tmpIter->row(), ds); + } + return ds; + }; + + auto gather = [this](auto&& results) mutable -> Status { + DataSet result; + auto* joinNode = asNode(node()); + result.colNames = joinNode->colNames(); + for (auto& r : results) { + auto&& rows = std::move(r).value(); + result.rows.insert(result.rows.end(), + std::make_move_iterator(rows.begin()), + std::make_move_iterator(rows.end())); + } + finish(ResultBuilder().value(Value(std::move(result))).build()); + return Status::OK(); + }; + + return runMultiJobs(std::move(scatter), std::move(gather), probeIter); +} + template void LeftJoinExecutor::buildNewRow(const std::unordered_map>& hashTable, const T& val, @@ -124,6 +225,7 @@ BiLeftJoinExecutor::BiLeftJoinExecutor(const PlanNode* node, QueryContext* qctx) : LeftJoinExecutor(node, qctx) { name_ = "BiLeftJoinExecutor"; } + folly::Future BiLeftJoinExecutor::execute() { SCOPED_TIMER(&execTime_); auto* joinNode = asNode(node()); diff --git a/src/graph/executor/query/LeftJoinExecutor.h b/src/graph/executor/query/LeftJoinExecutor.h index 0ccb49abd87..4c39b6d58db 100644 --- a/src/graph/executor/query/LeftJoinExecutor.h +++ b/src/graph/executor/query/LeftJoinExecutor.h @@ -20,6 +20,7 @@ class LeftJoinExecutor : public JoinExecutor { Status close() override; protected: + // join/probe/singleKeyProbe implemented for single job. 
folly::Future<Status> join(const std::vector<Expression*>& hashKeys,
                             const std::vector<Expression*>& probeKeys,
                             const std::vector<std::string>& colNames);
@@ -32,6 +33,16 @@ class LeftJoinExecutor : public JoinExecutor {
                           Iterator* probeIter,
                           const std::unordered_map<Value, std::vector<const Row*>>& hashTable) const;

+  // joinMultiJobs/probe/singleKeyProbe: the multi-job implementations.
+  // For now, LeftJoin only parallelizes the probe side.
+  folly::Future<Status> joinMultiJobs(const std::vector<Expression*>& hashKeys,
+                                      const std::vector<Expression*>& probeKeys,
+                                      const std::vector<std::string>& colNames);
+
+  folly::Future<Status> probe(const std::vector<Expression*>& probeKeys, Iterator* probeIter);
+
+  folly::Future<Status> singleKeyProbe(Expression* probeKey, Iterator* probeIter);
+
   template <class T>
   void buildNewRow(const std::unordered_map<T, std::vector<const Row*>>& hashTable,
                    const T& val,
diff --git a/src/graph/executor/query/ProjectExecutor.cpp b/src/graph/executor/query/ProjectExecutor.cpp
index 91615f18937..64db18cede0 100644
--- a/src/graph/executor/query/ProjectExecutor.cpp
+++ b/src/graph/executor/query/ProjectExecutor.cpp
@@ -5,30 +5,61 @@
 #include "graph/executor/query/ProjectExecutor.h"

 #include "graph/planner/plan/Query.h"
+#include "graph/service/GraphFlags.h"

 namespace nebula {
 namespace graph {

 folly::Future<Status> ProjectExecutor::execute() {
   SCOPED_TIMER(&execTime_);
-  auto* project = asNode<Project>(node());
-  auto columns = project->columns()->columns();
+  auto *project = asNode<Project>(node());
   auto iter = ectx_->getResult(project->inputVar()).iter();
   DCHECK(!!iter);
   QueryExpressionContext ctx(ectx_);

+  if (FLAGS_max_job_size <= 1) {
+    auto ds = handleJob(0, iter->size(), iter.get());
+    return finish(ResultBuilder().value(Value(std::move(ds))).build());
+  } else {
+    DataSet ds;
+    ds.colNames = project->colNames();
+    ds.rows.reserve(iter->size());
+
+    auto scatter = [this](size_t begin, size_t end, Iterator *tmpIter) -> StatusOr<DataSet> {
+      return handleJob(begin, end, tmpIter);
+    };
+
+    auto gather = [this, result = std::move(ds)](auto &&results) mutable {
+      for (auto &r : results) {
+        auto &&rows = std::move(r).value();
+        result.rows.insert(result.rows.end(),
+                           std::make_move_iterator(rows.begin()),
+                           std::make_move_iterator(rows.end()));
+      }
+      finish(ResultBuilder().value(Value(std::move(result))).build());
+      return Status::OK();
+    };
+
+    return runMultiJobs(std::move(scatter), std::move(gather), iter.get());
+  }
+}
+
+DataSet ProjectExecutor::handleJob(size_t begin, size_t end, Iterator *iter) {
+  auto *project = asNode<Project>(node());
+  auto columns = project->columns()->clone();
   DataSet ds;
   ds.colNames = project->colNames();
-  ds.rows.reserve(iter->size());
-  for (; iter->valid(); iter->next()) {
+  QueryExpressionContext ctx(qctx()->ectx());
+  ds.rows.reserve(end - begin);
+  for (; iter->valid() && begin++ < end; iter->next()) {
     Row row;
-    for (auto& col : columns) {
-      Value val = col->expr()->eval(ctx(iter.get()));
+    for (auto &col : columns->columns()) {
+      Value val = col->expr()->eval(ctx(iter));
       row.values.emplace_back(std::move(val));
     }
     ds.rows.emplace_back(std::move(row));
   }
-  return finish(ResultBuilder().value(Value(std::move(ds))).build());
+  return ds;
 }

 }  // namespace graph
diff --git a/src/graph/executor/query/ProjectExecutor.h b/src/graph/executor/query/ProjectExecutor.h
index 08b36070aa2..6d6cde386e8 100644
--- a/src/graph/executor/query/ProjectExecutor.h
+++ b/src/graph/executor/query/ProjectExecutor.h
@@ -16,6 +16,8 @@ class ProjectExecutor final : public Executor {
       : Executor("ProjectExecutor", node, qctx) {}

   folly::Future<Status> execute() override;
+
+  DataSet handleJob(size_t begin, size_t end, Iterator *iter);
 };

 }  // namespace
graph diff --git a/src/graph/executor/query/TraverseExecutor.cpp b/src/graph/executor/query/TraverseExecutor.cpp index 2879272bbd6..c895db27666 100644 --- a/src/graph/executor/query/TraverseExecutor.cpp +++ b/src/graph/executor/query/TraverseExecutor.cpp @@ -27,10 +27,12 @@ folly::Future TraverseExecutor::execute() { Status TraverseExecutor::close() { // clear the members reqDs_.rows.clear(); + uniqueDsts_.clear(); return Executor::close(); } Status TraverseExecutor::buildRequestDataSet() { + time::Duration dur; SCOPED_TIMER(&execTime_); auto inputVar = traverse_->inputVar(); auto& inputResult = ectx_->getResult(inputVar); @@ -163,24 +165,47 @@ folly::Future TraverseExecutor::handleResponse(RpcResponse&& resps) { list.values.emplace_back(std::move(*dataset)); } auto listVal = std::make_shared(std::move(list)); - auto iter = std::make_unique(listVal); - auto status = buildInterimPath(iter.get()); - if (!status.ok()) { - return folly::makeFuture(std::move(status)); - } - if (!isFinalStep()) { - if (reqDs_.rows.empty()) { - if (range_ != nullptr) { - return folly::makeFuture(buildResult()); + if (FLAGS_max_job_size <= 1) { + auto iter = std::make_unique(listVal); + auto status = buildInterimPath(iter.get()); + if (!status.ok()) { + return folly::makeFuture(std::move(status)); + } + if (!isFinalStep()) { + if (reqDs_.rows.empty()) { + if (range_ != nullptr) { + return folly::makeFuture(buildResult()); + } else { + return folly::makeFuture(Status::OK()); + } } else { - return folly::makeFuture(Status::OK()); + return getNeighbors(); } } else { - return getNeighbors(); + return folly::makeFuture(buildResult()); } } else { - return folly::makeFuture(buildResult()); + return std::move(buildInterimPathMultiJobs(std::make_unique(listVal))) + .via(runner()) + .thenValue([this](auto&& status) { + if (!status.ok()) { + return folly::makeFuture(std::move(status)); + } + if (!isFinalStep()) { + if (reqDs_.rows.empty()) { + if (range_ != nullptr) { + return folly::makeFuture(buildResult()); + } else { + return folly::makeFuture(Status::OK()); + } + } else { + return getNeighbors(); + } + } else { + return folly::makeFuture(buildResult()); + } + }); } } @@ -267,7 +292,132 @@ Status TraverseExecutor::buildInterimPath(GetNeighborsIter* iter) { return Status::OK(); } -void TraverseExecutor::buildPath(std::unordered_map>& currentPaths, +folly::Future TraverseExecutor::buildInterimPathMultiJobs( + std::unique_ptr iter) { + size_t pathCnt = 0; + const std::unordered_map* prev = &paths_.back(); + if (currentStep_ == 1 && zeroStep()) { + paths_.emplace_back(); + NG_RETURN_IF_ERROR(handleZeroStep(*prev, iter->getVertices(), paths_.back(), pathCnt)); + // If 0..0 case, release memory and return immediately. 
+ if (range_ != nullptr && range_->max() == 0) { + releasePrevPaths(pathCnt); + return Status::OK(); + } + } + paths_.emplace_back(); + + auto scatter = [this, prev]( + size_t begin, size_t end, Iterator* tmpIter) mutable -> StatusOr { + return handleJob(begin, end, tmpIter, *prev); + }; + + auto gather = [this, pathCnt](std::vector> results) mutable -> Status { + reqDs_.clear(); + uniqueDsts_.clear(); + std::unordered_map& current = paths_.back(); + size_t mapCnt = 0; + for (auto& r : results) { + if (!r.ok()) { + return r.status(); + } else { + mapCnt += r.value().newPaths.size(); + } + } + current.reserve(mapCnt); + for (auto& r : results) { + auto jobResult = std::move(r).value(); + pathCnt += jobResult.pathCnt; + if (!jobResult.reqDs.rows.empty()) { + reqDs_.rows.insert(reqDs_.rows.end(), + std::make_move_iterator(jobResult.reqDs.rows.begin()), + std::make_move_iterator(jobResult.reqDs.rows.end())); + } + for (auto& kv : jobResult.newPaths) { + auto& paths = current[kv.first]; + paths.insert(paths.end(), + std::make_move_iterator(kv.second.begin()), + std::make_move_iterator(kv.second.end())); + } + } + releasePrevPaths(pathCnt); + return Status::OK(); + }; + + return runMultiJobs(std::move(scatter), std::move(gather), iter.get()); +} + +StatusOr TraverseExecutor::handleJob(size_t begin, + size_t end, + Iterator* iter, + const std::unordered_map& prev) { + // Handle edges from begin to end, [begin, end) + JobResult jobResult; + size_t& pathCnt = jobResult.pathCnt; + DataSet& reqDs = jobResult.reqDs; + reqDs.colNames = reqDs_.colNames; + QueryExpressionContext ctx(ectx_); + auto* vFilter = traverse_->vFilter() ? traverse_->vFilter()->clone() : nullptr; + auto* eFilter = traverse_->eFilter() ? traverse_->eFilter()->clone() : nullptr; + const auto& spaceInfo = qctx()->rctx()->session()->space(); + std::unordered_map& current = jobResult.newPaths; + for (; iter->valid() && begin++ < end; iter->next()) { + auto& dst = iter->getEdgeProp("*", kDst); + if (!SchemaUtil::isValidVid(dst, *(spaceInfo.spaceDesc.vid_type_ref()))) { + continue; + } + if (vFilter != nullptr && currentStep_ == 1) { + auto& vFilterVal = vFilter->eval(ctx(iter)); + if (!vFilterVal.isBool() || !vFilterVal.getBool()) { + continue; + } + } + if (eFilter != nullptr) { + auto& eFilterVal = eFilter->eval(ctx(iter)); + if (!eFilterVal.isBool() || !eFilterVal.getBool()) { + continue; + } + } + auto srcV = iter->getVertex(); + auto e = iter->getEdge(); + // Join on dst = src + auto pathToSrcFound = prev.find(srcV.getVertex().vid); + if (pathToSrcFound == prev.end()) { + return Status::Error("Can't find prev paths."); + } + const auto& paths = pathToSrcFound->second; + for (auto& prevPath : paths) { + if (hasSameEdge(prevPath, e.getEdge())) { + continue; + } + if (uniqueDsts_.emplace(dst, 0).second) { + reqDs.rows.emplace_back(Row({std::move(dst)})); + } + if (currentStep_ == 1) { + Row path; + if (traverse_->trackPrevPath()) { + path = prevPath; + } + path.values.emplace_back(srcV); + List neighbors; + neighbors.values.emplace_back(e); + path.values.emplace_back(std::move(neighbors)); + buildPath(current, dst, std::move(path)); + ++pathCnt; + } else { + auto path = prevPath; + auto& eList = path.values.back().mutableList().values; + eList.emplace_back(srcV); + eList.emplace_back(e); + buildPath(current, dst, std::move(path)); + ++pathCnt; + } + } // `prevPath' + } // `iter' + return jobResult; +} + +void TraverseExecutor::buildPath(std::unordered_map>& currentPaths, const Value& dst, Row&& path) { auto pathToDstFound = 
currentPaths.find(dst); @@ -289,7 +439,7 @@ Status TraverseExecutor::buildResult() { DataSet result; result.colNames = traverse_->colNames(); - result.rows.reserve(cnt_); + result.rows.reserve(totalPathCnt_); for (auto& currentStepPaths : paths_) { for (auto& paths : currentStepPaths) { std::move(paths.second.begin(), paths.second.end(), std::back_inserter(result.rows)); @@ -313,6 +463,7 @@ bool TraverseExecutor::hasSameEdge(const Row& prevPath, const Edge& currentEdge) } void TraverseExecutor::releasePrevPaths(size_t cnt) { + time::Duration dur; if (range_ != nullptr) { if (currentStep_ == range_->min() && paths_.size() > 1) { auto rangeEnd = paths_.begin(); @@ -323,11 +474,11 @@ void TraverseExecutor::releasePrevPaths(size_t cnt) { } if (currentStep_ >= range_->min()) { - cnt_ += cnt; + totalPathCnt_ += cnt; } } else { paths_.pop_front(); - cnt_ = cnt; + totalPathCnt_ = cnt; } } diff --git a/src/graph/executor/query/TraverseExecutor.h b/src/graph/executor/query/TraverseExecutor.h index b8f2f864083..bdfe92c5465 100644 --- a/src/graph/executor/query/TraverseExecutor.h +++ b/src/graph/executor/query/TraverseExecutor.h @@ -33,6 +33,17 @@ namespace nebula { namespace graph { using RpcResponse = storage::StorageRpcResponse; +using Dst = Value; +using Paths = std::vector; + +struct JobResult { + // Newly traversed paths size + size_t pathCnt{0}; + // Request dataset for next traverse + DataSet reqDs; + // Newly traversed paths + std::unordered_map newPaths; +}; class TraverseExecutor final : public StorageAccessExecutor { public: @@ -46,8 +57,6 @@ class TraverseExecutor final : public StorageAccessExecutor { Status close() override; private: - using Dst = Value; - using Paths = std::vector; Status buildRequestDataSet(); folly::Future traverse(); @@ -60,6 +69,13 @@ class TraverseExecutor final : public StorageAccessExecutor { Status buildInterimPath(GetNeighborsIter* iter); + folly::Future buildInterimPathMultiJobs(std::unique_ptr iter); + + StatusOr handleJob(size_t begin, + size_t end, + Iterator* iter, + const std::unordered_map& prev); + Status buildResult(); bool isFinalStep() const { @@ -92,8 +108,9 @@ class TraverseExecutor final : public StorageAccessExecutor { const Traverse* traverse_{nullptr}; MatchStepRange* range_{nullptr}; size_t currentStep_{0}; - std::list> paths_; - size_t cnt_{0}; + std::list> paths_; + size_t totalPathCnt_{0}; + folly::ConcurrentHashMap uniqueDsts_; }; } // namespace graph diff --git a/src/graph/executor/test/CMakeLists.txt b/src/graph/executor/test/CMakeLists.txt index 27f4529a2c3..1ef8620550a 100644 --- a/src/graph/executor/test/CMakeLists.txt +++ b/src/graph/executor/test/CMakeLists.txt @@ -57,6 +57,7 @@ SET(EXEC_QUERY_TEST_OBJS $ $ $ + $ ) if(ENABLE_STANDALONE_VERSION) diff --git a/src/graph/gc/CMakeLists.txt b/src/graph/gc/CMakeLists.txt new file mode 100644 index 00000000000..85beda27bb2 --- /dev/null +++ b/src/graph/gc/CMakeLists.txt @@ -0,0 +1,4 @@ +nebula_add_library( + gc_obj OBJECT + GC.cpp +) diff --git a/src/graph/gc/GC.cpp b/src/graph/gc/GC.cpp new file mode 100644 index 00000000000..de3428861a0 --- /dev/null +++ b/src/graph/gc/GC.cpp @@ -0,0 +1,34 @@ +// Copyright (c) 2022 vesoft inc. All rights reserved. +// +// This source code is licensed under Apache 2.0 License. 
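+// How this works: ExecutionContext::dropResult() hands the dropped
+// std::vector<Result> to GC::clear(), which only moves it onto the
+// unbounded MPMC queue below; a repeated 50ms task on every GC worker
+// (registered via addRepeatTaskForAll) dequeues one batch per tick and
+// lets its destructor run there, off the query's critical path.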
+ +#include "graph/gc/GC.h" + +#include "graph/service/GraphFlags.h" + +namespace nebula { +namespace graph { +GC& GC::instance() { + static GC gc; + return gc; +} + +GC::GC() { + if (FLAGS_gc_worker_size == 0) { + workers_.start(std::thread::hardware_concurrency(), "GC"); + } else { + workers_.start(FLAGS_gc_worker_size, "GC"); + } + workers_.addRepeatTaskForAll(50, &GC::periodicTask, this); +} + +void GC::clear(std::vector&& garbage) { + queue_.enqueue(std::move(garbage)); +} + +void GC::periodicTask() { + // TODO: maybe could release by batch + queue_.try_dequeue(); +} +} // namespace graph +} // namespace nebula diff --git a/src/graph/gc/GC.h b/src/graph/gc/GC.h new file mode 100644 index 00000000000..7e18997e47f --- /dev/null +++ b/src/graph/gc/GC.h @@ -0,0 +1,36 @@ +// Copyright (c) 2022 vesoft inc. All rights reserved. +// +// This source code is licensed under Apache 2.0 License. + +#ifndef GRAPH_GC_H_ +#define GRAPH_GC_H_ + +#include "common/base/Base.h" +#include "common/thread/GenericThreadPool.h" +#include "graph/context/Result.h" + +namespace nebula { +namespace graph { + +// Clean the unused memory on background threads, this is helpful +// for big queries since the memory release of interim results may +// cost too much time. +class GC { + public: + static GC& instance(); + + ~GC() { + workers_.stop(); + } + + void clear(std::vector&& garbage); + + private: + GC(); + void periodicTask(); + folly::UMPMCQueue, false> queue_; + thread::GenericThreadPool workers_; +}; +} // namespace graph +} // namespace nebula +#endif // GRAPH_GC_H_ diff --git a/src/graph/optimizer/test/CMakeLists.txt b/src/graph/optimizer/test/CMakeLists.txt index d93ba9c6ccb..1d30c3f6fa8 100644 --- a/src/graph/optimizer/test/CMakeLists.txt +++ b/src/graph/optimizer/test/CMakeLists.txt @@ -53,6 +53,7 @@ set(OPTIMIZER_TEST_LIB $ $ $ + $ ) if(ENABLE_STANDALONE_VERSION) diff --git a/src/graph/planner/test/CMakeLists.txt b/src/graph/planner/test/CMakeLists.txt index 579a7eb0c79..f274fd4a7cd 100644 --- a/src/graph/planner/test/CMakeLists.txt +++ b/src/graph/planner/test/CMakeLists.txt @@ -64,6 +64,7 @@ nebula_add_test( $ $ $ + $ LIBRARIES gtest ${PROXYGEN_LIBRARIES} diff --git a/src/graph/service/GraphFlags.cpp b/src/graph/service/GraphFlags.cpp index 074dbd0bc7c..9029b393e94 100644 --- a/src/graph/service/GraphFlags.cpp +++ b/src/graph/service/GraphFlags.cpp @@ -89,3 +89,15 @@ static bool ValidateSessIdleTimeout(const char* flagname, int32_t value) { } DEFINE_validator(session_idle_timeout_secs, &ValidateSessIdleTimeout); DEFINE_validator(client_idle_timeout_secs, &ValidateSessIdleTimeout); + +DEFINE_int32(min_batch_size, + 8192, + "The min batch size for handling dataset in multi job mode, only enabled when " + "max_job_size is greater than 1."); +DEFINE_int32(max_job_size, 1, "The max job size in multi job mode."); + +DEFINE_bool(enable_async_gc, false, "If enable async gc."); +DEFINE_uint32( + gc_worker_size, + 0, + "Background garbage clean workers, default number is 0 which means using hardware core size."); diff --git a/src/graph/service/GraphFlags.h b/src/graph/service/GraphFlags.h index cddd8c02e83..480f97dff1a 100644 --- a/src/graph/service/GraphFlags.h +++ b/src/graph/service/GraphFlags.h @@ -59,4 +59,9 @@ DECLARE_string(client_white_list); DECLARE_int32(num_rows_to_check_memory); +DECLARE_int32(min_batch_size); +DECLARE_int32(max_job_size); + +DECLARE_bool(enable_async_gc); +DECLARE_uint32(gc_worker_size); #endif // GRAPH_GRAPHFLAGS_H_ diff --git a/src/graph/util/test/CMakeLists.txt 
b/src/graph/util/test/CMakeLists.txt index 805620705fc..9c470396028 100644 --- a/src/graph/util/test/CMakeLists.txt +++ b/src/graph/util/test/CMakeLists.txt @@ -69,6 +69,7 @@ nebula_add_test( $ $ $ + $ LIBRARIES gtest gtest_main diff --git a/src/graph/validator/test/CMakeLists.txt b/src/graph/validator/test/CMakeLists.txt index 7df86a09738..9d320dcfe2d 100644 --- a/src/graph/validator/test/CMakeLists.txt +++ b/src/graph/validator/test/CMakeLists.txt @@ -59,6 +59,7 @@ set(VALIDATOR_TEST_LIBS $ $ $ + $ ) if(ENABLE_STANDALONE_VERSION) diff --git a/src/graph/visitor/test/CMakeLists.txt b/src/graph/visitor/test/CMakeLists.txt index 8d1f3253df5..c6bd8db3ee6 100644 --- a/src/graph/visitor/test/CMakeLists.txt +++ b/src/graph/visitor/test/CMakeLists.txt @@ -75,6 +75,7 @@ nebula_add_test( $ $ $ + $ LIBRARIES gtest ${THRIFT_LIBRARIES} diff --git a/src/parser/test/CMakeLists.txt b/src/parser/test/CMakeLists.txt index 7bf3596ab73..e244923d29e 100644 --- a/src/parser/test/CMakeLists.txt +++ b/src/parser/test/CMakeLists.txt @@ -47,6 +47,7 @@ set(PARSER_TEST_LIBS $ $ $ + $ ) if(ENABLE_STANDALONE_VERSION) diff --git a/tests/Makefile b/tests/Makefile index 39a73466ca1..0d4119ce3a3 100644 --- a/tests/Makefile +++ b/tests/Makefile @@ -21,6 +21,8 @@ CA_SIGNED ?= false CONTAINERIZED ?= false FAILED_LOGIN_ATTEMPTS ?= 0 PASSWORD_LOCK_TIME_IN_SECS ?= 0 +# start up the graph/storage with multi-job mode +QUERY_CONCURRENTLY ?= false # commands gherkin_fmt = ~/.local/bin/reformat-gherkin @@ -74,7 +76,8 @@ up: clean --enable_graph_ssl=$(ENABLE_GRAPH_SSL) \ --enable_meta_ssl=$(ENABLE_META_SSL) \ --ca_signed=$(CA_SIGNED) \ - --containerized=$(CONTAINERIZED) + --containerized=$(CONTAINERIZED) \ + --query_concurrently=$(QUERY_CONCURRENTLY) standalone-up: clean @mkdir -p $(CURR_DIR)/.pytest @@ -86,7 +89,8 @@ standalone-up: clean --enable_graph_ssl=$(ENABLE_GRAPH_SSL) \ --enable_meta_ssl=$(ENABLE_META_SSL) \ --ca_signed=$(CA_SIGNED) \ - --containerized=$(CONTAINERIZED) + --containerized=$(CONTAINERIZED) \ + --query_concurrently=$(QUERY_CONCURRENTLY) down: $(run_test) --cmd=stop --rm_dir=$(RM_DIR) diff --git a/tests/common/nebula_service.py b/tests/common/nebula_service.py index 5dcb27240c6..809484001d4 100644 --- a/tests/common/nebula_service.py +++ b/tests/common/nebula_service.py @@ -129,6 +129,7 @@ def __init__( ca_signed=False, debug_log=True, use_standalone=False, + query_concurrently=False, **kwargs, ): assert graphd_num > 0 and metad_num > 0 and storaged_num > 0 @@ -165,6 +166,8 @@ def __init__( self.lock_file = os.path.join(TMP_DIR, "cluster_port.lock") self.delimiter = "\n" + self.query_concurrently = query_concurrently + if use_standalone == False: self._make_params(**kwargs) self.init_process() @@ -172,6 +175,7 @@ def __init__( self._make_sa_params(**kwargs) self.init_standalone() + def init_standalone(self): process_count = self.metad_num + self.storaged_num + self.graphd_num ports_count = process_count * self.ports_per_process @@ -255,6 +259,7 @@ def init_process(self): p.update_meta_server_addrs(meta_server_addrs) def _make_params(self, **kwargs): + # common params for meta/storage/graph _params = { 'heartbeat_interval_secs': 1, 'expired_time_factor': 60, @@ -272,6 +277,7 @@ def _make_params(self, **kwargs): if self.debug_log: _params['v'] = '4' + # params for graph only self.graphd_param = copy.copy(_params) self.graphd_param['local_config'] = 'false' self.graphd_param['enable_authorize'] = 'true' @@ -283,11 +289,19 @@ def _make_params(self, **kwargs): self.graphd_param['password_lock_time_in_secs'] = '10' # 
expression depth limit
         self.graphd_param['max_expression_depth'] = '128'
+        if self.query_concurrently:
+            self.graphd_param['max_job_size'] = '4'

+        # params for storage only
         self.storaged_param = copy.copy(_params)
+        if self.query_concurrently:
+            self.storaged_param["query_concurrently"] = "true"
+
         self.storaged_param['local_config'] = 'false'
         self.storaged_param['raft_heartbeat_interval_secs'] = '30'
         self.storaged_param['skip_wait_in_rate_limiter'] = 'true'
+
+        # params for meta only
         self.metad_param = copy.copy(_params)
         for p in [self.metad_param, self.storaged_param, self.graphd_param]:
             p.update(kwargs)
@@ -321,7 +335,10 @@ def _make_sa_params(self, **kwargs):
         self.graphd_param['password_lock_time_in_secs'] = '10'
         self.graphd_param['raft_heartbeat_interval_secs'] = '30'
         self.graphd_param['skip_wait_in_rate_limiter'] = 'true'
-        self.graphd_param['add_local_host'] = 'false'
+        self.graphd_param['add_local_host'] = 'false'
+        if self.query_concurrently:
+            self.graphd_param['max_job_size'] = '4'
+
         for p in [self.metad_param, self.storaged_param, self.graphd_param]:
             p.update(kwargs)
diff --git a/tests/nebula-test-run.py b/tests/nebula-test-run.py
index bf23e452941..6703fa6b0d3 100755
--- a/tests/nebula-test-run.py
+++ b/tests/nebula-test-run.py
@@ -99,6 +99,12 @@ def init_parser():
         default=0,
         help='how long in seconds to lock the account after too many consecutive login attempts with an incorrect password',
     )
+    opt_parser.add_option(
+        '--query_concurrently',
+        dest='query_concurrently',
+        default='false',
+        help='Whether to enable graph/storage query_concurrently.',
+    )
     return opt_parser
@@ -241,7 +247,8 @@ def stop_nebula(nb, configs=None):
         enable_graph_ssl=configs.enable_graph_ssl,
         enable_meta_ssl=configs.enable_meta_ssl,
         containerized=configs.containerized,
-        use_standalone=is_standalone
+        use_standalone=is_standalone,
+        query_concurrently=opt_is(configs.query_concurrently, "true"),
     )

     if opt_is(configs.cmd, "start"):
diff --git a/tests/tck/features/optimizer/PrunePropertiesRule.feature b/tests/tck/features/optimizer/PrunePropertiesRule.feature
index a3548b73616..468b6b8b69e 100644
--- a/tests/tck/features/optimizer/PrunePropertiesRule.feature
+++ b/tests/tck/features/optimizer/PrunePropertiesRule.feature
@@ -14,7 +14,8 @@ Feature: Prune Properties rule
       MATCH p = (v:player{name: "Tony Parker"})-[e:like]->(v2)
       RETURN v2.player.age
       """
-    Then the result should be, in order:
+    # With the distributed scatter-gather model, row order is not guaranteed.
+    Then the result should be, in any order:
       | v2.player.age |
       | 42            |
       | 33            |
@@ -70,7 +71,7 @@ Feature: Prune Properties rule
       MATCH p = (v:player{name: "Tony Parker"})-[e:like]->(v2)
       RETURN v2
       """
-    Then the result should be, in order:
+    Then the result should be, in any order:
      | v2 |
      | ("Tim Duncan" :bachelor{name: "Tim Duncan", speciality: "psychology"} :player{age: 42, name: "Tim Duncan"}) |
      | ("LaMarcus Aldridge" :player{age: 33, name: "LaMarcus Aldridge"}) |
@@ -88,7 +89,7 @@ Feature: Prune Properties rule
       MATCH p = (v:player{name: "Tony Parker"})-[e:like]-(v2)
       RETURN p
       """
-    Then the result should be, in order:
+    Then the result should be, in any order:
      | p |
      | <("Tony Parker" :player{age: 36, name: "Tony Parker"})<-[:like@0 {likeness: 99}]-("Dejounte Murray" :player{age: 29, name: "Dejounte Murray"})> |
      | <("Tony Parker" :player{age: 36, name: "Tony Parker"})-[:like@0 {likeness: 95}]->("Manu Ginobili" :player{age: 41, name: "Manu Ginobili"})> |