diff --git a/src/kvstore/NebulaStore.cpp b/src/kvstore/NebulaStore.cpp index e3e94f4eeb8..74f4c3bfd73 100644 --- a/src/kvstore/NebulaStore.cpp +++ b/src/kvstore/NebulaStore.cpp @@ -9,6 +9,7 @@ #include #include #include "network/NetworkUtils.h" +#include "fs/FileUtils.h" #include "kvstore/RocksdbEngine.h" DEFINE_string(engine_type, "rocksdb", "rocksdb, memory..."); @@ -18,6 +19,7 @@ DEFINE_string(part_type, "simple", "simple, consensus..."); * Check spaceId, partId exists or not. * */ #define CHECK_FOR_WRITE(spaceId, partId, cb) \ + folly::RWSpinLock::ReadHolder rh(&lock_); \ auto it = kvs_.find(spaceId); \ if (UNLIKELY(it == kvs_.end())) { \ cb(ResultCode::ERR_SPACE_NOT_FOUND, HostAddr(0, 0)); \ @@ -35,6 +37,7 @@ DEFINE_string(part_type, "simple", "simple, consensus..."); * Check spaceId, partId and return related storage engine. * */ #define CHECK_AND_RETURN_ENGINE(spaceId, partId) \ + folly::RWSpinLock::ReadHolder rh(&lock_); \ KVEngine* engine = nullptr; \ do { \ auto it = kvs_.find(spaceId); \ @@ -61,52 +64,118 @@ KVStore* KVStore::instance(KVOptions options) { return instance; } - -std::vector NebulaStore::initEngines(GraphSpaceID spaceId) { - decltype(kvs_[spaceId]->engines_) engines; - for (auto& path : options_.dataPaths_) { - if (FLAGS_engine_type == "rocksdb") { - engines.emplace_back( - new RocksdbEngine(spaceId, - folly::stringPrintf("%s/nebula/%d/data", - path.c_str(), spaceId), - options_.mergeOp_, - options_.cfFactory_), - path); - } else { - LOG(FATAL) << "Unknown engine type " << FLAGS_engine_type; - } +Engine NebulaStore::newEngine(GraphSpaceID spaceId, std::string rootPath) { + if (FLAGS_engine_type == "rocksdb") { + auto dataPath = folly::stringPrintf("%s/nebula/%d/data", rootPath.c_str(), spaceId); + auto engine = std::make_pair( + std::unique_ptr( + new RocksdbEngine( + spaceId, + std::move(dataPath), + options_.mergeOp_, + options_.cfFactory_)), + std::move(rootPath)); + return engine; + } else { + LOG(FATAL) << "Unknown Part type " << FLAGS_part_type; } - return engines; } +std::unique_ptr NebulaStore::newPart(GraphSpaceID spaceId, + PartitionID partId, + const Engine& engine) { + if (FLAGS_part_type == "simple") { + return std::unique_ptr(new SimplePart( + spaceId, + partId, + folly::stringPrintf("%s/nebula/%d/wals/%d", + engine.second.c_str(), spaceId, partId), + engine.first.get())); + } else { + LOG(FATAL) << "Unknown Part type " << FLAGS_part_type; + } +} -PartEngine NebulaStore::checkLocalParts(GraphSpaceID spaceId) { - PartEngine maps; - for (auto& engine : this->kvs_[spaceId]->engines_) { - auto parts = engine.first->allParts(); - for (auto partId : parts) { - if (partMan_->partExist(spaceId, partId)) { - maps.emplace(partId, &engine); - } else { - engine.first->removePart(partId); +void NebulaStore::init() { + CHECK(!!partMan_); + LOG(INFO) << "Scan the local path, and init the kvs_"; + { + folly::RWSpinLock::WriteHolder wh(&lock_); + for (auto& path : options_.dataPaths_) { + auto rootPath = folly::stringPrintf("%s/nebula", path.c_str()); + auto dirs = fs::FileUtils::listAllDirsInDir(rootPath.c_str()); + for (auto& dir : dirs) { + LOG(INFO) << "Scan path " << path << "/" << dir; + try { + auto spaceId = folly::to(dir); + if (!partMan_->spaceExist(spaceId)) { + LOG(INFO) << "Space " << spaceId << " not exist any more, remove the data!"; + auto dataPath = folly::stringPrintf("%s/%s", rootPath.c_str(), dir.c_str()); + CHECK(fs::FileUtils::remove(dataPath.c_str(), true)); + continue; + } + auto engine = newEngine(spaceId, path); + auto spaceIt = this->kvs_.find(spaceId); + if (spaceIt == this->kvs_.end()) { + LOG(INFO) << "Load space " << spaceId << " from disk"; + this->kvs_.emplace(spaceId, std::make_unique()); + } + auto& spaceKV = this->kvs_[spaceId]; + for (auto& partId : engine.first->allParts()) { + if (!partMan_->partExist(spaceId, partId)) { + LOG(INFO) << "Part " << partId << " not exist any more, remove it!"; + engine.first->removePart(partId); + continue; + } + spaceKV->parts_.emplace(partId, newPart(spaceId, partId, engine)); + LOG(INFO) << "Space " << spaceId + << ", part " << partId << " has been loaded!"; + } + spaceKV->engines_.emplace_back(std::move(engine)); + } catch (std::exception& e) { + LOG(FATAL) << "Can't convert " << dir; + } } } } - return maps; + LOG(INFO) << "Init data from partManager..."; + auto partsMap = partMan_->parts(options_.local_); + for (auto& entry : partsMap) { + auto spaceId = entry.first; + addSpace(spaceId); + for (auto& partEntry : entry.second) { + addPart(spaceId, partEntry.first); + } + } + LOG(INFO) << "Register handler..."; + partMan_->registerHandler(this); } +void NebulaStore::addSpace(GraphSpaceID spaceId) { + folly::RWSpinLock::WriteHolder wh(&lock_); + if (this->kvs_.find(spaceId) != this->kvs_.end()) { + LOG(INFO) << "Space " << spaceId << " has existed!"; + return; + } + LOG(INFO) << "Create space " << spaceId; + this->kvs_[spaceId] = std::make_unique(); + for (auto& path : options_.dataPaths_) { + this->kvs_[spaceId]->engines_.emplace_back(newEngine(spaceId, path)); + } + return; +} -const Engine& NebulaStore::dispatchPart(GraphSpaceID spaceId, - PartitionID partId, - const PartEngine& maps) { - auto it = maps.find(partId); - if (it != maps.end()) { - return *it->second; +void NebulaStore::addPart(GraphSpaceID spaceId, PartitionID partId) { + folly::RWSpinLock::WriteHolder wh(&lock_); + auto spaceIt = this->kvs_.find(spaceId); + CHECK(spaceIt != this->kvs_.end()) << "Space should exist!"; + if (spaceIt->second->parts_.find(partId) != spaceIt->second->parts_.end()) { + LOG(INFO) << "[" << spaceId << "," << partId << "] has existed!"; + return; } int32_t minIndex = -1, index = 0; int32_t minPartsNum = 0x7FFFFFFF; - auto& engines = this->kvs_[spaceId]->engines_; + auto& engines = spaceIt->second->engines_; for (auto& engine : engines) { if (engine.first->totalPartsNum() < minPartsNum) { minPartsNum = engine.first->totalPartsNum(); @@ -115,47 +184,15 @@ const Engine& NebulaStore::dispatchPart(GraphSpaceID spaceId, index++; } CHECK_GE(minIndex, 0) << "engines number:" << engines.size(); - const auto& target = engines[minIndex]; + const auto& targetEngine = engines[minIndex]; // Write the information into related engine. - target.first->addPart(partId); - return target; + targetEngine.first->addPart(partId); + spaceIt->second->parts_.emplace(partId, + newPart(spaceId, partId, targetEngine)); + LOG(INFO) << "Space " << spaceId << ", part " << partId << " has been added!"; + return; } - -void NebulaStore::init() { - auto partsMap = partMan_->parts(options_.local_); - LOG(INFO) << "Init all parts, total graph space " << partsMap.size(); - std::for_each(partsMap.begin(), partsMap.end(), [this](auto& idPart) { - auto spaceId = idPart.first; - auto& spaceParts = idPart.second; - - this->kvs_[spaceId] = std::make_unique(); - this->kvs_[spaceId]->engines_ = initEngines(spaceId); - - auto partEngineMap = checkLocalParts(spaceId); - // Init kvs[spaceId]->parts - decltype(this->kvs_[spaceId]->parts_) parts; - std::for_each(spaceParts.begin(), spaceParts.end(), [&](auto& partItem) { - auto partId = partItem.first; - auto& engine = dispatchPart(spaceId, partId, partEngineMap); - auto& enginePtr = engine.first; - auto& path = engine.second; - if (FLAGS_part_type == "simple") { - parts.emplace(partId, new SimplePart( - spaceId, - partId, - folly::stringPrintf("%s/nebula/%d/wals/%d", - path.c_str(), spaceId, partId), - enginePtr.get())); - } else { - LOG(FATAL) << "Unknown Part type " << FLAGS_part_type; - } - }); - this->kvs_[spaceId]->parts_ = std::move(parts); - }); -} - - ResultCode NebulaStore::get(GraphSpaceID spaceId, PartitionID partId, const std::string& key, std::string* value) { @@ -163,7 +200,6 @@ ResultCode NebulaStore::get(GraphSpaceID spaceId, PartitionID partId, return engine->get(key, value); } - ResultCode NebulaStore::range(GraphSpaceID spaceId, PartitionID partId, const std::string& start, const std::string& end, @@ -172,7 +208,6 @@ ResultCode NebulaStore::range(GraphSpaceID spaceId, PartitionID partId, return engine->range(start, end, iter); } - ResultCode NebulaStore::prefix(GraphSpaceID spaceId, PartitionID partId, const std::string& prefix, std::unique_ptr* iter) { @@ -180,7 +215,6 @@ ResultCode NebulaStore::prefix(GraphSpaceID spaceId, PartitionID partId, return engine->prefix(prefix, iter); } - void NebulaStore::asyncMultiPut(GraphSpaceID spaceId, PartitionID partId, std::vector keyValues, KVCallback cb) { @@ -188,7 +222,6 @@ void NebulaStore::asyncMultiPut(GraphSpaceID spaceId, PartitionID partId, return partIt->second->asyncMultiPut(std::move(keyValues), std::move(cb)); } - void NebulaStore::asyncRemove(GraphSpaceID spaceId, PartitionID partId, const std::string& key, @@ -197,7 +230,6 @@ void NebulaStore::asyncRemove(GraphSpaceID spaceId, return partIt->second->asyncRemove(key, std::move(cb)); } - void NebulaStore::asyncRemoveRange(GraphSpaceID spaceId, PartitionID partId, const std::string& start, diff --git a/src/kvstore/NebulaStore.h b/src/kvstore/NebulaStore.h index e8d1e96c5c1..f2b8f3f5e34 100644 --- a/src/kvstore/NebulaStore.h +++ b/src/kvstore/NebulaStore.h @@ -8,6 +8,7 @@ #define KVSTORE_NEBULASTORE_H_ #include +#include #include "base/Base.h" #include "kvstore/KVStore.h" #include "kvstore/PartManager.h" @@ -27,7 +28,7 @@ struct GraphSpaceKV { std::vector engines_; }; -class NebulaStore : public KVStore { +class NebulaStore : public KVStore, public Handler { FRIEND_TEST(KVStoreTest, SimpleTest); FRIEND_TEST(KVStoreTest, PartsTest); @@ -90,25 +91,23 @@ class NebulaStore : public KVStore { private: /** - * Init engines for one space. + * Implement two interfaces in Handler. * */ - std::vector initEngines(GraphSpaceID spaceId); + void addSpace(GraphSpaceID spaceId) override; - /** - * Check whether parts stored in local existed in PartMan, if not, remove it locally. - * Return partEngine map. - * */ - PartEngine checkLocalParts(GraphSpaceID spaceId); + void addPart(GraphSpaceID spaceId, PartitionID partId) override; - /** - * Dispatch part to some engine, return the engine while would hold the part. - * */ - const Engine& dispatchPart(GraphSpaceID spaceId, - PartitionID partId, - const PartEngine& maps); +private: + Engine newEngine(GraphSpaceID spaceId, std::string rootPath); + + std::unique_ptr newPart(GraphSpaceID spaceId, + PartitionID partId, + const Engine& engine); private: std::unordered_map> kvs_; + // The lock used to protect kvs_ + folly::RWSpinLock lock_; PartManager* partMan_ = nullptr; KVOptions options_; }; diff --git a/src/kvstore/PartManager.h b/src/kvstore/PartManager.h index e672f8f03b4..1b9e44bf540 100644 --- a/src/kvstore/PartManager.h +++ b/src/kvstore/PartManager.h @@ -22,6 +22,13 @@ struct PartMeta { }; using PartsMap = std::unordered_map>; + +class Handler { +public: + virtual void addSpace(GraphSpaceID spaceId) = 0; + virtual void addPart(GraphSpaceID spaceId, PartitionID partId) = 0; +}; + /** * This class manages all meta information one storage host needed. * */ @@ -49,9 +56,22 @@ class PartManager { * */ virtual bool partExist(GraphSpaceID spaceId, PartitionID partId) = 0; + /** + * Check current space exist or not. + * */ + virtual bool spaceExist(GraphSpaceID spaceId) = 0; + + /** + * Register Handler + * */ + void registerHandler(Handler* handler) { + handler_ = handler; + } + protected: PartManager() = default; static PartManager* instance_; + Handler* handler_ = nullptr; }; /** @@ -71,7 +91,18 @@ class MemPartManager final : public PartManager { PartMeta partMeta(GraphSpaceID spaceId, PartitionID partId) override; void addPart(GraphSpaceID spaceId, PartitionID partId) { - partsMap_[spaceId][partId] = PartMeta(); + if (partsMap_.find(spaceId) == partsMap_.end()) { + if (handler_) { + handler_->addSpace(spaceId); + } + } + auto& p = partsMap_[spaceId]; + if (p.find(partId) == p.end()) { + if (handler_) { + handler_->addPart(spaceId, partId); + } + } + p[partId] = PartMeta(); } bool partExist(GraphSpaceID spaceId, PartitionID partId) override { @@ -85,6 +116,15 @@ class MemPartManager final : public PartManager { return false; } + bool spaceExist(GraphSpaceID spaceId) override { + return partsMap_.find(spaceId) != partsMap_.end(); + } + + void clear() { + partsMap_.clear(); + handler_ = nullptr; + } + PartsMap& partsMap() { return partsMap_; } diff --git a/src/kvstore/test/KVStoreTest.cpp b/src/kvstore/test/KVStoreTest.cpp index 6b3396eacdf..322591d65c6 100644 --- a/src/kvstore/test/KVStoreTest.cpp +++ b/src/kvstore/test/KVStoreTest.cpp @@ -33,6 +33,7 @@ TEST(KVStoreTest, SimpleTest) { FLAGS_part_man_type = "memory"; // Use MemPartManager. fs::TempDir rootPath("/tmp/kvstore_test.XXXXXX"); MemPartManager* partMan = reinterpret_cast(PartManager::instance()); + partMan->clear(); // GraphSpaceID => {PartitionIDs} // 1 => {0, 1, 2, 3, 4, 5} // 2 => {0, 1, 2, 3, 4, 5} @@ -111,11 +112,12 @@ TEST(KVStoreTest, PartsTest) { FLAGS_part_man_type = "memory"; // Use MemPartManager. fs::TempDir rootPath("/tmp/kvstore_test.XXXXXX"); MemPartManager* partMan = reinterpret_cast(PartManager::instance()); + partMan->clear(); // GraphSpaceID => {PartitionIDs} // 0 => {0, 1, 2, 3...9} // The parts on PartMan is 0...9 for (auto partId = 0; partId < 10; partId++) { - partMan->partsMap_[0][partId] = PartMeta(); + partMan->addPart(0, partId); } std::vector paths; paths.push_back(folly::stringPrintf("%s/disk1", rootPath.path())); @@ -140,24 +142,25 @@ TEST(KVStoreTest, PartsTest) { options.dataPaths_ = std::move(paths); kv.reset(static_cast(KVStore::instance(std::move(options)))); - // After init, the parts should be 0-9, and the distribution should be - // disk1: 0, 1, 2, x, y - // disk2: 5, 6, 7, x1, y1 - // x, y, x1, y1 in {3, 4, 8, 9} - for (auto i = 0; i < 2; i++) { - ASSERT_EQ(folly::stringPrintf("%s/disk%d", rootPath.path(), i + 1), - kv->kvs_[0]->engines_[i].second); - } - { - auto parts = kv->kvs_[0]->engines_[0].first->allParts(); - dump(parts); - ASSERT_EQ(5, parts.size()); - } - { - auto parts = kv->kvs_[0]->engines_[1].first->allParts(); - dump(parts); - ASSERT_EQ(5, parts.size()); + auto check = [&](GraphSpaceID spaceId) { + // After init, the parts should be 0-9, and the distribution should be + // disk1: 0, 1, 2, x, y + // disk2: 5, 6, 7, x1, y1 + // x, y, x1, y1 in {3, 4, 8, 9} + for (auto i = 0; i < 2; i++) { + ASSERT_EQ(folly::stringPrintf("%s/disk%d", rootPath.path(), i + 1), + kv->kvs_[spaceId]->engines_[i].second); + auto parts = kv->kvs_[spaceId]->engines_[i].first->allParts(); + dump(parts); + ASSERT_EQ(5, parts.size()); + } + }; + check(0); + // Let's create another space with 10 parts. + for (auto partId = 0; partId < 10; partId++) { + partMan->addPart(1, partId); } + check(1); } } // namespace kvstore