Skip to content

Commit

Permalink
fix comment
Browse files Browse the repository at this point in the history
  • Loading branch information
darionyaphet committed Mar 28, 2022
1 parent e66c144 commit d8026ef
Showing 1 changed file with 28 additions and 20 deletions.
48 changes: 28 additions & 20 deletions src/kvstore/NebulaStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -883,33 +883,41 @@ nebula::cpp2::ErrorCode NebulaStore::ingest(GraphSpaceID spaceId) {
if (!ok(spaceRet)) {
return error(spaceRet);
}

LOG(INFO) << "Ingesting space " << spaceId;
auto space = nebula::value(spaceRet);
std::vector<std::thread> threads;
nebula::cpp2::ErrorCode code = nebula::cpp2::ErrorCode::SUCCEEDED;
for (auto& engine : space->engines_) {
auto parts = engine->allParts();
for (auto part : parts) {
auto ret = this->engine(spaceId, part);
if (!ok(ret)) {
return error(ret);
}

auto path = folly::stringPrintf("%s/download/%d", value(ret)->getDataRoot(), part);
if (!fs::FileUtils::exist(path)) {
VLOG(1) << path << " not existed while ingesting";
continue;
}
threads.emplace_back(std::thread([&engine, &code, this, spaceId] {
auto parts = engine->allParts();
for (auto part : parts) {
auto ret = this->engine(spaceId, part);
if (!ok(ret)) {
code = error(ret);
} else {
auto path = folly::stringPrintf("%s/download/%d", value(ret)->getDataRoot(), part);
if (!fs::FileUtils::exist(path)) {
LOG(INFO) << path << " not existed";
continue;
}

auto files = nebula::fs::FileUtils::listAllFilesInDir(path.c_str(), true, "*.sst");
for (auto file : files) {
VLOG(1) << "Ingesting extra file: " << file;
auto code = engine->ingest(std::vector<std::string>({file}));
if (code != nebula::cpp2::ErrorCode::SUCCEEDED) {
return code;
auto files = nebula::fs::FileUtils::listAllFilesInDir(path.c_str(), true, "*.sst");
auto result = engine->ingest(std::vector<std::string>(files));
if (result != nebula::cpp2::ErrorCode::SUCCEEDED) {
code = result;
}
}
}
}
}));
}
return nebula::cpp2::ErrorCode::SUCCEEDED;

// Wait for all threads to finish
for (auto& t : threads) {
t.join();
}
LOG(INFO) << "Space " << spaceId << " ingest done.";
return code;
}

nebula::cpp2::ErrorCode NebulaStore::setOption(GraphSpaceID spaceId,
Expand Down

0 comments on commit d8026ef

Please sign in to comment.