diff --git a/src/clients/storage/StorageClientBase-inl.h b/src/clients/storage/StorageClientBase-inl.h index 6e069acc069..84fef19a840 100644 --- a/src/clients/storage/StorageClientBase-inl.h +++ b/src/clients/storage/StorageClientBase-inl.h @@ -17,6 +17,7 @@ #include "common/base/Logging.h" #include "common/base/StatusOr.h" #include "common/datatypes/HostAddr.h" +#include "common/memory/MemoryTracker.h" #include "common/ssl/SSLConfig.h" #include "common/stats/StatsManager.h" #include "common/thrift/ThriftTypes.h" @@ -100,6 +101,7 @@ StorageClientBase::collectResponse( return folly::collectAll(respFutures) .deferValue([this, requests = std::move(requests), totalLatencies, hosts]( std::vector>>&& resps) { + memory::MemoryCheckGuard guard; StorageRpcResponse rpcResp(resps.size()); for (size_t i = 0; i < resps.size(); i++) { auto& host = hosts->at(i); @@ -158,11 +160,13 @@ folly::Future> StorageClientBaseclient(host, evb, false, FLAGS_storage_client_timeout_ms); return remoteFunc(client.get(), request); }) .thenValue([spaceId, this](Response&& resp) mutable -> StatusOr { + memory::MemoryCheckGuard guard; auto& result = resp.get_result(); for (auto& part : result.get_failed_parts()) { auto partId = part.get_part_id(); @@ -192,6 +196,14 @@ folly::Future> StorageClientBase{}, + [](const std::bad_alloc&) { + return folly::makeFuture>(std::bad_alloc()); + }) + .thenError(folly::tag_t{}, + [](const std::exception& e) { + return folly::makeFuture>(std::runtime_error(e.what())); + }) .thenError([request, host, spaceId, this]( folly::exception_wrapper&& exWrapper) mutable -> StatusOr { stats::StatsManager::addValue(kNumRpcSentToStoragedFailed); diff --git a/src/common/memory/MemoryTracker.h b/src/common/memory/MemoryTracker.h index 99d04dde006..1d2f344d56f 100644 --- a/src/common/memory/MemoryTracker.h +++ b/src/common/memory/MemoryTracker.h @@ -41,6 +41,7 @@ struct ThreadMemoryStats { // reserved bytes size in current thread int64_t reserved; + bool throwOnMemoryExceeded{false}; }; /** @@ -127,10 +128,20 @@ class MemoryStats { return fmt::format("MemoryStats: {}/{}", ReadableSize(limit_), ReadableSize(used_)); } + // turn on current thread's throwOnMemoryExceeded + static void turnOnThrow() { + threadMemoryStats_.throwOnMemoryExceeded = true; + } + + // turn off current thread's throwOnMemoryExceeded + static void turnOffThrow() { + threadMemoryStats_.throwOnMemoryExceeded = false; + } + private: inline ALWAYS_INLINE void allocGlobal(int64_t size) { int64_t willBe = size + used_.fetch_add(size, std::memory_order_relaxed); - if (willBe > limit_) { + if (threadMemoryStats_.throwOnMemoryExceeded && willBe > limit_) { // revert used_.fetch_sub(size, std::memory_order_relaxed); throw std::bad_alloc(); @@ -147,6 +158,17 @@ class MemoryStats { static constexpr int64_t kLocalReservedLimit_ = 1 * MiB; }; +// A guard to only enable memory check (throw when memory exceed) during its lifetime. +struct MemoryCheckGuard { + MemoryCheckGuard() { + MemoryStats::turnOnThrow(); + } + + ~MemoryCheckGuard() { + MemoryStats::turnOffThrow(); + } +}; + // A global static memory tracker enable tracking every memory allocation and deallocation. // This is not the place where real memory allocation or deallocation happens, only do the // memory tracking. diff --git a/src/graph/executor/Executor.h b/src/graph/executor/Executor.h index 4736347b7ba..670eb4df302 100644 --- a/src/graph/executor/Executor.h +++ b/src/graph/executor/Executor.h @@ -159,6 +159,7 @@ auto Executor::runMultiJobs(ScatterFunc &&scatter, GatherFunc &&gather, Iterator futures.emplace_back(folly::via( runner(), [begin, end, tmpIter = iter->copy(), f = std::move(scatter)]() mutable -> ScatterResult { + memory::MemoryCheckGuard guard; // Since not all iterators are linear, so iterates to the begin pos size_t tmp = 0; for (; tmpIter->valid() && tmp < begin; ++tmp) { diff --git a/src/graph/executor/algo/BFSShortestPathExecutor.cpp b/src/graph/executor/algo/BFSShortestPathExecutor.cpp index b464a58ba74..7cb4fda3447 100644 --- a/src/graph/executor/algo/BFSShortestPathExecutor.cpp +++ b/src/graph/executor/algo/BFSShortestPathExecutor.cpp @@ -29,18 +29,26 @@ folly::Future BFSShortestPathExecutor::execute() { } std::vector> futures; - auto leftFuture = folly::via(runner(), [this]() { return buildPath(false); }); - auto rightFuture = folly::via(runner(), [this]() { return buildPath(true); }); + auto leftFuture = folly::via(runner(), [this]() { + memory::MemoryCheckGuard guard; + return buildPath(false); + }); + auto rightFuture = folly::via(runner(), [this]() { + memory::MemoryCheckGuard guard; + return buildPath(true); + }); futures.emplace_back(std::move(leftFuture)); futures.emplace_back(std::move(rightFuture)); return folly::collect(futures) .via(runner()) .thenValue([this](auto&& status) { + memory::MemoryCheckGuard guard; UNUSED(status); return conjunctPath(); }) .thenValue([this](auto&& status) { + memory::MemoryCheckGuard guard; UNUSED(status); step_++; DataSet ds; @@ -147,6 +155,7 @@ folly::Future BFSShortestPathExecutor::conjunctPath() { batchVids.push_back(vid); if (++i == totalSize || batchVids.size() == batchSize) { auto future = folly::via(runner(), [this, vids = std::move(batchVids), oddStep]() { + memory::MemoryCheckGuard guard; return doConjunct(vids, oddStep); }); futures.emplace_back(std::move(future)); @@ -156,6 +165,7 @@ folly::Future BFSShortestPathExecutor::conjunctPath() { return folly::collect(futures) .via(runner()) .thenValue([this](auto&& resps) { + memory::MemoryCheckGuard guard; for (auto& resp : resps) { currentDs_.append(std::move(resp)); } diff --git a/src/graph/executor/algo/BatchShortestPath.cpp b/src/graph/executor/algo/BatchShortestPath.cpp index c017f87166c..2400364b592 100644 --- a/src/graph/executor/algo/BatchShortestPath.cpp +++ b/src/graph/executor/algo/BatchShortestPath.cpp @@ -25,6 +25,7 @@ folly::Future BatchShortestPath::execute(const HashSet& startVids, return folly::collect(futures) .via(qctx_->rctx()->runner()) .thenValue([this, result](auto&& resps) { + memory::MemoryCheckGuard guard; for (auto& resp : resps) { NG_RETURN_IF_ERROR(resp); } @@ -107,6 +108,7 @@ folly::Future BatchShortestPath::shortestPath(size_t rowNum, size_t step return folly::collect(futures) .via(qctx_->rctx()->runner()) .thenValue([this, rowNum, stepNum](auto&& resps) { + memory::MemoryCheckGuard guard; for (auto& resp : resps) { if (!resp.ok()) { return folly::makeFuture(std::move(resp)); @@ -151,6 +153,7 @@ folly::Future BatchShortestPath::getNeighbors(size_t rowNum, size_t step nullptr) .via(qctx_->rctx()->runner()) .thenValue([this, rowNum, reverse, stepNum, getNbrTime](auto&& resp) { + memory::MemoryCheckGuard guard; addStats(resp, stepNum, getNbrTime.elapsedInUSec(), reverse); return buildPath(rowNum, std::move(resp), reverse); }) @@ -280,6 +283,7 @@ folly::Future BatchShortestPath::handleResponse(size_t rowNum, size_t st return folly::makeFuture(Status::OK()) .via(qctx_->rctx()->runner()) .thenValue([this, rowNum](auto&& status) { + memory::MemoryCheckGuard guard; // odd step UNUSED(status); return conjunctPath(rowNum, true); @@ -377,6 +381,7 @@ folly::Future BatchShortestPath::conjunctPath(size_t rowNum, bool oddStep) auto future = getMeetVids(rowNum, oddStep, meetVids); return future.via(qctx_->rctx()->runner()) .thenValue([this, rowNum, oddStep](auto&& vertices) { + memory::MemoryCheckGuard guard; if (vertices.empty()) { return false; } diff --git a/src/graph/executor/algo/MultiShortestPathExecutor.cpp b/src/graph/executor/algo/MultiShortestPathExecutor.cpp index a7b4f641dc3..f77dc4d9d59 100644 --- a/src/graph/executor/algo/MultiShortestPathExecutor.cpp +++ b/src/graph/executor/algo/MultiShortestPathExecutor.cpp @@ -17,19 +17,27 @@ folly::Future MultiShortestPathExecutor::execute() { } std::vector> futures; - auto leftFuture = folly::via(runner(), [this]() { return buildPath(false); }); - auto rightFuture = folly::via(runner(), [this]() { return buildPath(true); }); + auto leftFuture = folly::via(runner(), [this]() { + memory::MemoryCheckGuard guard; + return buildPath(false); + }); + auto rightFuture = folly::via(runner(), [this]() { + memory::MemoryCheckGuard guard; + return buildPath(true); + }); futures.emplace_back(std::move(leftFuture)); futures.emplace_back(std::move(rightFuture)); return folly::collect(futures) .via(runner()) .thenValue([this](auto&& status) { + memory::MemoryCheckGuard guard; // oddStep UNUSED(status); return conjunctPath(true); }) .thenValue([this](auto&& termination) { + memory::MemoryCheckGuard guard; // termination is true, all paths has found if (termination || step_ * 2 > pathNode_->steps()) { return folly::makeFuture(true); @@ -266,8 +274,10 @@ folly::Future MultiShortestPathExecutor::conjunctPath(bool oddStep) { } pathIters.emplace_back(leftIter, rightIter); if (++i == batchSize) { - auto future = folly::via( - runner(), [this, iters = std::move(pathIters)]() { return doConjunct(iters); }); + auto future = folly::via(runner(), [this, iters = std::move(pathIters)]() { + memory::MemoryCheckGuard guard; + return doConjunct(iters); + }); futures.emplace_back(std::move(future)); pathIters.reserve(batchSize); i = 0; @@ -283,8 +293,10 @@ folly::Future MultiShortestPathExecutor::conjunctPath(bool oddStep) { } pathIters.emplace_back(leftIter, rightIter); if (++i == batchSize) { - auto future = folly::via( - runner(), [this, iters = std::move(pathIters)]() { return doConjunct(iters); }); + auto future = folly::via(runner(), [this, iters = std::move(pathIters)]() { + memory::MemoryCheckGuard guard; + return doConjunct(iters); + }); futures.emplace_back(std::move(future)); pathIters.reserve(batchSize); i = 0; @@ -292,14 +304,17 @@ folly::Future MultiShortestPathExecutor::conjunctPath(bool oddStep) { } } if (i != 0) { - auto future = - folly::via(runner(), [this, iters = std::move(pathIters)]() { return doConjunct(iters); }); + auto future = folly::via(runner(), [this, iters = std::move(pathIters)]() { + memory::MemoryCheckGuard guard; + return doConjunct(iters); + }); futures.emplace_back(std::move(future)); } return folly::collect(futures) .via(runner()) .thenValue([this](auto&& resps) { + memory::MemoryCheckGuard guard; for (auto& resp : resps) { currentDs_.append(std::move(resp)); } diff --git a/src/graph/executor/algo/ProduceAllPathsExecutor.cpp b/src/graph/executor/algo/ProduceAllPathsExecutor.cpp index a66fbd7a900..44e771ab7cd 100644 --- a/src/graph/executor/algo/ProduceAllPathsExecutor.cpp +++ b/src/graph/executor/algo/ProduceAllPathsExecutor.cpp @@ -27,18 +27,26 @@ folly::Future ProduceAllPathsExecutor::execute() { } } std::vector> futures; - auto leftFuture = folly::via(runner(), [this]() { return buildPath(false); }); - auto rightFuture = folly::via(runner(), [this]() { return buildPath(true); }); + auto leftFuture = folly::via(runner(), [this]() { + memory::MemoryCheckGuard guard; + return buildPath(false); + }); + auto rightFuture = folly::via(runner(), [this]() { + memory::MemoryCheckGuard guard; + return buildPath(true); + }); futures.emplace_back(std::move(leftFuture)); futures.emplace_back(std::move(rightFuture)); return folly::collect(futures) .via(runner()) .thenValue([this](auto&& status) { + memory::MemoryCheckGuard guard; UNUSED(status); return conjunctPath(); }) .thenValue([this](auto&& status) { + memory::MemoryCheckGuard guard; UNUSED(status); step_++; DataSet ds; @@ -147,11 +155,14 @@ folly::Future ProduceAllPathsExecutor::conjunctPath() { if (++i == batchSize) { auto endIter = leftIter; endIter++; - auto oddStepFuture = folly::via( - runner(), [this, startIter, endIter]() { return doConjunct(startIter, endIter, true); }); + auto oddStepFuture = folly::via(runner(), [this, startIter, endIter]() { + memory::MemoryCheckGuard guard; + return doConjunct(startIter, endIter, true); + }); futures.emplace_back(std::move(oddStepFuture)); if (step_ * 2 <= pathNode_->steps()) { auto evenStepFuture = folly::via(runner(), [this, startIter, endIter]() { + memory::MemoryCheckGuard guard; return doConjunct(startIter, endIter, false); }); futures.emplace_back(std::move(evenStepFuture)); @@ -163,12 +174,16 @@ folly::Future ProduceAllPathsExecutor::conjunctPath() { } if (i != 0) { auto endIter = leftPaths_.end(); - auto oddStepFuture = folly::via( - runner(), [this, startIter, endIter]() { return doConjunct(startIter, endIter, true); }); + auto oddStepFuture = folly::via(runner(), [this, startIter, endIter]() { + memory::MemoryCheckGuard guard; + return doConjunct(startIter, endIter, true); + }); futures.emplace_back(std::move(oddStepFuture)); if (step_ * 2 <= pathNode_->steps()) { - auto evenStepFuture = folly::via( - runner(), [this, startIter, endIter]() { return doConjunct(startIter, endIter, false); }); + auto evenStepFuture = folly::via(runner(), [this, startIter, endIter]() { + memory::MemoryCheckGuard guard; + return doConjunct(startIter, endIter, false); + }); futures.emplace_back(std::move(evenStepFuture)); } } @@ -176,6 +191,7 @@ folly::Future ProduceAllPathsExecutor::conjunctPath() { return folly::collect(futures) .via(runner()) .thenValue([this](auto&& resps) { + memory::MemoryCheckGuard guard; for (auto& resp : resps) { currentDs_.append(std::move(resp)); } diff --git a/src/graph/executor/algo/ShortestPathBase.cpp b/src/graph/executor/algo/ShortestPathBase.cpp index b8e678ab57c..9bc82ed0ed9 100644 --- a/src/graph/executor/algo/ShortestPathBase.cpp +++ b/src/graph/executor/algo/ShortestPathBase.cpp @@ -39,6 +39,7 @@ folly::Future> ShortestPathBase::getMeetVidsProps( nullptr) .via(qctx_->rctx()->runner()) .thenValue([this, getPropsTime](PropRpcResponse&& resp) { + memory::MemoryCheckGuard guard; addStats(resp, getPropsTime.elapsedInUSec()); return handlePropResp(std::move(resp)); }) diff --git a/src/graph/executor/algo/SingleShortestPath.cpp b/src/graph/executor/algo/SingleShortestPath.cpp index 87f90705890..69d3dba483a 100644 --- a/src/graph/executor/algo/SingleShortestPath.cpp +++ b/src/graph/executor/algo/SingleShortestPath.cpp @@ -24,6 +24,7 @@ folly::Future SingleShortestPath::execute(const HashSet& startVids, return folly::collect(futures) .via(qctx_->rctx()->runner()) .thenValue([this, result](auto&& resps) { + memory::MemoryCheckGuard guard; for (auto& resp : resps) { NG_RETURN_IF_ERROR(resp); } @@ -76,6 +77,7 @@ folly::Future SingleShortestPath::shortestPath(size_t rowNum, size_t ste return folly::collect(futures) .via(qctx_->rctx()->runner()) .thenValue([this, rowNum, stepNum](auto&& resps) { + memory::MemoryCheckGuard guard; for (auto& resp : resps) { if (!resp.ok()) { return folly::makeFuture(std::move(resp)); @@ -122,6 +124,7 @@ folly::Future SingleShortestPath::getNeighbors(size_t rowNum, nullptr) .via(qctx_->rctx()->runner()) .thenValue([this, rowNum, stepNum, getNbrTime, reverse](auto&& resp) { + memory::MemoryCheckGuard guard; addStats(resp, stepNum, getNbrTime.elapsedInUSec(), reverse); return buildPath(rowNum, std::move(resp), reverse); }) @@ -197,6 +200,7 @@ folly::Future SingleShortestPath::handleResponse(size_t rowNum, size_t s return folly::makeFuture(Status::OK()) .via(qctx_->rctx()->runner()) .thenValue([this, rowNum, stepNum](auto&& status) { + memory::MemoryCheckGuard guard; UNUSED(status); return conjunctPath(rowNum, stepNum); }) @@ -279,6 +283,7 @@ folly::Future SingleShortestPath::buildEvenPath(size_t rowNum, auto future = getMeetVidsProps(meetVids); return future.via(qctx_->rctx()->runner()) .thenValue([this, rowNum](auto&& vertices) { + memory::MemoryCheckGuard guard; if (vertices.empty()) { return false; } diff --git a/src/graph/executor/algo/SubgraphExecutor.cpp b/src/graph/executor/algo/SubgraphExecutor.cpp index 0f88d575e5e..e51f4853c0c 100644 --- a/src/graph/executor/algo/SubgraphExecutor.cpp +++ b/src/graph/executor/algo/SubgraphExecutor.cpp @@ -53,6 +53,7 @@ folly::Future SubgraphExecutor::getNeighbors() { currentStep_ == 1 ? nullptr : subgraph_->tagFilter()) .via(runner()) .thenValue([this, getNbrTime](RpcResponse&& resp) mutable { + memory::MemoryCheckGuard guard; otherStats_.emplace("total_rpc_time", folly::sformat("{}(us)", getNbrTime.elapsedInUSec())); auto& hostLatency = resp.hostLatency(); for (size_t i = 0; i < hostLatency.size(); ++i) { diff --git a/src/graph/executor/maintain/EdgeExecutor.cpp b/src/graph/executor/maintain/EdgeExecutor.cpp index e42592225a3..e198624af6a 100644 --- a/src/graph/executor/maintain/EdgeExecutor.cpp +++ b/src/graph/executor/maintain/EdgeExecutor.cpp @@ -20,6 +20,7 @@ folly::Future CreateEdgeExecutor::execute() { ->createEdgeSchema(spaceId, ceNode->getName(), ceNode->getSchema(), ceNode->getIfNotExists()) .via(runner()) .thenValue([ceNode, spaceId](StatusOr resp) { + memory::MemoryCheckGuard guard; if (!resp.ok()) { LOG(WARNING) << "SpaceId: " << spaceId << ", Create edge `" << ceNode->getName() << "' failed: " << resp.status(); @@ -45,6 +46,7 @@ folly::Future DescEdgeExecutor::execute() { ->getEdgeSchema(spaceId, deNode->getName()) .via(runner()) .thenValue([this, deNode, spaceId](StatusOr resp) { + memory::MemoryCheckGuard guard; if (!resp.ok()) { LOG(WARNING) << "SpaceId: " << spaceId << ", Desc edge `" << deNode->getName() << "' failed: " << resp.status(); @@ -79,6 +81,7 @@ folly::Future DropEdgeExecutor::execute() { ->dropEdgeSchema(spaceId, deNode->getName(), deNode->getIfExists()) .via(runner()) .thenValue([deNode, spaceId](StatusOr resp) { + memory::MemoryCheckGuard guard; if (!resp.ok()) { LOG(WARNING) << "SpaceId: " << spaceId << ", Drop edge `" << deNode->getName() << "' failed: " << resp.status(); @@ -103,6 +106,7 @@ folly::Future ShowEdgesExecutor::execute() { ->listEdgeSchemas(spaceId) .via(runner()) .thenValue([this, spaceId](StatusOr> resp) { + memory::MemoryCheckGuard guard; if (!resp.ok()) { LOG(WARNING) << "SpaceId: " << spaceId << ", Show edges failed: " << resp.status(); return resp.status(); @@ -143,6 +147,7 @@ folly::Future ShowCreateEdgeExecutor::execute() { ->getEdgeSchema(spaceId, sceNode->getName()) .via(runner()) .thenValue([this, sceNode, spaceId](StatusOr resp) { + memory::MemoryCheckGuard guard; if (!resp.ok()) { LOG(WARNING) << "SpaceId: " << spaceId << ", ShowCreate edge `" << sceNode->getName() << "' failed: " << resp.status(); @@ -175,6 +180,7 @@ folly::Future AlterEdgeExecutor::execute() { aeNode->space(), aeNode->getName(), aeNode->getSchemaItems(), aeNode->getSchemaProp()) .via(runner()) .thenValue([this, aeNode](StatusOr resp) { + memory::MemoryCheckGuard guard; if (!resp.ok()) { LOG(WARNING) << "SpaceId: " << aeNode->space() << ", Alter edge `" << aeNode->getName() << "' failed: " << resp.status(); diff --git a/src/graph/executor/maintain/EdgeIndexExecutor.cpp b/src/graph/executor/maintain/EdgeIndexExecutor.cpp index ae8175e3a21..38d6a9f3087 100644 --- a/src/graph/executor/maintain/EdgeIndexExecutor.cpp +++ b/src/graph/executor/maintain/EdgeIndexExecutor.cpp @@ -26,6 +26,7 @@ folly::Future CreateEdgeIndexExecutor::execute() { ceiNode->getComment()) .via(runner()) .thenValue([ceiNode, spaceId](StatusOr resp) { + memory::MemoryCheckGuard guard; if (!resp.ok()) { LOG(WARNING) << "SpaceId: " << spaceId << ", Create index `" << ceiNode->getIndexName() << "' at edge: `" << ceiNode->getSchemaName() @@ -52,6 +53,7 @@ folly::Future DropEdgeIndexExecutor::execute() { ->dropEdgeIndex(spaceId, deiNode->getIndexName(), deiNode->getIfExists()) .via(runner()) .thenValue([deiNode, spaceId](StatusOr resp) { + memory::MemoryCheckGuard guard; if (!resp.ok()) { LOG(WARNING) << "SpaceId: " << spaceId << ", Drop edge index`" << deiNode->getIndexName() << "' failed: " << resp.status(); @@ -77,6 +79,7 @@ folly::Future DescEdgeIndexExecutor::execute() { ->getEdgeIndex(spaceId, deiNode->getIndexName()) .via(runner()) .thenValue([this, deiNode, spaceId](StatusOr resp) { + memory::MemoryCheckGuard guard; if (!resp.ok()) { LOG(WARNING) << "SpaceId: " << spaceId << ", Desc edge index`" << deiNode->getIndexName() << "' failed: " << resp.status(); @@ -110,6 +113,7 @@ folly::Future ShowCreateEdgeIndexExecutor::execute() { ->getEdgeIndex(spaceId, sceiNode->getIndexName()) .via(runner()) .thenValue([this, sceiNode, spaceId](StatusOr resp) { + memory::MemoryCheckGuard guard; if (!resp.ok()) { LOG(WARNING) << "SpaceId: " << spaceId << ", Show create edge index `" << sceiNode->getIndexName() << "' failed: " << resp.status(); @@ -142,6 +146,7 @@ folly::Future ShowEdgeIndexesExecutor::execute() { ->listEdgeIndexes(spaceId) .via(runner()) .thenValue([this, spaceId, bySchema](StatusOr> resp) { + memory::MemoryCheckGuard guard; if (!resp.ok()) { LOG(WARNING) << "SpaceId: " << spaceId << ", Show edge indexes failed" << resp.status(); return resp.status(); @@ -203,6 +208,7 @@ folly::Future ShowEdgeIndexStatusExecutor::execute() { ->listEdgeIndexStatus(spaceId) .via(runner()) .thenValue([this, spaceId](StatusOr> resp) { + memory::MemoryCheckGuard guard; if (!resp.ok()) { LOG(WARNING) << "SpaceId: " << spaceId << ", Show edge index status failed" << resp.status(); diff --git a/src/graph/executor/maintain/FTIndexExecutor.cpp b/src/graph/executor/maintain/FTIndexExecutor.cpp index b0345ce2906..06d08f91d07 100644 --- a/src/graph/executor/maintain/FTIndexExecutor.cpp +++ b/src/graph/executor/maintain/FTIndexExecutor.cpp @@ -19,6 +19,7 @@ folly::Future CreateFTIndexExecutor::execute() { ->createFTIndex(inode->getIndexName(), inode->getIndex()) .via(runner()) .thenValue([inode](StatusOr resp) { + memory::MemoryCheckGuard guard; if (!resp.ok()) { LOG(WARNING) << "Create fulltext index `" << inode->getIndexName() << "' failed: " << resp.status(); @@ -42,6 +43,7 @@ folly::Future DropFTIndexExecutor::execute() { ->dropFTIndex(spaceId, inode->getName()) .via(runner()) .thenValue([this, inode, spaceId](StatusOr resp) { + memory::MemoryCheckGuard guard; if (!resp.ok()) { LOG(WARNING) << "SpaceId: " << spaceId << ", Drop fulltext index `" << inode->getName() << "' failed: " << resp.status(); @@ -75,6 +77,7 @@ folly::Future ShowFTIndexesExecutor::execute() { .via(runner()) .thenValue( [this, spaceId](StatusOr> resp) { + memory::MemoryCheckGuard guard; if (!resp.ok()) { LOG(WARNING) << "SpaceId: " << spaceId << ", Show fulltext indexes failed" << resp.status(); diff --git a/src/graph/executor/maintain/TagExecutor.cpp b/src/graph/executor/maintain/TagExecutor.cpp index cc17aeb2da0..5beb4c0b1b6 100644 --- a/src/graph/executor/maintain/TagExecutor.cpp +++ b/src/graph/executor/maintain/TagExecutor.cpp @@ -20,6 +20,7 @@ folly::Future CreateTagExecutor::execute() { ->createTagSchema(spaceId, ctNode->getName(), ctNode->getSchema(), ctNode->getIfNotExists()) .via(runner()) .thenValue([ctNode, spaceId](StatusOr resp) { + memory::MemoryCheckGuard guard; if (!resp.ok()) { LOG(WARNING) << "SpaceId: " << spaceId << ", Create tag `" << ctNode->getName() << "' failed: " << resp.status(); @@ -45,6 +46,7 @@ folly::Future DescTagExecutor::execute() { ->getTagSchema(spaceId, dtNode->getName()) .via(runner()) .thenValue([this, dtNode, spaceId](StatusOr resp) { + memory::MemoryCheckGuard guard; if (!resp.ok()) { LOG(WARNING) << "SpaceId: " << spaceId << ", Desc tag `" << dtNode->getName() << "' failed: " << resp.status(); @@ -77,6 +79,7 @@ folly::Future DropTagExecutor::execute() { ->dropTagSchema(spaceId, dtNode->getName(), dtNode->getIfExists()) .via(runner()) .thenValue([dtNode, spaceId](StatusOr resp) { + memory::MemoryCheckGuard guard; if (!resp.ok()) { LOG(WARNING) << "SpaceId: " << spaceId << ", Drop tag `" << dtNode->getName() << "' failed: " << resp.status(); @@ -101,6 +104,7 @@ folly::Future ShowTagsExecutor::execute() { ->listTagSchemas(spaceId) .via(runner()) .thenValue([this, spaceId](StatusOr> resp) { + memory::MemoryCheckGuard guard; if (!resp.ok()) { LOG(WARNING) << "SpaceId: " << spaceId << ", Show tags failed: " << resp.status(); return resp.status(); @@ -141,6 +145,7 @@ folly::Future ShowCreateTagExecutor::execute() { ->getTagSchema(spaceId, sctNode->getName()) .via(runner()) .thenValue([this, sctNode, spaceId](StatusOr resp) { + memory::MemoryCheckGuard guard; if (!resp.ok()) { LOG(WARNING) << "SpaceId: " << spaceId << ", Show create tag `" << sctNode->getName() << "' failed: " << resp.status(); @@ -173,6 +178,7 @@ folly::Future AlterTagExecutor::execute() { aeNode->space(), aeNode->getName(), aeNode->getSchemaItems(), aeNode->getSchemaProp()) .via(runner()) .thenValue([aeNode](StatusOr resp) { + memory::MemoryCheckGuard guard; if (!resp.ok()) { LOG(WARNING) << "SpaceId: " << aeNode->space() << ", Alter tag `" << aeNode->getName() << "' failed: " << resp.status(); diff --git a/src/graph/executor/maintain/TagIndexExecutor.cpp b/src/graph/executor/maintain/TagIndexExecutor.cpp index a25779a0c4c..ad14dcd3788 100644 --- a/src/graph/executor/maintain/TagIndexExecutor.cpp +++ b/src/graph/executor/maintain/TagIndexExecutor.cpp @@ -26,6 +26,7 @@ folly::Future CreateTagIndexExecutor::execute() { ctiNode->getComment()) .via(runner()) .thenValue([ctiNode, spaceId](StatusOr resp) { + memory::MemoryCheckGuard guard; if (!resp.ok()) { LOG(WARNING) << "SpaceId: " << spaceId << ", Create index `" << ctiNode->getIndexName() << "' at tag: `" << ctiNode->getSchemaName() @@ -52,6 +53,7 @@ folly::Future DropTagIndexExecutor::execute() { ->dropTagIndex(spaceId, dtiNode->getIndexName(), dtiNode->getIfExists()) .via(runner()) .thenValue([dtiNode, spaceId](StatusOr resp) { + memory::MemoryCheckGuard guard; if (!resp.ok()) { LOG(WARNING) << "SpaceId: " << spaceId << ", Drop tag index `" << dtiNode->getIndexName() << "' failed: " << resp.status(); @@ -77,6 +79,7 @@ folly::Future DescTagIndexExecutor::execute() { ->getTagIndex(spaceId, dtiNode->getIndexName()) .via(runner()) .thenValue([this, dtiNode, spaceId](StatusOr resp) { + memory::MemoryCheckGuard guard; if (!resp.ok()) { LOG(WARNING) << "SpaceId: " << spaceId << ", Desc tag index `" << dtiNode->getIndexName() << "' failed: " << resp.status(); @@ -110,6 +113,7 @@ folly::Future ShowCreateTagIndexExecutor::execute() { ->getTagIndex(spaceId, sctiNode->getIndexName()) .via(runner()) .thenValue([this, sctiNode, spaceId](StatusOr resp) { + memory::MemoryCheckGuard guard; if (!resp.ok()) { LOG(WARNING) << "SpaceId: " << spaceId << ", Show create tag index `" << sctiNode->getIndexName() << "' failed: " << resp.status(); @@ -142,6 +146,7 @@ folly::Future ShowTagIndexesExecutor::execute() { ->listTagIndexes(spaceId) .via(runner()) .thenValue([this, spaceId, bySchema](StatusOr> resp) { + memory::MemoryCheckGuard guard; if (!resp.ok()) { LOG(WARNING) << "SpaceId: " << spaceId << ", Show tag indexes failed" << resp.status(); return resp.status(); @@ -204,6 +209,7 @@ folly::Future ShowTagIndexStatusExecutor::execute() { ->listTagIndexStatus(spaceId) .via(runner()) .thenValue([this, spaceId](StatusOr> resp) { + memory::MemoryCheckGuard guard; if (!resp.ok()) { LOG(WARNING) << "SpaceId: " << spaceId << ", Show tag index status failed: " << resp.status(); diff --git a/src/graph/executor/mutate/DeleteExecutor.cpp b/src/graph/executor/mutate/DeleteExecutor.cpp index 3372ddf0351..05a12a05ea6 100644 --- a/src/graph/executor/mutate/DeleteExecutor.cpp +++ b/src/graph/executor/mutate/DeleteExecutor.cpp @@ -69,6 +69,7 @@ folly::Future DeleteVerticesExecutor::deleteVertices() { VLOG(1) << "Delete vertices time: " << deleteVertTime.elapsedInUSec() << "us"; }) .thenValue([this](storage::StorageRpcResponse resp) { + memory::MemoryCheckGuard guard; SCOPED_TIMER(&execTime_); NG_RETURN_IF_ERROR(handleCompleteness(resp, false)); return Status::OK(); @@ -130,6 +131,7 @@ folly::Future DeleteTagsExecutor::deleteTags() { VLOG(1) << "Delete vertices time: " << deleteTagTime.elapsedInUSec() << "us"; }) .thenValue([this](storage::StorageRpcResponse resp) { + memory::MemoryCheckGuard guard; SCOPED_TIMER(&execTime_); NG_RETURN_IF_ERROR(handleCompleteness(resp, false)); return Status::OK(); @@ -222,6 +224,7 @@ folly::Future DeleteEdgesExecutor::deleteEdges() { VLOG(1) << "Delete edge time: " << deleteEdgeTime.elapsedInUSec() << "us"; }) .thenValue([this](storage::StorageRpcResponse resp) { + memory::MemoryCheckGuard guard; SCOPED_TIMER(&execTime_); NG_RETURN_IF_ERROR(handleCompleteness(resp, false)); return Status::OK(); diff --git a/src/graph/executor/mutate/InsertExecutor.cpp b/src/graph/executor/mutate/InsertExecutor.cpp index 166dc7630f4..53076051aeb 100644 --- a/src/graph/executor/mutate/InsertExecutor.cpp +++ b/src/graph/executor/mutate/InsertExecutor.cpp @@ -36,6 +36,7 @@ folly::Future InsertVerticesExecutor::insertVertices() { VLOG(1) << "Add vertices time: " << addVertTime.elapsedInUSec() << "us"; }) .thenValue([this](storage::StorageRpcResponse resp) { + memory::MemoryCheckGuard guard; SCOPED_TIMER(&execTime_); NG_RETURN_IF_ERROR(handleCompleteness(resp, false)); return Status::OK(); @@ -72,6 +73,7 @@ folly::Future InsertEdgesExecutor::insertEdges() { .ensure( [addEdgeTime]() { VLOG(1) << "Add edge time: " << addEdgeTime.elapsedInUSec() << "us"; }) .thenValue([this](storage::StorageRpcResponse resp) { + memory::MemoryCheckGuard guard; SCOPED_TIMER(&execTime_); NG_RETURN_IF_ERROR(handleCompleteness(resp, false)); return Status::OK(); diff --git a/src/graph/executor/mutate/UpdateExecutor.cpp b/src/graph/executor/mutate/UpdateExecutor.cpp index 6b940c0ca20..355fd40e14f 100644 --- a/src/graph/executor/mutate/UpdateExecutor.cpp +++ b/src/graph/executor/mutate/UpdateExecutor.cpp @@ -61,6 +61,7 @@ folly::Future UpdateVertexExecutor::execute() { VLOG(1) << "Update vertice time: " << updateVertTime.elapsedInUSec() << "us"; }) .thenValue([this](StatusOr resp) { + memory::MemoryCheckGuard guard; SCOPED_TIMER(&execTime_); if (!resp.ok()) { LOG(WARNING) << "Update vertices fail: " << resp.status(); @@ -118,6 +119,7 @@ folly::Future UpdateEdgeExecutor::execute() { VLOG(1) << "Update edge time: " << updateEdgeTime.elapsedInUSec() << "us"; }) .thenValue([this](StatusOr resp) { + memory::MemoryCheckGuard guard; SCOPED_TIMER(&execTime_); if (!resp.ok()) { LOG(WARNING) << "Update edge failed: " << resp.status(); diff --git a/src/graph/executor/query/AppendVerticesExecutor.cpp b/src/graph/executor/query/AppendVerticesExecutor.cpp index 03bf992fc03..02bcd7bf81c 100644 --- a/src/graph/executor/query/AppendVerticesExecutor.cpp +++ b/src/graph/executor/query/AppendVerticesExecutor.cpp @@ -61,6 +61,7 @@ folly::Future AppendVerticesExecutor::appendVertices() { otherStats_.emplace("total_rpc", folly::sformat("{}(us)", getPropsTime.elapsedInUSec())); }) .thenValue([this](StorageRpcResponse &&rpcResp) { + memory::MemoryCheckGuard guard; SCOPED_TIMER(&execTime_); addStats(rpcResp, otherStats_); if (FLAGS_max_job_size <= 1) { @@ -193,6 +194,7 @@ folly::Future AppendVerticesExecutor::handleRespMultiJobs( }; auto gather = [this](auto &&results) -> Status { + memory::MemoryCheckGuard guard; for (auto &r : results) { auto &&rows = std::move(r).value(); result_.rows.insert(result_.rows.end(), @@ -211,6 +213,7 @@ folly::Future AppendVerticesExecutor::handleRespMultiJobs( auto gather = [this, inputIterNew = std::move(inputIter)](auto &&prepareResult) -> folly::Future { + memory::MemoryCheckGuard guard1; UNUSED(prepareResult); auto scatterInput = @@ -219,6 +222,7 @@ folly::Future AppendVerticesExecutor::handleRespMultiJobs( }; auto gatherFinal = [this](auto &&results) -> Status { + memory::MemoryCheckGuard guard2; for (auto &r : results) { auto &&rows = std::move(r).value(); result_.rows.insert(result_.rows.end(), diff --git a/src/graph/executor/query/FilterExecutor.cpp b/src/graph/executor/query/FilterExecutor.cpp index 6a14cace334..4f2e7334326 100644 --- a/src/graph/executor/query/FilterExecutor.cpp +++ b/src/graph/executor/query/FilterExecutor.cpp @@ -34,6 +34,7 @@ folly::Future FilterExecutor::execute() { auto gather = [this, result = std::move(ds), kind = iter->kind()](auto &&results) mutable -> Status { + memory::MemoryCheckGuard guard; for (auto &r : results) { if (!r.ok()) { return r.status(); diff --git a/src/graph/executor/query/GetDstBySrcExecutor.cpp b/src/graph/executor/query/GetDstBySrcExecutor.cpp index 7914dd6f873..e694d43971f 100644 --- a/src/graph/executor/query/GetDstBySrcExecutor.cpp +++ b/src/graph/executor/query/GetDstBySrcExecutor.cpp @@ -48,6 +48,7 @@ folly::Future GetDstBySrcExecutor::execute() { otherStats_.emplace("total_rpc_time", folly::sformat("{}(us)", getDstTime.elapsedInUSec())); }) .thenValue([this](StorageRpcResponse&& resp) { + memory::MemoryCheckGuard guard; SCOPED_TIMER(&execTime_); auto& hostLatency = resp.hostLatency(); for (size_t i = 0; i < hostLatency.size(); ++i) { diff --git a/src/graph/executor/query/GetEdgesExecutor.cpp b/src/graph/executor/query/GetEdgesExecutor.cpp index b7a6fa55f52..98306c39d51 100644 --- a/src/graph/executor/query/GetEdgesExecutor.cpp +++ b/src/graph/executor/query/GetEdgesExecutor.cpp @@ -101,6 +101,7 @@ folly::Future GetEdgesExecutor::getEdges() { otherStats_.emplace("total_rpc", folly::sformat("{}(us)", getPropsTime.elapsedInUSec())); }) .thenValue([this, ge](StorageRpcResponse &&rpcResp) { + memory::MemoryCheckGuard guard; SCOPED_TIMER(&execTime_); addStats(rpcResp, otherStats_); return handleResp(std::move(rpcResp), ge->colNames()); diff --git a/src/graph/executor/query/GetNeighborsExecutor.cpp b/src/graph/executor/query/GetNeighborsExecutor.cpp index 956123ac587..0b69967c41e 100644 --- a/src/graph/executor/query/GetNeighborsExecutor.cpp +++ b/src/graph/executor/query/GetNeighborsExecutor.cpp @@ -62,6 +62,7 @@ folly::Future GetNeighborsExecutor::execute() { otherStats_.emplace("total_rpc_time", folly::sformat("{}(us)", getNbrTime.elapsedInUSec())); }) .thenValue([this](StorageRpcResponse&& resp) { + memory::MemoryCheckGuard guard; SCOPED_TIMER(&execTime_); auto& hostLatency = resp.hostLatency(); for (size_t i = 0; i < hostLatency.size(); ++i) { diff --git a/src/graph/executor/query/GetVerticesExecutor.cpp b/src/graph/executor/query/GetVerticesExecutor.cpp index c3a08893b2f..3906d3c564e 100644 --- a/src/graph/executor/query/GetVerticesExecutor.cpp +++ b/src/graph/executor/query/GetVerticesExecutor.cpp @@ -51,6 +51,7 @@ folly::Future GetVerticesExecutor::getVertices() { otherStats_.emplace("total_rpc", folly::sformat("{}(us)", getPropsTime.elapsedInUSec())); }) .thenValue([this, gv](StorageRpcResponse &&rpcResp) { + memory::MemoryCheckGuard guard; SCOPED_TIMER(&execTime_); addStats(rpcResp, otherStats_); return handleResp(std::move(rpcResp), gv->colNames()); diff --git a/src/graph/executor/query/IndexScanExecutor.cpp b/src/graph/executor/query/IndexScanExecutor.cpp index d5a5896094d..e2bd6a65d9f 100644 --- a/src/graph/executor/query/IndexScanExecutor.cpp +++ b/src/graph/executor/query/IndexScanExecutor.cpp @@ -42,6 +42,7 @@ folly::Future IndexScanExecutor::indexScan() { lookup->limit(qctx_)) .via(runner()) .thenValue([this](StorageRpcResponse &&rpcResp) { + memory::MemoryCheckGuard guard; addStats(rpcResp, otherStats_); return handleResp(std::move(rpcResp)); }) diff --git a/src/graph/executor/query/InnerJoinExecutor.cpp b/src/graph/executor/query/InnerJoinExecutor.cpp index 0919b939c23..37512d971f8 100644 --- a/src/graph/executor/query/InnerJoinExecutor.cpp +++ b/src/graph/executor/query/InnerJoinExecutor.cpp @@ -173,6 +173,7 @@ folly::Future InnerJoinExecutor::probe(const std::vector& p }; auto gather = [this](auto&& results) mutable -> Status { + memory::MemoryCheckGuard guard; DataSet result; auto* joinNode = asNode(node()); result.colNames = joinNode->colNames(); @@ -204,6 +205,7 @@ folly::Future InnerJoinExecutor::singleKeyProbe(Expression* probeKey, It }; auto gather = [this](auto&& results) mutable -> Status { + memory::MemoryCheckGuard guard; DataSet result; auto* joinNode = asNode(node()); result.colNames = joinNode->colNames(); diff --git a/src/graph/executor/query/LeftJoinExecutor.cpp b/src/graph/executor/query/LeftJoinExecutor.cpp index b21aaa34969..d2b12131b1c 100644 --- a/src/graph/executor/query/LeftJoinExecutor.cpp +++ b/src/graph/executor/query/LeftJoinExecutor.cpp @@ -150,6 +150,7 @@ folly::Future LeftJoinExecutor::probe(const std::vector& pr }; auto gather = [this](auto&& results) mutable -> Status { + memory::MemoryCheckGuard guard; DataSet result; auto* joinNode = asNode(node()); result.colNames = joinNode->colNames(); @@ -180,6 +181,7 @@ folly::Future LeftJoinExecutor::singleKeyProbe(Expression* probeKey, Ite }; auto gather = [this](auto&& results) mutable -> Status { + memory::MemoryCheckGuard guard; DataSet result; auto* joinNode = asNode(node()); result.colNames = joinNode->colNames(); diff --git a/src/graph/executor/query/ProjectExecutor.cpp b/src/graph/executor/query/ProjectExecutor.cpp index dd1104245b9..3f54a1daa6b 100644 --- a/src/graph/executor/query/ProjectExecutor.cpp +++ b/src/graph/executor/query/ProjectExecutor.cpp @@ -29,6 +29,7 @@ folly::Future ProjectExecutor::execute() { }; auto gather = [this, result = std::move(ds)](auto &&results) mutable { + memory::MemoryCheckGuard guard; for (auto &r : results) { auto &&rows = std::move(r).value(); result.rows.insert(result.rows.end(), diff --git a/src/graph/executor/query/ScanEdgesExecutor.cpp b/src/graph/executor/query/ScanEdgesExecutor.cpp index 4064312b7ab..4262ead8b72 100644 --- a/src/graph/executor/query/ScanEdgesExecutor.cpp +++ b/src/graph/executor/query/ScanEdgesExecutor.cpp @@ -41,6 +41,7 @@ folly::Future ScanEdgesExecutor::scanEdges() { otherStats_.emplace("total_rpc", folly::sformat("{}(us)", scanEdgesTime.elapsedInUSec())); }) .thenValue([this](StorageRpcResponse &&rpcResp) { + memory::MemoryCheckGuard guard; SCOPED_TIMER(&execTime_); addStats(rpcResp, otherStats_); return handleResp(std::move(rpcResp), {}); diff --git a/src/graph/executor/query/ScanVerticesExecutor.cpp b/src/graph/executor/query/ScanVerticesExecutor.cpp index e4f32642659..9cd80e7011f 100644 --- a/src/graph/executor/query/ScanVerticesExecutor.cpp +++ b/src/graph/executor/query/ScanVerticesExecutor.cpp @@ -42,6 +42,7 @@ folly::Future ScanVerticesExecutor::scanVertices() { otherStats_.emplace("total_rpc", folly::sformat("{}(us)", scanVertexTime.elapsedInUSec())); }) .thenValue([this, sv](StorageRpcResponse &&rpcResp) { + memory::MemoryCheckGuard guard; SCOPED_TIMER(&execTime_); addStats(rpcResp, otherStats_); return handleResp(std::move(rpcResp), sv->colNames()); diff --git a/src/graph/executor/query/TraverseExecutor.cpp b/src/graph/executor/query/TraverseExecutor.cpp index a92312761f2..f0e319c9462 100644 --- a/src/graph/executor/query/TraverseExecutor.cpp +++ b/src/graph/executor/query/TraverseExecutor.cpp @@ -104,6 +104,7 @@ folly::Future TraverseExecutor::getNeighbors() { currentStep_ == 1 ? traverse_->tagFilter() : nullptr) .via(runner()) .thenValue([this, getNbrTime](StorageRpcResponse&& resp) mutable { + memory::MemoryCheckGuard guard; vids_.clear(); SCOPED_TIMER(&execTime_); addStats(resp, getNbrTime.elapsedInUSec()); @@ -337,6 +338,7 @@ folly::Future TraverseExecutor::buildPathMultiJobs(size_t minStep, size_ }; auto gather = [this](std::vector> resp) mutable -> Status { + memory::MemoryCheckGuard guard; for (auto& rows : resp) { if (rows.empty()) { continue; diff --git a/src/graph/scheduler/AsyncMsgNotifyBasedScheduler.cpp b/src/graph/scheduler/AsyncMsgNotifyBasedScheduler.cpp index 7552013f429..8e4f17f171e 100644 --- a/src/graph/scheduler/AsyncMsgNotifyBasedScheduler.cpp +++ b/src/graph/scheduler/AsyncMsgNotifyBasedScheduler.cpp @@ -16,7 +16,6 @@ AsyncMsgNotifyBasedScheduler::AsyncMsgNotifyBasedScheduler(QueryContext* qctx) : } folly::Future AsyncMsgNotifyBasedScheduler::schedule() { - try { auto root = qctx_->plan()->root(); if (FLAGS_enable_lifetime_optimize) { // special for root @@ -26,13 +25,6 @@ folly::Future AsyncMsgNotifyBasedScheduler::schedule() { } auto executor = Executor::create(root, qctx_); return doSchedule(executor); - } catch (std::bad_alloc& e) { - return folly::makeFuture(Executor::memoryExceededStatus()); - } catch (std::exception& e) { - return folly::makeFuture(std::runtime_error(e.what())); - } catch (...) { - return folly::makeFuture(std::runtime_error("unknown exception")); - } } folly::Future AsyncMsgNotifyBasedScheduler::doSchedule(Executor* root) const { diff --git a/src/graph/service/GraphService.cpp b/src/graph/service/GraphService.cpp index adaff15798f..5f4c88849a5 100644 --- a/src/graph/service/GraphService.cpp +++ b/src/graph/service/GraphService.cpp @@ -11,6 +11,7 @@ #include "clients/storage/StorageClient.h" #include "common/base/Base.h" +#include "common/memory/MemoryTracker.h" #include "common/stats/StatsManager.h" #include "common/time/Duration.h" #include "common/time/TimezoneInfo.h" diff --git a/src/graph/service/QueryEngine.cpp b/src/graph/service/QueryEngine.cpp index f7045f7fd50..4f6927dbcd1 100644 --- a/src/graph/service/QueryEngine.cpp +++ b/src/graph/service/QueryEngine.cpp @@ -46,6 +46,7 @@ Status QueryEngine::init(std::shared_ptr ioExecutor // Create query context and query instance and execute it void QueryEngine::execute(RequestContextPtr rctx) { + memory::MemoryCheckGuard guard; auto qctx = std::make_unique(std::move(rctx), schemaManager_.get(), indexManager_.get(), diff --git a/src/graph/service/QueryInstance.cpp b/src/graph/service/QueryInstance.cpp index 16ebf4747b5..63bd78b7714 100644 --- a/src/graph/service/QueryInstance.cpp +++ b/src/graph/service/QueryInstance.cpp @@ -37,32 +37,42 @@ QueryInstance::QueryInstance(std::unique_ptr qctx, Optimizer *opti } void QueryInstance::execute() { - Status status = validateAndOptimize(); - if (!status.ok()) { - onError(std::move(status)); - return; - } + try { + memory::MemoryCheckGuard guard1; + Status status = validateAndOptimize(); + if (!status.ok()) { + onError(std::move(status)); + return; + } - // Sentence is explain query, finish - if (!explainOrContinue()) { - onFinish(); - return; - } + // Sentence is explain query, finish + if (!explainOrContinue()) { + onFinish(); + return; + } - // The execution engine converts the physical execution plan generated by the Planner into a - // series of Executors through the Scheduler to drive the execution of the Executors. - scheduler_->schedule() - .thenValue([this](Status s) { - if (s.ok()) { - this->onFinish(); - } else { - this->onError(std::move(s)); - } - }) - .thenError(folly::tag_t{}, - [this](const ExecutionError &e) { onError(e.status()); }) - .thenError(folly::tag_t{}, - [this](const std::exception &e) { onError(Status::Error("%s", e.what())); }); + // The execution engine converts the physical execution plan generated by the Planner into a + // series of Executors through the Scheduler to drive the execution of the Executors. + scheduler_->schedule() + .thenValue([this](Status s) { + memory::MemoryCheckGuard guard2; + if (s.ok()) { + this->onFinish(); + } else { + this->onError(std::move(s)); + } + }) + .thenError(folly::tag_t{}, + [this](const ExecutionError &e) { onError(e.status()); }) + .thenError(folly::tag_t{}, + [this](const std::exception &e) { onError(Status::Error("%s", e.what())); }); + } catch (std::bad_alloc &e) { + onError(Executor::memoryExceededStatus()); + } catch (std::exception &e) { + onError(Status::Error("%s", e.what())); + } catch (...) { + onError(Status::Error("unknown error")); + } } Status QueryInstance::validateAndOptimize() { diff --git a/src/meta/CMakeLists.txt b/src/meta/CMakeLists.txt index 64ce3c56ec6..cbba10e2b69 100644 --- a/src/meta/CMakeLists.txt +++ b/src/meta/CMakeLists.txt @@ -149,6 +149,7 @@ set(meta_test_deps $ $ $ + $ $ $ $ diff --git a/src/storage/GraphStorageServiceHandler.cpp b/src/storage/GraphStorageServiceHandler.cpp index b246c1624a3..d03ce0bc5dc 100644 --- a/src/storage/GraphStorageServiceHandler.cpp +++ b/src/storage/GraphStorageServiceHandler.cpp @@ -5,6 +5,7 @@ #include "storage/GraphStorageServiceHandler.h" +#include "common/memory/MemoryTracker.h" #include "storage/index/LookupProcessor.h" #include "storage/kv/GetProcessor.h" #include "storage/kv/PutProcessor.h" @@ -26,6 +27,7 @@ #include "storage/transaction/ChainUpdateEdgeLocalProcessor.h" #define RETURN_FUTURE(processor) \ + memory::MemoryCheckGuard guard; \ auto f = processor->getFuture(); \ processor->process(req); \ return f; diff --git a/src/storage/index/LookupProcessor.cpp b/src/storage/index/LookupProcessor.cpp index d752e2d328d..12c2e239cca 100644 --- a/src/storage/index/LookupProcessor.cpp +++ b/src/storage/index/LookupProcessor.cpp @@ -281,6 +281,7 @@ void LookupProcessor::runInMultipleThread(const std::vector& parts, futures.emplace_back( folly::via(executor_, [this, plan = std::move(planCopy[i]), part = parts[i]]() -> ReturnType { + memory::MemoryCheckGuard guard; ::nebula::cpp2::ErrorCode code = ::nebula::cpp2::ErrorCode::SUCCEEDED; std::deque dataset; plan->execute(part); @@ -323,6 +324,7 @@ void LookupProcessor::runInMultipleThread(const std::vector& parts, })); } folly::collectAll(futures).via(executor_).thenTry([this](auto&& t) { + memory::MemoryCheckGuard guard; CHECK(!t.hasException()); const auto& tries = t.value(); std::vector statResults; diff --git a/src/storage/kv/GetProcessor.cpp b/src/storage/kv/GetProcessor.cpp index 97a51efc1d4..22f507cec94 100644 --- a/src/storage/kv/GetProcessor.cpp +++ b/src/storage/kv/GetProcessor.cpp @@ -13,42 +13,53 @@ namespace storage { ProcessorCounters kGetCounters; void GetProcessor::process(const cpp2::KVGetRequest& req) { - CHECK_NOTNULL(env_->kvstore_); - GraphSpaceID spaceId = req.get_space_id(); - bool returnPartly = req.get_return_partly(); - - std::unordered_map pairs; - size_t size = 0; - for (auto& part : req.get_parts()) { - size += part.second.size(); - } - pairs.reserve(size); - - for (auto& part : req.get_parts()) { - auto partId = part.first; - auto& keys = part.second; - std::vector kvKeys; - kvKeys.reserve(part.second.size()); - std::transform(keys.begin(), keys.end(), std::back_inserter(kvKeys), [partId](const auto& key) { - return NebulaKeyUtils::kvKey(partId, key); - }); - std::vector values; - auto ret = env_->kvstore_->multiGet(spaceId, partId, kvKeys, &values); - if ((ret.first == nebula::cpp2::ErrorCode::SUCCEEDED) || - (ret.first == nebula::cpp2::ErrorCode::E_PARTIAL_RESULT && returnPartly)) { - auto& status = ret.second; - for (size_t i = 0; i < kvKeys.size(); i++) { - if (status[i].ok()) { - pairs.emplace(keys[i], values[i]); + try { + CHECK_NOTNULL(env_->kvstore_); + GraphSpaceID spaceId = req.get_space_id(); + bool returnPartly = req.get_return_partly(); + + std::unordered_map pairs; + size_t size = 0; + for (auto& part : req.get_parts()) { + size += part.second.size(); + } + pairs.reserve(size); + + for (auto& part : req.get_parts()) { + auto partId = part.first; + auto& keys = part.second; + std::vector kvKeys; + kvKeys.reserve(part.second.size()); + std::transform( + keys.begin(), keys.end(), std::back_inserter(kvKeys), [partId](const auto& key) { + return NebulaKeyUtils::kvKey(partId, key); + }); + std::vector values; + auto ret = env_->kvstore_->multiGet(spaceId, partId, kvKeys, &values); + if ((ret.first == nebula::cpp2::ErrorCode::SUCCEEDED) || + (ret.first == nebula::cpp2::ErrorCode::E_PARTIAL_RESULT && returnPartly)) { + auto& status = ret.second; + for (size_t i = 0; i < kvKeys.size(); i++) { + if (status[i].ok()) { + pairs.emplace(keys[i], values[i]); + } } + } else { + handleErrorCode(ret.first, spaceId, partId); } - } else { - handleErrorCode(ret.first, spaceId, partId); } - } - resp_.key_values_ref() = std::move(pairs); - this->onFinished(); + resp_.key_values_ref() = std::move(pairs); + this->onFinished(); + } catch (std::bad_alloc& e) { + memoryExceeded_ = true; + onError(); + } catch (std::exception& e) { + LOG(ERROR) << e.what(); + onError(); + } catch (...) { + onError(); + } } } // namespace storage diff --git a/src/storage/kv/PutProcessor.cpp b/src/storage/kv/PutProcessor.cpp index 24ef9a9dd25..4f97feada50 100644 --- a/src/storage/kv/PutProcessor.cpp +++ b/src/storage/kv/PutProcessor.cpp @@ -13,19 +13,29 @@ namespace storage { ProcessorCounters kPutCounters; void PutProcessor::process(const cpp2::KVPutRequest& req) { - CHECK_NOTNULL(env_->kvstore_); - const auto& pairs = req.get_parts(); - auto space = req.get_space_id(); - callingNum_ = pairs.size(); + try { + CHECK_NOTNULL(env_->kvstore_); + const auto& pairs = req.get_parts(); + auto space = req.get_space_id(); + callingNum_ = pairs.size(); - std::for_each(pairs.begin(), pairs.end(), [&](auto& value) { - auto part = value.first; - std::vector data; - for (auto& pair : value.second) { - data.emplace_back(std::move(NebulaKeyUtils::kvKey(part, pair.key)), std::move(pair.value)); - } - doPut(space, part, std::move(data)); - }); + std::for_each(pairs.begin(), pairs.end(), [&](auto& value) { + auto part = value.first; + std::vector data; + for (auto& pair : value.second) { + data.emplace_back(std::move(NebulaKeyUtils::kvKey(part, pair.key)), std::move(pair.value)); + } + doPut(space, part, std::move(data)); + }); + } catch (std::bad_alloc& e) { + memoryExceeded_ = true; + onError(); + } catch (std::exception& e) { + LOG(ERROR) << e.what(); + onError(); + } catch (...) { + onError(); + } } } // namespace storage diff --git a/src/storage/kv/RemoveProcessor.cpp b/src/storage/kv/RemoveProcessor.cpp index 3588e5b95e6..f074de0b123 100644 --- a/src/storage/kv/RemoveProcessor.cpp +++ b/src/storage/kv/RemoveProcessor.cpp @@ -13,19 +13,29 @@ namespace storage { ProcessorCounters kRemoveCounters; void RemoveProcessor::process(const cpp2::KVRemoveRequest& req) { - CHECK_NOTNULL(env_->kvstore_); - const auto& pairs = req.get_parts(); - auto space = req.get_space_id(); - callingNum_ = pairs.size(); + try { + CHECK_NOTNULL(env_->kvstore_); + const auto& pairs = req.get_parts(); + auto space = req.get_space_id(); + callingNum_ = pairs.size(); - std::for_each(pairs.begin(), pairs.end(), [&](auto& value) { - auto part = value.first; - std::vector keys; - for (auto& key : value.second) { - keys.emplace_back(std::move(NebulaKeyUtils::kvKey(part, key))); - } - doRemove(space, part, std::move(keys)); - }); + std::for_each(pairs.begin(), pairs.end(), [&](auto& value) { + auto part = value.first; + std::vector keys; + for (auto& key : value.second) { + keys.emplace_back(std::move(NebulaKeyUtils::kvKey(part, key))); + } + doRemove(space, part, std::move(keys)); + }); + } catch (std::bad_alloc& e) { + memoryExceeded_ = true; + onError(); + } catch (std::exception& e) { + LOG(ERROR) << e.what(); + onError(); + } catch (...) { + onError(); + } } } // namespace storage diff --git a/src/storage/mutate/AddEdgesProcessor.cpp b/src/storage/mutate/AddEdgesProcessor.cpp index f0d16bd8cfc..47ca51a33cc 100644 --- a/src/storage/mutate/AddEdgesProcessor.cpp +++ b/src/storage/mutate/AddEdgesProcessor.cpp @@ -8,8 +8,8 @@ #include #include "codec/RowWriterV2.h" +#include "common/memory/MemoryTracker.h" #include "common/stats/StatsManager.h" -#include "common/time/WallClock.h" #include "common/utils/IndexKeyUtils.h" #include "common/utils/NebulaKeyUtils.h" #include "common/utils/OperationKeyUtils.h" @@ -21,43 +21,53 @@ namespace storage { ProcessorCounters kAddEdgesCounters; void AddEdgesProcessor::process(const cpp2::AddEdgesRequest& req) { - spaceId_ = req.get_space_id(); - ifNotExists_ = req.get_if_not_exists(); - const auto& partEdges = req.get_parts(); - - CHECK_NOTNULL(env_->schemaMan_); - auto ret = env_->schemaMan_->getSpaceVidLen(spaceId_); - if (!ret.ok()) { - LOG(ERROR) << ret.status(); - for (auto& part : partEdges) { - pushResultCode(nebula::cpp2::ErrorCode::E_INVALID_SPACEVIDLEN, part.first); + try { + spaceId_ = req.get_space_id(); + ifNotExists_ = req.get_if_not_exists(); + const auto& partEdges = req.get_parts(); + + CHECK_NOTNULL(env_->schemaMan_); + auto ret = env_->schemaMan_->getSpaceVidLen(spaceId_); + if (!ret.ok()) { + LOG(ERROR) << ret.status(); + for (auto& part : partEdges) { + pushResultCode(nebula::cpp2::ErrorCode::E_INVALID_SPACEVIDLEN, part.first); + } + onFinished(); + return; } - onFinished(); - return; - } - spaceVidLen_ = ret.value(); - callingNum_ = partEdges.size(); + spaceVidLen_ = ret.value(); + callingNum_ = partEdges.size(); - CHECK_NOTNULL(env_->indexMan_); - auto iRet = env_->indexMan_->getEdgeIndexes(spaceId_); - if (!iRet.ok()) { - LOG(ERROR) << iRet.status(); - for (auto& part : partEdges) { - pushResultCode(nebula::cpp2::ErrorCode::E_SPACE_NOT_FOUND, part.first); + CHECK_NOTNULL(env_->indexMan_); + auto iRet = env_->indexMan_->getEdgeIndexes(spaceId_); + if (!iRet.ok()) { + LOG(ERROR) << iRet.status(); + for (auto& part : partEdges) { + pushResultCode(nebula::cpp2::ErrorCode::E_SPACE_NOT_FOUND, part.first); + } + onFinished(); + return; } - onFinished(); - return; - } - indexes_ = std::move(iRet).value(); - ignoreExistedIndex_ = req.get_ignore_existed_index(); + indexes_ = std::move(iRet).value(); + ignoreExistedIndex_ = req.get_ignore_existed_index(); - CHECK_NOTNULL(env_->kvstore_); + CHECK_NOTNULL(env_->kvstore_); - if (indexes_.empty()) { - doProcess(req); - } else { - doProcessWithIndex(req); + if (indexes_.empty()) { + doProcess(req); + } else { + doProcessWithIndex(req); + } + } catch (std::bad_alloc& e) { + memoryExceeded_ = true; + onError(); + } catch (std::exception& e) { + LOG(ERROR) << e.what(); + onError(); + } catch (...) { + onError(); } } diff --git a/src/storage/mutate/AddVerticesProcessor.cpp b/src/storage/mutate/AddVerticesProcessor.cpp index 3b6b4675623..22a94e2948e 100644 --- a/src/storage/mutate/AddVerticesProcessor.cpp +++ b/src/storage/mutate/AddVerticesProcessor.cpp @@ -8,8 +8,8 @@ #include #include "codec/RowWriterV2.h" +#include "common/memory/MemoryTracker.h" #include "common/stats/StatsManager.h" -#include "common/time/WallClock.h" #include "common/utils/IndexKeyUtils.h" #include "common/utils/NebulaKeyUtils.h" #include "common/utils/OperationKeyUtils.h" @@ -22,40 +22,51 @@ namespace storage { ProcessorCounters kAddVerticesCounters; void AddVerticesProcessor::process(const cpp2::AddVerticesRequest& req) { - spaceId_ = req.get_space_id(); - const auto& partVertices = req.get_parts(); - ifNotExists_ = req.get_if_not_exists(); - CHECK_NOTNULL(env_->schemaMan_); - auto ret = env_->schemaMan_->getSpaceVidLen(spaceId_); - if (!ret.ok()) { - LOG(ERROR) << ret.status(); - for (auto& part : partVertices) { - pushResultCode(nebula::cpp2::ErrorCode::E_INVALID_SPACEVIDLEN, part.first); + try { + memory::MemoryCheckGuard guard; + spaceId_ = req.get_space_id(); + const auto& partVertices = req.get_parts(); + ifNotExists_ = req.get_if_not_exists(); + CHECK_NOTNULL(env_->schemaMan_); + auto ret = env_->schemaMan_->getSpaceVidLen(spaceId_); + if (!ret.ok()) { + LOG(ERROR) << ret.status(); + for (auto& part : partVertices) { + pushResultCode(nebula::cpp2::ErrorCode::E_INVALID_SPACEVIDLEN, part.first); + } + onFinished(); + return; } - onFinished(); - return; - } - spaceVidLen_ = ret.value(); - callingNum_ = partVertices.size(); - - CHECK_NOTNULL(env_->indexMan_); - auto iRet = env_->indexMan_->getTagIndexes(spaceId_); - if (!iRet.ok()) { - LOG(ERROR) << iRet.status(); - for (auto& part : partVertices) { - pushResultCode(nebula::cpp2::ErrorCode::E_SPACE_NOT_FOUND, part.first); + spaceVidLen_ = ret.value(); + callingNum_ = partVertices.size(); + + CHECK_NOTNULL(env_->indexMan_); + auto iRet = env_->indexMan_->getTagIndexes(spaceId_); + if (!iRet.ok()) { + LOG(ERROR) << iRet.status(); + for (auto& part : partVertices) { + pushResultCode(nebula::cpp2::ErrorCode::E_SPACE_NOT_FOUND, part.first); + } + onFinished(); + return; } - onFinished(); - return; - } - indexes_ = std::move(iRet).value(); - ignoreExistedIndex_ = req.get_ignore_existed_index(); + indexes_ = std::move(iRet).value(); + ignoreExistedIndex_ = req.get_ignore_existed_index(); - CHECK_NOTNULL(env_->kvstore_); - if (indexes_.empty()) { - doProcess(req); - } else { - doProcessWithIndex(req); + CHECK_NOTNULL(env_->kvstore_); + if (indexes_.empty()) { + doProcess(req); + } else { + doProcessWithIndex(req); + } + } catch (std::bad_alloc& e) { + memoryExceeded_ = true; + onError(); + } catch (std::exception& e) { + LOG(ERROR) << e.what(); + onError(); + } catch (...) { + onError(); } } diff --git a/src/storage/mutate/DeleteEdgesProcessor.cpp b/src/storage/mutate/DeleteEdgesProcessor.cpp index 41cd9142d48..eacfe721c6b 100644 --- a/src/storage/mutate/DeleteEdgesProcessor.cpp +++ b/src/storage/mutate/DeleteEdgesProcessor.cpp @@ -7,6 +7,7 @@ #include +#include "common/memory/MemoryTracker.h" #include "common/stats/StatsManager.h" #include "common/utils/IndexKeyUtils.h" #include "common/utils/NebulaKeyUtils.h" @@ -19,140 +20,153 @@ namespace storage { ProcessorCounters kDelEdgesCounters; void DeleteEdgesProcessor::process(const cpp2::DeleteEdgesRequest& req) { - spaceId_ = req.get_space_id(); - const auto& partEdges = req.get_parts(); - - CHECK_NOTNULL(env_->schemaMan_); - auto ret = env_->schemaMan_->getSpaceVidLen(spaceId_); - if (!ret.ok()) { - LOG(ERROR) << ret.status(); - for (auto& part : partEdges) { - pushResultCode(nebula::cpp2::ErrorCode::E_INVALID_SPACEVIDLEN, part.first); + try { + spaceId_ = req.get_space_id(); + const auto& partEdges = req.get_parts(); + + CHECK_NOTNULL(env_->schemaMan_); + auto ret = env_->schemaMan_->getSpaceVidLen(spaceId_); + if (!ret.ok()) { + LOG(ERROR) << ret.status(); + for (auto& part : partEdges) { + pushResultCode(nebula::cpp2::ErrorCode::E_INVALID_SPACEVIDLEN, part.first); + } + onFinished(); + return; } - onFinished(); - return; - } - spaceVidLen_ = ret.value(); - callingNum_ = partEdges.size(); - - CHECK_NOTNULL(env_->indexMan_); - auto iRet = env_->indexMan_->getEdgeIndexes(spaceId_); - if (!iRet.ok()) { - LOG(ERROR) << iRet.status(); - for (auto& part : partEdges) { - pushResultCode(nebula::cpp2::ErrorCode::E_SPACE_NOT_FOUND, part.first); + spaceVidLen_ = ret.value(); + callingNum_ = partEdges.size(); + + CHECK_NOTNULL(env_->indexMan_); + auto iRet = env_->indexMan_->getEdgeIndexes(spaceId_); + if (!iRet.ok()) { + LOG(ERROR) << iRet.status(); + for (auto& part : partEdges) { + pushResultCode(nebula::cpp2::ErrorCode::E_SPACE_NOT_FOUND, part.first); + } + onFinished(); + return; } - onFinished(); - return; - } - indexes_ = std::move(iRet).value(); - - CHECK_NOTNULL(env_->kvstore_); - if (indexes_.empty()) { - // Operate every part, the graph layer guarantees the unique of the edgeKey - for (auto& part : partEdges) { - std::vector keys; - keys.reserve(32); - auto partId = part.first; - auto code = nebula::cpp2::ErrorCode::SUCCEEDED; - for (auto& edgeKey : part.second) { - if (!NebulaKeyUtils::isValidVidLen( - spaceVidLen_, edgeKey.src_ref()->getStr(), edgeKey.dst_ref()->getStr())) { - LOG(ERROR) << "Space " << spaceId_ << " vertex length invalid, " - << "space vid len: " << spaceVidLen_ << ", edge srcVid: " << *edgeKey.src_ref() - << " dstVid: " << *edgeKey.dst_ref(); - code = nebula::cpp2::ErrorCode::E_INVALID_VID; - break; + indexes_ = std::move(iRet).value(); + + CHECK_NOTNULL(env_->kvstore_); + if (indexes_.empty()) { + // Operate every part, the graph layer guarantees the unique of the edgeKey + for (auto& part : partEdges) { + std::vector keys; + keys.reserve(32); + auto partId = part.first; + auto code = nebula::cpp2::ErrorCode::SUCCEEDED; + for (auto& edgeKey : part.second) { + if (!NebulaKeyUtils::isValidVidLen( + spaceVidLen_, edgeKey.src_ref()->getStr(), edgeKey.dst_ref()->getStr())) { + LOG(ERROR) << "Space " << spaceId_ << " vertex length invalid, " + << "space vid len: " << spaceVidLen_ + << ", edge srcVid: " << *edgeKey.src_ref() + << " dstVid: " << *edgeKey.dst_ref(); + code = nebula::cpp2::ErrorCode::E_INVALID_VID; + break; + } + // todo(doodle): delete lock in toss + auto edge = NebulaKeyUtils::edgeKey(spaceVidLen_, + partId, + edgeKey.src_ref()->getStr(), + *edgeKey.edge_type_ref(), + *edgeKey.ranking_ref(), + edgeKey.dst_ref()->getStr()); + keys.emplace_back(edge.data(), edge.size()); + } + if (code != nebula::cpp2::ErrorCode::SUCCEEDED) { + handleAsync(spaceId_, partId, code); + continue; } - // todo(doodle): delete lock in toss - auto edge = NebulaKeyUtils::edgeKey(spaceVidLen_, - partId, - edgeKey.src_ref()->getStr(), - *edgeKey.edge_type_ref(), - *edgeKey.ranking_ref(), - edgeKey.dst_ref()->getStr()); - keys.emplace_back(edge.data(), edge.size()); - } - if (code != nebula::cpp2::ErrorCode::SUCCEEDED) { - handleAsync(spaceId_, partId, code); - continue; - } - HookFuncPara para; - if (tossHookFunc_) { - para.keys.emplace(&keys); - (*tossHookFunc_)(para); - } - if (para.result) { - env_->kvstore_->asyncAppendBatch( - spaceId_, - partId, - std::move(para.result.value()), - [partId, this](nebula::cpp2::ErrorCode rc) { handleAsync(spaceId_, partId, rc); }); - } else { - doRemove(spaceId_, partId, std::move(keys)); - stats::StatsManager::addValue(kNumEdgesDeleted, keys.size()); - } - } - } else { - for (auto& part : partEdges) { - IndexCountWrapper wrapper(env_); - auto partId = part.first; - std::vector dummyLock; - dummyLock.reserve(part.second.size()); - - nebula::cpp2::ErrorCode err = nebula::cpp2::ErrorCode::SUCCEEDED; - for (const auto& edgeKey : part.second) { - if (!NebulaKeyUtils::isValidVidLen( - spaceVidLen_, edgeKey.src_ref()->getStr(), edgeKey.dst_ref()->getStr())) { - LOG(ERROR) << "Space " << spaceId_ << " vertex length invalid, " - << "space vid len: " << spaceVidLen_ << ", edge srcVid: " << *edgeKey.src_ref() - << " dstVid: " << *edgeKey.dst_ref(); - err = nebula::cpp2::ErrorCode::E_INVALID_VID; - break; + HookFuncPara para; + if (tossHookFunc_) { + para.keys.emplace(&keys); + (*tossHookFunc_)(para); + } + if (para.result) { + env_->kvstore_->asyncAppendBatch( + spaceId_, + partId, + std::move(para.result.value()), + [partId, this](nebula::cpp2::ErrorCode rc) { handleAsync(spaceId_, partId, rc); }); + } else { + doRemove(spaceId_, partId, std::move(keys)); + stats::StatsManager::addValue(kNumEdgesDeleted, keys.size()); } - auto l = std::make_tuple(spaceId_, - partId, - edgeKey.src_ref()->getStr(), - *edgeKey.edge_type_ref(), - *edgeKey.ranking_ref(), - edgeKey.dst_ref()->getStr()); - if (std::find(dummyLock.begin(), dummyLock.end(), l) == dummyLock.end()) { - if (!env_->edgesML_->try_lock(l)) { - LOG(ERROR) << folly::sformat("The edge locked : src {}, type {}, tank {}, dst {}", - edgeKey.src_ref()->getStr(), - *edgeKey.edge_type_ref(), - *edgeKey.ranking_ref(), - edgeKey.dst_ref()->getStr()); - err = nebula::cpp2::ErrorCode::E_DATA_CONFLICT_ERROR; + } + } else { + for (auto& part : partEdges) { + IndexCountWrapper wrapper(env_); + auto partId = part.first; + std::vector dummyLock; + dummyLock.reserve(part.second.size()); + + nebula::cpp2::ErrorCode err = nebula::cpp2::ErrorCode::SUCCEEDED; + for (const auto& edgeKey : part.second) { + if (!NebulaKeyUtils::isValidVidLen( + spaceVidLen_, edgeKey.src_ref()->getStr(), edgeKey.dst_ref()->getStr())) { + LOG(ERROR) << "Space " << spaceId_ << " vertex length invalid, " + << "space vid len: " << spaceVidLen_ + << ", edge srcVid: " << *edgeKey.src_ref() + << " dstVid: " << *edgeKey.dst_ref(); + err = nebula::cpp2::ErrorCode::E_INVALID_VID; break; } - dummyLock.emplace_back(std::move(l)); + auto l = std::make_tuple(spaceId_, + partId, + edgeKey.src_ref()->getStr(), + *edgeKey.edge_type_ref(), + *edgeKey.ranking_ref(), + edgeKey.dst_ref()->getStr()); + if (std::find(dummyLock.begin(), dummyLock.end(), l) == dummyLock.end()) { + if (!env_->edgesML_->try_lock(l)) { + LOG(ERROR) << folly::sformat("The edge locked : src {}, type {}, tank {}, dst {}", + edgeKey.src_ref()->getStr(), + *edgeKey.edge_type_ref(), + *edgeKey.ranking_ref(), + edgeKey.dst_ref()->getStr()); + err = nebula::cpp2::ErrorCode::E_DATA_CONFLICT_ERROR; + break; + } + dummyLock.emplace_back(std::move(l)); + } } + if (err != nebula::cpp2::ErrorCode::SUCCEEDED) { + env_->edgesML_->unlockBatch(dummyLock); + handleAsync(spaceId_, partId, err); + continue; + } + auto batch = deleteEdges(partId, std::move(part.second)); + if (!nebula::ok(batch)) { + env_->edgesML_->unlockBatch(dummyLock); + handleAsync(spaceId_, partId, nebula::error(batch)); + continue; + } + DCHECK(!nebula::value(batch).empty()); + nebula::MemoryLockGuard lg(env_->edgesML_.get(), std::move(dummyLock), false, false); + env_->kvstore_->asyncAppendBatch( + spaceId_, + partId, + std::move(nebula::value(batch)), + [l = std::move(lg), icw = std::move(wrapper), partId, this]( + nebula::cpp2::ErrorCode code) { + UNUSED(l); + UNUSED(icw); + handleAsync(spaceId_, partId, code); + }); } - if (err != nebula::cpp2::ErrorCode::SUCCEEDED) { - env_->edgesML_->unlockBatch(dummyLock); - handleAsync(spaceId_, partId, err); - continue; - } - auto batch = deleteEdges(partId, std::move(part.second)); - if (!nebula::ok(batch)) { - env_->edgesML_->unlockBatch(dummyLock); - handleAsync(spaceId_, partId, nebula::error(batch)); - continue; - } - DCHECK(!nebula::value(batch).empty()); - nebula::MemoryLockGuard lg(env_->edgesML_.get(), std::move(dummyLock), false, false); - env_->kvstore_->asyncAppendBatch(spaceId_, - partId, - std::move(nebula::value(batch)), - [l = std::move(lg), icw = std::move(wrapper), partId, this]( - nebula::cpp2::ErrorCode code) { - UNUSED(l); - UNUSED(icw); - handleAsync(spaceId_, partId, code); - }); } + } catch (std::bad_alloc& e) { + memoryExceeded_ = true; + onError(); + } catch (std::exception& e) { + LOG(ERROR) << e.what(); + onError(); + } catch (...) { + onError(); } } diff --git a/src/storage/mutate/DeleteTagsProcessor.cpp b/src/storage/mutate/DeleteTagsProcessor.cpp index 7ffc6f28696..9b189fef728 100644 --- a/src/storage/mutate/DeleteTagsProcessor.cpp +++ b/src/storage/mutate/DeleteTagsProcessor.cpp @@ -18,76 +18,87 @@ namespace storage { ProcessorCounters kDelTagsCounters; void DeleteTagsProcessor::process(const cpp2::DeleteTagsRequest& req) { - spaceId_ = req.get_space_id(); - const auto& parts = req.get_parts(); + try { + spaceId_ = req.get_space_id(); + const auto& parts = req.get_parts(); - CHECK_NOTNULL(env_->schemaMan_); - auto ret = env_->schemaMan_->getSpaceVidLen(spaceId_); - if (!ret.ok()) { - LOG(ERROR) << ret.status(); - for (auto& part : parts) { - pushResultCode(nebula::cpp2::ErrorCode::E_INVALID_SPACEVIDLEN, part.first); + CHECK_NOTNULL(env_->schemaMan_); + auto ret = env_->schemaMan_->getSpaceVidLen(spaceId_); + if (!ret.ok()) { + LOG(ERROR) << ret.status(); + for (auto& part : parts) { + pushResultCode(nebula::cpp2::ErrorCode::E_INVALID_SPACEVIDLEN, part.first); + } + onFinished(); + return; } - onFinished(); - return; - } - spaceVidLen_ = ret.value(); - callingNum_ = parts.size(); + spaceVidLen_ = ret.value(); + callingNum_ = parts.size(); - CHECK_NOTNULL(env_->indexMan_); - auto iRet = env_->indexMan_->getTagIndexes(spaceId_); - if (!iRet.ok()) { - LOG(ERROR) << iRet.status(); - for (auto& part : parts) { - pushResultCode(nebula::cpp2::ErrorCode::E_SPACE_NOT_FOUND, part.first); + CHECK_NOTNULL(env_->indexMan_); + auto iRet = env_->indexMan_->getTagIndexes(spaceId_); + if (!iRet.ok()) { + LOG(ERROR) << iRet.status(); + for (auto& part : parts) { + pushResultCode(nebula::cpp2::ErrorCode::E_SPACE_NOT_FOUND, part.first); + } + onFinished(); + return; } - onFinished(); - return; - } - indexes_ = std::move(iRet).value(); + indexes_ = std::move(iRet).value(); - CHECK_NOTNULL(env_->kvstore_); - if (indexes_.empty()) { - std::vector keys; - keys.reserve(32); - for (const auto& part : parts) { - auto partId = part.first; - const auto& delTags = part.second; - keys.clear(); - for (const auto& entry : delTags) { - const auto& vId = entry.get_id().getStr(); - for (const auto& tagId : entry.get_tags()) { - auto key = NebulaKeyUtils::tagKey(spaceVidLen_, partId, vId, tagId); - keys.emplace_back(std::move(key)); + CHECK_NOTNULL(env_->kvstore_); + if (indexes_.empty()) { + std::vector keys; + keys.reserve(32); + for (const auto& part : parts) { + auto partId = part.first; + const auto& delTags = part.second; + keys.clear(); + for (const auto& entry : delTags) { + const auto& vId = entry.get_id().getStr(); + for (const auto& tagId : entry.get_tags()) { + auto key = NebulaKeyUtils::tagKey(spaceVidLen_, partId, vId, tagId); + keys.emplace_back(std::move(key)); + } } + doRemove(spaceId_, partId, std::move(keys)); + stats::StatsManager::addValue(kNumTagsDeleted, keys.size()); } - doRemove(spaceId_, partId, std::move(keys)); - stats::StatsManager::addValue(kNumTagsDeleted, keys.size()); - } - } else { - for (const auto& part : parts) { - IndexCountWrapper wrapper(env_); - auto partId = part.first; - std::vector lockedKeys; - auto batch = deleteTags(partId, part.second, lockedKeys); - if (!nebula::ok(batch)) { - env_->verticesML_->unlockBatch(lockedKeys); - handleAsync(spaceId_, partId, nebula::error(batch)); - continue; + } else { + for (const auto& part : parts) { + IndexCountWrapper wrapper(env_); + auto partId = part.first; + std::vector lockedKeys; + auto batch = deleteTags(partId, part.second, lockedKeys); + if (!nebula::ok(batch)) { + env_->verticesML_->unlockBatch(lockedKeys); + handleAsync(spaceId_, partId, nebula::error(batch)); + continue; + } + // keys has been locked in deleteTags + nebula::MemoryLockGuard lg( + env_->verticesML_.get(), std::move(lockedKeys), false, false); + env_->kvstore_->asyncAppendBatch( + spaceId_, + partId, + std::move(nebula::value(batch)), + [l = std::move(lg), icw = std::move(wrapper), partId, this]( + nebula::cpp2::ErrorCode code) { + UNUSED(l); + UNUSED(icw); + handleAsync(spaceId_, partId, code); + }); } - // keys has been locked in deleteTags - nebula::MemoryLockGuard lg( - env_->verticesML_.get(), std::move(lockedKeys), false, false); - env_->kvstore_->asyncAppendBatch(spaceId_, - partId, - std::move(nebula::value(batch)), - [l = std::move(lg), icw = std::move(wrapper), partId, this]( - nebula::cpp2::ErrorCode code) { - UNUSED(l); - UNUSED(icw); - handleAsync(spaceId_, partId, code); - }); } + } catch (std::bad_alloc& e) { + memoryExceeded_ = true; + onError(); + } catch (std::exception& e) { + LOG(ERROR) << e.what(); + onError(); + } catch (...) { + onError(); } } diff --git a/src/storage/mutate/DeleteVerticesProcessor.cpp b/src/storage/mutate/DeleteVerticesProcessor.cpp index 63a21f2ad20..3dbc6ae7060 100644 --- a/src/storage/mutate/DeleteVerticesProcessor.cpp +++ b/src/storage/mutate/DeleteVerticesProcessor.cpp @@ -17,95 +17,107 @@ namespace storage { ProcessorCounters kDelVerticesCounters; void DeleteVerticesProcessor::process(const cpp2::DeleteVerticesRequest& req) { - spaceId_ = req.get_space_id(); - const auto& partVertices = req.get_parts(); + try { + spaceId_ = req.get_space_id(); + const auto& partVertices = req.get_parts(); - CHECK_NOTNULL(env_->schemaMan_); - auto ret = env_->schemaMan_->getSpaceVidLen(spaceId_); - if (!ret.ok()) { - LOG(ERROR) << ret.status(); - for (auto& part : partVertices) { - pushResultCode(nebula::cpp2::ErrorCode::E_INVALID_SPACEVIDLEN, part.first); + CHECK_NOTNULL(env_->schemaMan_); + auto ret = env_->schemaMan_->getSpaceVidLen(spaceId_); + if (!ret.ok()) { + LOG(ERROR) << ret.status(); + for (auto& part : partVertices) { + pushResultCode(nebula::cpp2::ErrorCode::E_INVALID_SPACEVIDLEN, part.first); + } + onFinished(); + return; } - onFinished(); - return; - } - spaceVidLen_ = ret.value(); - callingNum_ = partVertices.size(); + spaceVidLen_ = ret.value(); + callingNum_ = partVertices.size(); - CHECK_NOTNULL(env_->indexMan_); - auto iRet = env_->indexMan_->getTagIndexes(spaceId_); - if (!iRet.ok()) { - LOG(ERROR) << iRet.status(); - for (auto& part : partVertices) { - pushResultCode(nebula::cpp2::ErrorCode::E_SPACE_NOT_FOUND, part.first); + CHECK_NOTNULL(env_->indexMan_); + auto iRet = env_->indexMan_->getTagIndexes(spaceId_); + if (!iRet.ok()) { + LOG(ERROR) << iRet.status(); + for (auto& part : partVertices) { + pushResultCode(nebula::cpp2::ErrorCode::E_SPACE_NOT_FOUND, part.first); + } + onFinished(); + return; } - onFinished(); - return; - } - indexes_ = std::move(iRet).value(); + indexes_ = std::move(iRet).value(); - CHECK_NOTNULL(env_->kvstore_); - if (indexes_.empty()) { - // Operate every part, the graph layer guarantees the unique of the vid - std::vector keys; - keys.reserve(32); - for (auto& part : partVertices) { - auto partId = part.first; - const auto& vertexIds = part.second; - keys.clear(); - auto code = nebula::cpp2::ErrorCode::SUCCEEDED; - for (auto& vid : vertexIds) { - if (!NebulaKeyUtils::isValidVidLen(spaceVidLen_, vid.getStr())) { - LOG(ERROR) << "Space " << spaceId_ << ", vertex length invalid, " - << " space vid len: " << spaceVidLen_ << ", vid is " << vid; - code = nebula::cpp2::ErrorCode::E_INVALID_VID; - break; + CHECK_NOTNULL(env_->kvstore_); + if (indexes_.empty()) { + // Operate every part, the graph layer guarantees the unique of the vid + std::vector keys; + keys.reserve(32); + for (auto& part : partVertices) { + auto partId = part.first; + const auto& vertexIds = part.second; + keys.clear(); + auto code = nebula::cpp2::ErrorCode::SUCCEEDED; + for (auto& vid : vertexIds) { + if (!NebulaKeyUtils::isValidVidLen(spaceVidLen_, vid.getStr())) { + LOG(ERROR) << "Space " << spaceId_ << ", vertex length invalid, " + << " space vid len: " << spaceVidLen_ << ", vid is " << vid; + code = nebula::cpp2::ErrorCode::E_INVALID_VID; + break; + } + keys.emplace_back(NebulaKeyUtils::vertexKey(spaceVidLen_, partId, vid.getStr())); + auto prefix = NebulaKeyUtils::tagPrefix(spaceVidLen_, partId, vid.getStr()); + std::unique_ptr iter; + code = env_->kvstore_->prefix(spaceId_, partId, prefix, &iter); + if (code != nebula::cpp2::ErrorCode::SUCCEEDED) { + VLOG(3) << "Error! ret = " << static_cast(code) << ", spaceID " << spaceId_; + break; + } + while (iter->valid()) { + auto key = iter->key(); + keys.emplace_back(key.str()); + iter->next(); + } } - keys.emplace_back(NebulaKeyUtils::vertexKey(spaceVidLen_, partId, vid.getStr())); - auto prefix = NebulaKeyUtils::tagPrefix(spaceVidLen_, partId, vid.getStr()); - std::unique_ptr iter; - code = env_->kvstore_->prefix(spaceId_, partId, prefix, &iter); if (code != nebula::cpp2::ErrorCode::SUCCEEDED) { - VLOG(3) << "Error! ret = " << static_cast(code) << ", spaceID " << spaceId_; - break; - } - while (iter->valid()) { - auto key = iter->key(); - keys.emplace_back(key.str()); - iter->next(); + handleAsync(spaceId_, partId, code); + continue; } + doRemove(spaceId_, partId, std::move(keys)); + stats::StatsManager::addValue(kNumVerticesDeleted, keys.size()); } - if (code != nebula::cpp2::ErrorCode::SUCCEEDED) { - handleAsync(spaceId_, partId, code); - continue; - } - doRemove(spaceId_, partId, std::move(keys)); - stats::StatsManager::addValue(kNumVerticesDeleted, keys.size()); - } - } else { - for (auto& pv : partVertices) { - IndexCountWrapper wrapper(env_); - auto partId = pv.first; - std::vector dummyLock; - auto batch = deleteVertices(partId, std::move(pv).second, dummyLock); - if (!nebula::ok(batch)) { - env_->verticesML_->unlockBatch(dummyLock); - handleAsync(spaceId_, partId, nebula::error(batch)); - continue; + } else { + for (auto& pv : partVertices) { + IndexCountWrapper wrapper(env_); + auto partId = pv.first; + std::vector dummyLock; + auto batch = deleteVertices(partId, std::move(pv).second, dummyLock); + if (!nebula::ok(batch)) { + env_->verticesML_->unlockBatch(dummyLock); + handleAsync(spaceId_, partId, nebula::error(batch)); + continue; + } + DCHECK(!nebula::value(batch).empty()); + nebula::MemoryLockGuard lg( + env_->verticesML_.get(), std::move(dummyLock), false, false); + env_->kvstore_->asyncAppendBatch( + spaceId_, + partId, + std::move(nebula::value(batch)), + [l = std::move(lg), icw = std::move(wrapper), partId, this]( + nebula::cpp2::ErrorCode code) { + UNUSED(l); + UNUSED(icw); + handleAsync(spaceId_, partId, code); + }); } - DCHECK(!nebula::value(batch).empty()); - nebula::MemoryLockGuard lg(env_->verticesML_.get(), std::move(dummyLock), false, false); - env_->kvstore_->asyncAppendBatch(spaceId_, - partId, - std::move(nebula::value(batch)), - [l = std::move(lg), icw = std::move(wrapper), partId, this]( - nebula::cpp2::ErrorCode code) { - UNUSED(l); - UNUSED(icw); - handleAsync(spaceId_, partId, code); - }); } + } catch (std::bad_alloc& e) { + memoryExceeded_ = true; + onError(); + } catch (std::exception& e) { + LOG(ERROR) << e.what(); + onError(); + } catch (...) { + onError(); } } diff --git a/src/storage/mutate/UpdateEdgeProcessor.cpp b/src/storage/mutate/UpdateEdgeProcessor.cpp index 0f748075bfe..6e6d1db6d2a 100644 --- a/src/storage/mutate/UpdateEdgeProcessor.cpp +++ b/src/storage/mutate/UpdateEdgeProcessor.cpp @@ -6,6 +6,7 @@ #include "storage/mutate/UpdateEdgeProcessor.h" #include "common/base/Base.h" +#include "common/memory/MemoryTracker.h" #include "common/utils/NebulaKeyUtils.h" #include "storage/exec/EdgeNode.h" #include "storage/exec/FilterNode.h" @@ -18,10 +19,23 @@ namespace storage { ProcessorCounters kUpdateEdgeCounters; void UpdateEdgeProcessor::process(const cpp2::UpdateEdgeRequest& req) { - if (executor_ != nullptr) { - executor_->add([req, this]() { this->doProcess(req); }); - } else { - doProcess(req); + try { + if (executor_ != nullptr) { + executor_->add([req, this]() { + memory::MemoryCheckGuard guard; + this->doProcess(req); + }); + } else { + doProcess(req); + } + } catch (std::bad_alloc& e) { + memoryExceeded_ = true; + onError(); + } catch (std::exception& e) { + LOG(ERROR) << e.what(); + onError(); + } catch (...) { + onError(); } } diff --git a/src/storage/mutate/UpdateVertexProcessor.cpp b/src/storage/mutate/UpdateVertexProcessor.cpp index f54527ef702..757adb9d6d0 100644 --- a/src/storage/mutate/UpdateVertexProcessor.cpp +++ b/src/storage/mutate/UpdateVertexProcessor.cpp @@ -6,6 +6,7 @@ #include "storage/mutate/UpdateVertexProcessor.h" #include "common/base/Base.h" +#include "common/memory/MemoryTracker.h" #include "common/utils/NebulaKeyUtils.h" #include "storage/exec/FilterNode.h" #include "storage/exec/TagNode.h" @@ -18,10 +19,23 @@ namespace storage { ProcessorCounters kUpdateVertexCounters; void UpdateVertexProcessor::process(const cpp2::UpdateVertexRequest& req) { - if (executor_ != nullptr) { - executor_->add([req, this]() { this->doProcess(req); }); - } else { - doProcess(req); + try { + if (executor_ != nullptr) { + executor_->add([req, this]() { + memory::MemoryCheckGuard guard; + this->doProcess(req); + }); + } else { + doProcess(req); + } + } catch (std::bad_alloc& e) { + memoryExceeded_ = true; + onError(); + } catch (std::exception& e) { + LOG(ERROR) << e.what(); + onError(); + } catch (...) { + onError(); } } diff --git a/src/storage/query/GetDstBySrcProcessor.cpp b/src/storage/query/GetDstBySrcProcessor.cpp index 00febc6c18a..11240738108 100644 --- a/src/storage/query/GetDstBySrcProcessor.cpp +++ b/src/storage/query/GetDstBySrcProcessor.cpp @@ -122,6 +122,7 @@ void GetDstBySrcProcessor::runInMultipleThread(const cpp2::GetDstBySrcRequest& r } folly::collectAll(futures).via(executor_).thenTry([this](auto&& t) mutable { + memory::MemoryCheckGuard guard; CHECK(!t.hasException()); const auto& tries = t.value(); @@ -162,6 +163,7 @@ folly::Future> GetDstBySrcProces const std::vector& srcIds) { return folly::via(executor_, [this, context, result, partId, input = std::move(srcIds)]() mutable { + memory::MemoryCheckGuard guard; auto plan = buildPlan(context, result); for (const auto& src : input) { auto& vId = src.getStr(); diff --git a/src/storage/query/GetNeighborsProcessor.cpp b/src/storage/query/GetNeighborsProcessor.cpp index 26bbe291a1f..9e8b5dd30ff 100644 --- a/src/storage/query/GetNeighborsProcessor.cpp +++ b/src/storage/query/GetNeighborsProcessor.cpp @@ -139,6 +139,7 @@ void GetNeighborsProcessor::runInMultipleThread(const cpp2::GetNeighborsRequest& } folly::collectAll(futures).via(executor_).thenTry([this](auto&& t) mutable { + memory::MemoryCheckGuard guard; CHECK(!t.hasException()); const auto& tries = t.value(); size_t sum = 0; @@ -174,6 +175,7 @@ folly::Future> GetNeighborsProce return folly::via( executor_, [this, context, expCtx, result, partId, input = std::move(vids), limit, random]() { + memory::MemoryCheckGuard guard; auto plan = buildPlan(context, expCtx, result, limit, random); for (const auto& vid : input) { auto vId = vid.getStr(); diff --git a/src/storage/query/GetPropProcessor.cpp b/src/storage/query/GetPropProcessor.cpp index 5a05189622a..21f7bda3d3d 100644 --- a/src/storage/query/GetPropProcessor.cpp +++ b/src/storage/query/GetPropProcessor.cpp @@ -138,6 +138,7 @@ void GetPropProcessor::runInMultipleThread(const cpp2::GetPropRequest& req) { } folly::collectAll(futures).via(executor_).thenTry([this](auto&& t) mutable { + memory::MemoryCheckGuard guard; CHECK(!t.hasException()); const auto& tries = t.value(); size_t sum = 0; @@ -169,6 +170,7 @@ folly::Future> GetPropProcessor: const std::vector& rows) { return folly::via(executor_, [this, context, result, partId, input = std::move(rows)]() { + memory::MemoryCheckGuard guard; if (!isEdge_) { auto plan = buildTagPlan(context, result); for (const auto& row : input) { diff --git a/src/storage/query/ScanEdgeProcessor.cpp b/src/storage/query/ScanEdgeProcessor.cpp index dbc5ad1a7f0..717b1924662 100644 --- a/src/storage/query/ScanEdgeProcessor.cpp +++ b/src/storage/query/ScanEdgeProcessor.cpp @@ -136,6 +136,7 @@ folly::Future> ScanEdgeProcessor StorageExpressionContext* expCtx) { return folly::via(executor_, [this, context, result, cursors, partId, input = std::move(cursor), expCtx]() { + memory::MemoryCheckGuard guard; auto plan = buildPlan(context, result, cursors, expCtx); auto ret = plan.go(partId, input); @@ -201,6 +202,7 @@ void ScanEdgeProcessor::runInMultipleThread(const cpp2::ScanEdgeRequest& req) { } folly::collectAll(futures).via(executor_).thenTry([this](auto&& t) mutable { + memory::MemoryCheckGuard guard; CHECK(!t.hasException()); const auto& tries = t.value(); size_t sum = 0; diff --git a/src/storage/query/ScanVertexProcessor.cpp b/src/storage/query/ScanVertexProcessor.cpp index 91b6a0cd7ef..429f514bdb5 100644 --- a/src/storage/query/ScanVertexProcessor.cpp +++ b/src/storage/query/ScanVertexProcessor.cpp @@ -141,6 +141,7 @@ folly::Future> ScanVertexProcess return folly::via( executor_, [this, context, result, cursorsOfPart, partId, input = std::move(cursor), expCtx]() { + memory::MemoryCheckGuard guard; auto plan = buildPlan(context, result, cursorsOfPart, expCtx); auto ret = plan.go(partId, input); @@ -206,6 +207,7 @@ void ScanVertexProcessor::runInMultipleThread(const cpp2::ScanVertexRequest& req } folly::collectAll(futures).via(executor_).thenTry([this](auto&& t) mutable { + memory::MemoryCheckGuard guard; CHECK(!t.hasException()); const auto& tries = t.value(); size_t sum = 0; diff --git a/src/storage/test/CMakeLists.txt b/src/storage/test/CMakeLists.txt index f71b6664948..27bb6f01921 100644 --- a/src/storage/test/CMakeLists.txt +++ b/src/storage/test/CMakeLists.txt @@ -37,6 +37,7 @@ set(storage_test_deps $ $ $ + $ $ $ $ diff --git a/src/tools/CMakeLists.txt b/src/tools/CMakeLists.txt index 7aa50d98d51..e30abcd3617 100644 --- a/src/tools/CMakeLists.txt +++ b/src/tools/CMakeLists.txt @@ -37,6 +37,7 @@ set(tools_test_deps $ $ $ + $ $ $ $