From d8026efc0c0095af2715147fd300e8a43892acc9 Mon Sep 17 00:00:00 2001 From: "darion.yaphet" Date: Tue, 29 Mar 2022 00:27:25 +0800 Subject: [PATCH] fix comment --- src/kvstore/NebulaStore.cpp | 48 +++++++++++++++++++++---------------- 1 file changed, 28 insertions(+), 20 deletions(-) diff --git a/src/kvstore/NebulaStore.cpp b/src/kvstore/NebulaStore.cpp index e087beae4a4..2c551b497c8 100644 --- a/src/kvstore/NebulaStore.cpp +++ b/src/kvstore/NebulaStore.cpp @@ -883,33 +883,41 @@ nebula::cpp2::ErrorCode NebulaStore::ingest(GraphSpaceID spaceId) { if (!ok(spaceRet)) { return error(spaceRet); } + LOG(INFO) << "Ingesting space " << spaceId; auto space = nebula::value(spaceRet); + std::vector threads; + nebula::cpp2::ErrorCode code = nebula::cpp2::ErrorCode::SUCCEEDED; for (auto& engine : space->engines_) { - auto parts = engine->allParts(); - for (auto part : parts) { - auto ret = this->engine(spaceId, part); - if (!ok(ret)) { - return error(ret); - } - - auto path = folly::stringPrintf("%s/download/%d", value(ret)->getDataRoot(), part); - if (!fs::FileUtils::exist(path)) { - VLOG(1) << path << " not existed while ingesting"; - continue; - } + threads.emplace_back(std::thread([&engine, &code, this, spaceId] { + auto parts = engine->allParts(); + for (auto part : parts) { + auto ret = this->engine(spaceId, part); + if (!ok(ret)) { + code = error(ret); + } else { + auto path = folly::stringPrintf("%s/download/%d", value(ret)->getDataRoot(), part); + if (!fs::FileUtils::exist(path)) { + LOG(INFO) << path << " not existed"; + continue; + } - auto files = nebula::fs::FileUtils::listAllFilesInDir(path.c_str(), true, "*.sst"); - for (auto file : files) { - VLOG(1) << "Ingesting extra file: " << file; - auto code = engine->ingest(std::vector({file})); - if (code != nebula::cpp2::ErrorCode::SUCCEEDED) { - return code; + auto files = nebula::fs::FileUtils::listAllFilesInDir(path.c_str(), true, "*.sst"); + auto result = engine->ingest(std::vector(files)); + if (result != nebula::cpp2::ErrorCode::SUCCEEDED) { + code = result; + } } } - } + })); } - return nebula::cpp2::ErrorCode::SUCCEEDED; + + // Wait for all threads to finish + for (auto& t : threads) { + t.join(); + } + LOG(INFO) << "Space " << spaceId << " ingest done."; + return code; } nebula::cpp2::ErrorCode NebulaStore::setOption(GraphSpaceID spaceId,