Skip to content

Commit

Permalink
Support create space/add part dynamically in kvstore (vesoft-inc#197)
Browse files Browse the repository at this point in the history
* Support create space/add part dynamically in kvstore

* Address dutor's and darion's comments
  • Loading branch information
dangleptr authored Mar 8, 2019
1 parent be8afeb commit 7bdd86e
Show file tree
Hide file tree
Showing 4 changed files with 183 additions and 109 deletions.
184 changes: 108 additions & 76 deletions src/kvstore/NebulaStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include <algorithm>
#include <cstdint>
#include "network/NetworkUtils.h"
#include "fs/FileUtils.h"
#include "kvstore/RocksdbEngine.h"

DEFINE_string(engine_type, "rocksdb", "rocksdb, memory...");
Expand All @@ -18,6 +19,7 @@ DEFINE_string(part_type, "simple", "simple, consensus...");
* Check spaceId, partId exists or not.
* */
#define CHECK_FOR_WRITE(spaceId, partId, cb) \
folly::RWSpinLock::ReadHolder rh(&lock_); \
auto it = kvs_.find(spaceId); \
if (UNLIKELY(it == kvs_.end())) { \
cb(ResultCode::ERR_SPACE_NOT_FOUND, HostAddr(0, 0)); \
Expand All @@ -35,6 +37,7 @@ DEFINE_string(part_type, "simple", "simple, consensus...");
* Check spaceId, partId and return related storage engine.
* */
#define CHECK_AND_RETURN_ENGINE(spaceId, partId) \
folly::RWSpinLock::ReadHolder rh(&lock_); \
KVEngine* engine = nullptr; \
do { \
auto it = kvs_.find(spaceId); \
Expand All @@ -61,52 +64,118 @@ KVStore* KVStore::instance(KVOptions options) {
return instance;
}


std::vector<Engine> NebulaStore::initEngines(GraphSpaceID spaceId) {
decltype(kvs_[spaceId]->engines_) engines;
for (auto& path : options_.dataPaths_) {
if (FLAGS_engine_type == "rocksdb") {
engines.emplace_back(
new RocksdbEngine(spaceId,
folly::stringPrintf("%s/nebula/%d/data",
path.c_str(), spaceId),
options_.mergeOp_,
options_.cfFactory_),
path);
} else {
LOG(FATAL) << "Unknown engine type " << FLAGS_engine_type;
}
Engine NebulaStore::newEngine(GraphSpaceID spaceId, std::string rootPath) {
if (FLAGS_engine_type == "rocksdb") {
auto dataPath = folly::stringPrintf("%s/nebula/%d/data", rootPath.c_str(), spaceId);
auto engine = std::make_pair(
std::unique_ptr<KVEngine>(
new RocksdbEngine(
spaceId,
std::move(dataPath),
options_.mergeOp_,
options_.cfFactory_)),
std::move(rootPath));
return engine;
} else {
LOG(FATAL) << "Unknown Part type " << FLAGS_part_type;
}
return engines;
}

std::unique_ptr<Part> NebulaStore::newPart(GraphSpaceID spaceId,
PartitionID partId,
const Engine& engine) {
if (FLAGS_part_type == "simple") {
return std::unique_ptr<Part>(new SimplePart(
spaceId,
partId,
folly::stringPrintf("%s/nebula/%d/wals/%d",
engine.second.c_str(), spaceId, partId),
engine.first.get()));
} else {
LOG(FATAL) << "Unknown Part type " << FLAGS_part_type;
}
}

PartEngine NebulaStore::checkLocalParts(GraphSpaceID spaceId) {
PartEngine maps;
for (auto& engine : this->kvs_[spaceId]->engines_) {
auto parts = engine.first->allParts();
for (auto partId : parts) {
if (partMan_->partExist(spaceId, partId)) {
maps.emplace(partId, &engine);
} else {
engine.first->removePart(partId);
void NebulaStore::init() {
CHECK(!!partMan_);
LOG(INFO) << "Scan the local path, and init the kvs_";
{
folly::RWSpinLock::WriteHolder wh(&lock_);
for (auto& path : options_.dataPaths_) {
auto rootPath = folly::stringPrintf("%s/nebula", path.c_str());
auto dirs = fs::FileUtils::listAllDirsInDir(rootPath.c_str());
for (auto& dir : dirs) {
LOG(INFO) << "Scan path " << path << "/" << dir;
try {
auto spaceId = folly::to<GraphSpaceID>(dir);
if (!partMan_->spaceExist(spaceId)) {
LOG(INFO) << "Space " << spaceId << " not exist any more, remove the data!";
auto dataPath = folly::stringPrintf("%s/%s", rootPath.c_str(), dir.c_str());
CHECK(fs::FileUtils::remove(dataPath.c_str(), true));
continue;
}
auto engine = newEngine(spaceId, path);
auto spaceIt = this->kvs_.find(spaceId);
if (spaceIt == this->kvs_.end()) {
LOG(INFO) << "Load space " << spaceId << " from disk";
this->kvs_.emplace(spaceId, std::make_unique<GraphSpaceKV>());
}
auto& spaceKV = this->kvs_[spaceId];
for (auto& partId : engine.first->allParts()) {
if (!partMan_->partExist(spaceId, partId)) {
LOG(INFO) << "Part " << partId << " not exist any more, remove it!";
engine.first->removePart(partId);
continue;
}
spaceKV->parts_.emplace(partId, newPart(spaceId, partId, engine));
LOG(INFO) << "Space " << spaceId
<< ", part " << partId << " has been loaded!";
}
spaceKV->engines_.emplace_back(std::move(engine));
} catch (std::exception& e) {
LOG(FATAL) << "Can't convert " << dir;
}
}
}
}
return maps;
LOG(INFO) << "Init data from partManager...";
auto partsMap = partMan_->parts(options_.local_);
for (auto& entry : partsMap) {
auto spaceId = entry.first;
addSpace(spaceId);
for (auto& partEntry : entry.second) {
addPart(spaceId, partEntry.first);
}
}
LOG(INFO) << "Register handler...";
partMan_->registerHandler(this);
}

void NebulaStore::addSpace(GraphSpaceID spaceId) {
folly::RWSpinLock::WriteHolder wh(&lock_);
if (this->kvs_.find(spaceId) != this->kvs_.end()) {
LOG(INFO) << "Space " << spaceId << " has existed!";
return;
}
LOG(INFO) << "Create space " << spaceId;
this->kvs_[spaceId] = std::make_unique<GraphSpaceKV>();
for (auto& path : options_.dataPaths_) {
this->kvs_[spaceId]->engines_.emplace_back(newEngine(spaceId, path));
}
return;
}

const Engine& NebulaStore::dispatchPart(GraphSpaceID spaceId,
PartitionID partId,
const PartEngine& maps) {
auto it = maps.find(partId);
if (it != maps.end()) {
return *it->second;
void NebulaStore::addPart(GraphSpaceID spaceId, PartitionID partId) {
folly::RWSpinLock::WriteHolder wh(&lock_);
auto spaceIt = this->kvs_.find(spaceId);
CHECK(spaceIt != this->kvs_.end()) << "Space should exist!";
if (spaceIt->second->parts_.find(partId) != spaceIt->second->parts_.end()) {
LOG(INFO) << "[" << spaceId << "," << partId << "] has existed!";
return;
}
int32_t minIndex = -1, index = 0;
int32_t minPartsNum = 0x7FFFFFFF;
auto& engines = this->kvs_[spaceId]->engines_;
auto& engines = spaceIt->second->engines_;
for (auto& engine : engines) {
if (engine.first->totalPartsNum() < minPartsNum) {
minPartsNum = engine.first->totalPartsNum();
Expand All @@ -115,55 +184,22 @@ const Engine& NebulaStore::dispatchPart(GraphSpaceID spaceId,
index++;
}
CHECK_GE(minIndex, 0) << "engines number:" << engines.size();
const auto& target = engines[minIndex];
const auto& targetEngine = engines[minIndex];
// Write the information into related engine.
target.first->addPart(partId);
return target;
targetEngine.first->addPart(partId);
spaceIt->second->parts_.emplace(partId,
newPart(spaceId, partId, targetEngine));
LOG(INFO) << "Space " << spaceId << ", part " << partId << " has been added!";
return;
}


void NebulaStore::init() {
auto partsMap = partMan_->parts(options_.local_);
LOG(INFO) << "Init all parts, total graph space " << partsMap.size();
std::for_each(partsMap.begin(), partsMap.end(), [this](auto& idPart) {
auto spaceId = idPart.first;
auto& spaceParts = idPart.second;

this->kvs_[spaceId] = std::make_unique<GraphSpaceKV>();
this->kvs_[spaceId]->engines_ = initEngines(spaceId);

auto partEngineMap = checkLocalParts(spaceId);
// Init kvs[spaceId]->parts
decltype(this->kvs_[spaceId]->parts_) parts;
std::for_each(spaceParts.begin(), spaceParts.end(), [&](auto& partItem) {
auto partId = partItem.first;
auto& engine = dispatchPart(spaceId, partId, partEngineMap);
auto& enginePtr = engine.first;
auto& path = engine.second;
if (FLAGS_part_type == "simple") {
parts.emplace(partId, new SimplePart(
spaceId,
partId,
folly::stringPrintf("%s/nebula/%d/wals/%d",
path.c_str(), spaceId, partId),
enginePtr.get()));
} else {
LOG(FATAL) << "Unknown Part type " << FLAGS_part_type;
}
});
this->kvs_[spaceId]->parts_ = std::move(parts);
});
}


ResultCode NebulaStore::get(GraphSpaceID spaceId, PartitionID partId,
const std::string& key,
std::string* value) {
CHECK_AND_RETURN_ENGINE(spaceId, partId);
return engine->get(key, value);
}


ResultCode NebulaStore::range(GraphSpaceID spaceId, PartitionID partId,
const std::string& start,
const std::string& end,
Expand All @@ -172,23 +208,20 @@ ResultCode NebulaStore::range(GraphSpaceID spaceId, PartitionID partId,
return engine->range(start, end, iter);
}


ResultCode NebulaStore::prefix(GraphSpaceID spaceId, PartitionID partId,
const std::string& prefix,
std::unique_ptr<KVIterator>* iter) {
CHECK_AND_RETURN_ENGINE(spaceId, partId);
return engine->prefix(prefix, iter);
}


void NebulaStore::asyncMultiPut(GraphSpaceID spaceId, PartitionID partId,
std::vector<KV> keyValues,
KVCallback cb) {
CHECK_FOR_WRITE(spaceId, partId, cb);
return partIt->second->asyncMultiPut(std::move(keyValues), std::move(cb));
}


void NebulaStore::asyncRemove(GraphSpaceID spaceId,
PartitionID partId,
const std::string& key,
Expand All @@ -197,7 +230,6 @@ void NebulaStore::asyncRemove(GraphSpaceID spaceId,
return partIt->second->asyncRemove(key, std::move(cb));
}


void NebulaStore::asyncRemoveRange(GraphSpaceID spaceId,
PartitionID partId,
const std::string& start,
Expand Down
27 changes: 13 additions & 14 deletions src/kvstore/NebulaStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#define KVSTORE_NEBULASTORE_H_

#include <gtest/gtest_prod.h>
#include <folly/RWSpinLock.h>
#include "base/Base.h"
#include "kvstore/KVStore.h"
#include "kvstore/PartManager.h"
Expand All @@ -27,7 +28,7 @@ struct GraphSpaceKV {
std::vector<Engine> engines_;
};

class NebulaStore : public KVStore {
class NebulaStore : public KVStore, public Handler {
FRIEND_TEST(KVStoreTest, SimpleTest);
FRIEND_TEST(KVStoreTest, PartsTest);

Expand Down Expand Up @@ -90,25 +91,23 @@ class NebulaStore : public KVStore {

private:
/**
* Init engines for one space.
* Implement two interfaces in Handler.
* */
std::vector<Engine> initEngines(GraphSpaceID spaceId);
void addSpace(GraphSpaceID spaceId) override;

/**
* Check whether parts stored in local existed in PartMan, if not, remove it locally.
* Return partEngine map.
* */
PartEngine checkLocalParts(GraphSpaceID spaceId);
void addPart(GraphSpaceID spaceId, PartitionID partId) override;

/**
* Dispatch part to some engine, return the engine while would hold the part.
* */
const Engine& dispatchPart(GraphSpaceID spaceId,
PartitionID partId,
const PartEngine& maps);
private:
Engine newEngine(GraphSpaceID spaceId, std::string rootPath);

std::unique_ptr<Part> newPart(GraphSpaceID spaceId,
PartitionID partId,
const Engine& engine);

private:
std::unordered_map<GraphSpaceID, std::unique_ptr<GraphSpaceKV>> kvs_;
// The lock used to protect kvs_
folly::RWSpinLock lock_;
PartManager* partMan_ = nullptr;
KVOptions options_;
};
Expand Down
42 changes: 41 additions & 1 deletion src/kvstore/PartManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,13 @@ struct PartMeta {
};

using PartsMap = std::unordered_map<GraphSpaceID, std::unordered_map<PartitionID, PartMeta>>;

class Handler {
public:
virtual void addSpace(GraphSpaceID spaceId) = 0;
virtual void addPart(GraphSpaceID spaceId, PartitionID partId) = 0;
};

/**
* This class manages all meta information one storage host needed.
* */
Expand Down Expand Up @@ -49,9 +56,22 @@ class PartManager {
* */
virtual bool partExist(GraphSpaceID spaceId, PartitionID partId) = 0;

/**
* Check current space exist or not.
* */
virtual bool spaceExist(GraphSpaceID spaceId) = 0;

/**
* Register Handler
* */
void registerHandler(Handler* handler) {
handler_ = handler;
}

protected:
PartManager() = default;
static PartManager* instance_;
Handler* handler_ = nullptr;
};

/**
Expand All @@ -71,7 +91,18 @@ class MemPartManager final : public PartManager {
PartMeta partMeta(GraphSpaceID spaceId, PartitionID partId) override;

void addPart(GraphSpaceID spaceId, PartitionID partId) {
partsMap_[spaceId][partId] = PartMeta();
if (partsMap_.find(spaceId) == partsMap_.end()) {
if (handler_) {
handler_->addSpace(spaceId);
}
}
auto& p = partsMap_[spaceId];
if (p.find(partId) == p.end()) {
if (handler_) {
handler_->addPart(spaceId, partId);
}
}
p[partId] = PartMeta();
}

bool partExist(GraphSpaceID spaceId, PartitionID partId) override {
Expand All @@ -85,6 +116,15 @@ class MemPartManager final : public PartManager {
return false;
}

bool spaceExist(GraphSpaceID spaceId) override {
return partsMap_.find(spaceId) != partsMap_.end();
}

void clear() {
partsMap_.clear();
handler_ = nullptr;
}

PartsMap& partsMap() {
return partsMap_;
}
Expand Down
Loading

0 comments on commit 7bdd86e

Please sign in to comment.