Skip to content

Commit

Permalink
support local id (#2550)
Browse files Browse the repository at this point in the history
Co-authored-by: CBS <56461666+bright-starry-sky@users.noreply.github.com>
  • Loading branch information
panda-sheep and bright-starry-sky authored Aug 25, 2021
1 parent f500d47 commit 1525aa3
Show file tree
Hide file tree
Showing 13 changed files with 511 additions and 20 deletions.
17 changes: 16 additions & 1 deletion src/meta/MetaServiceUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ static const std::unordered_map<
{"stats", {"__stats__", MetaServiceUtils::parseStatsSpace}},
{"balance_task", {"__balance_task__", nullptr}},
{"balance_plan", {"__balance_plan__", nullptr}},
{"ft_index", {"__ft_index__", nullptr}}};
{"ft_index", {"__ft_index__", nullptr}},
{"local_id", {"__local_id__", MetaServiceUtils::parseLocalIdSpace}}};

// clang-format off
static const std::string kSpacesTable = tableMaps.at("spaces").first; // NOLINT
Expand Down Expand Up @@ -80,6 +81,7 @@ static const std::string kListenerTable = tableMaps.at("listener").first;
static const std::string kStatsTable = tableMaps.at("stats").first; // NOLINT
static const std::string kBalanceTaskTable = tableMaps.at("balance_task").first; // NOLINT
static const std::string kBalancePlanTable = tableMaps.at("balance_plan").first; // NOLINT
static const std::string kLocalIdTable = tableMaps.at("local_id").first; // NOLINT

const std::string kFTIndexTable = tableMaps.at("ft_index").first; // NOLINT
const std::string kFTServiceTable = systemTableMaps.at("ft_service").first; // NOLINT
Expand Down Expand Up @@ -1403,5 +1405,18 @@ cpp2::FTIndex MetaServiceUtils::parsefulltextIndex(folly::StringPiece val) {

std::string MetaServiceUtils::fulltextIndexPrefix() { return kFTIndexTable; }

std::string MetaServiceUtils::localIdKey(GraphSpaceID spaceId) {
std::string key;
key.reserve(kLocalIdTable.size() + sizeof(GraphSpaceID));
key.append(kLocalIdTable.data(), kLocalIdTable.size())
.append(reinterpret_cast<const char*>(&spaceId), sizeof(GraphSpaceID));
return key;
}

GraphSpaceID MetaServiceUtils::parseLocalIdSpace(folly::StringPiece rawData) {
auto offset = kLocalIdTable.size();
return *reinterpret_cast<const GraphSpaceID*>(rawData.data() + offset);
}

} // namespace meta
} // namespace nebula
4 changes: 4 additions & 0 deletions src/meta/MetaServiceUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,10 @@ class MetaServiceUtils final {
const std::unordered_set<GraphSpaceID>& spaces,
const std::string& backupName,
const std::vector<std::string>* spaceName);

static std::string localIdKey(GraphSpaceID spaceId);

static GraphSpaceID parseLocalIdSpace(folly::StringPiece rawData);
};

} // namespace meta
Expand Down
63 changes: 63 additions & 0 deletions src/meta/processors/BaseProcessor-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,69 @@ ErrorOr<nebula::cpp2::ErrorCode, int32_t> BaseProcessor<RESP>::autoIncrementId()
}
}

template <typename RESP>
ErrorOr<nebula::cpp2::ErrorCode, int32_t> BaseProcessor<RESP>::getAvailableGolbalId() {
// A read lock has been added before call
static const std::string kIdKey = "__id__";
int32_t id;
std::string val;
auto ret = kvstore_->get(kDefaultSpaceId, kDefaultPartId, kIdKey, &val);
if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) {
if (ret != nebula::cpp2::ErrorCode::E_KEY_NOT_FOUND) {
return ret;
}
id = 1;
} else {
id = *reinterpret_cast<const int32_t*>(val.c_str()) + 1;
}

return id;
}

template <typename RESP>
ErrorOr<nebula::cpp2::ErrorCode, int32_t> BaseProcessor<RESP>::autoIncrementIdInSpace(
GraphSpaceID spaceId) {
folly::SharedMutex::WriteHolder wHolder(LockUtils::localIdLock());
folly::SharedMutex::ReadHolder rHolder(LockUtils::idLock());
auto globalIdRet = getAvailableGolbalId();
if (!nebula::ok(globalIdRet)) {
return nebula::error(globalIdRet);
}
auto globalId = nebula::value(globalIdRet);

auto localIdkey = MetaServiceUtils::localIdKey(spaceId);
int32_t id;
std::string val;
auto ret = kvstore_->get(kDefaultSpaceId, kDefaultPartId, localIdkey, &val);
if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) {
if (ret != nebula::cpp2::ErrorCode::E_KEY_NOT_FOUND) {
return ret;
}

// In order to be compatible with the existing old schema, and simple to implement,
// when the local_id record does not exist in space, directly use the smallest
// id available globally.
id = globalId;
} else {
id = *reinterpret_cast<const int32_t*>(val.c_str()) + 1;
}

std::vector<kvstore::KV> data;
data.emplace_back(localIdkey, std::string(reinterpret_cast<const char*>(&id), sizeof(id)));
folly::Baton<true, std::atomic> baton;
kvstore_->asyncMultiPut(
kDefaultSpaceId, kDefaultPartId, std::move(data), [&](nebula::cpp2::ErrorCode code) {
ret = code;
baton.post();
});
baton.wait();
if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) {
return ret;
} else {
return id;
}
}

template <typename RESP>
nebula::cpp2::ErrorCode BaseProcessor<RESP>::spaceExist(GraphSpaceID spaceId) {
folly::SharedMutex::ReadHolder rHolder(LockUtils::spaceLock());
Expand Down
10 changes: 10 additions & 0 deletions src/meta/processors/BaseProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,16 @@ class BaseProcessor {
* */
ErrorOr<nebula::cpp2::ErrorCode, int32_t> autoIncrementId();

/**
* Get the current available global id
**/
ErrorOr<nebula::cpp2::ErrorCode, int32_t> getAvailableGolbalId();

/**
* Get one auto-increment Id in spaceId.
* */
ErrorOr<nebula::cpp2::ErrorCode, int32_t> autoIncrementIdInSpace(GraphSpaceID spaceId);

/**
* Check spaceId exist or not.
* */
Expand Down
1 change: 1 addition & 0 deletions src/meta/processors/Common.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ class LockUtils {
GENERATE_LOCK(lastUpdateTime);
GENERATE_LOCK(space);
GENERATE_LOCK(id);
GENERATE_LOCK(localId);
GENERATE_LOCK(tag);
GENERATE_LOCK(edge);
GENERATE_LOCK(tagIndex);
Expand Down
2 changes: 1 addition & 1 deletion src/meta/processors/index/CreateEdgeIndexProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ void CreateEdgeIndexProcessor::process(const cpp2::CreateEdgeIndexReq& req) {
}

std::vector<kvstore::KV> data;
auto edgeIndexRet = autoIncrementId();
auto edgeIndexRet = autoIncrementIdInSpace(space);
if (!nebula::ok(edgeIndexRet)) {
LOG(ERROR) << "Create edge index failed: Get edge index ID failed";
handleErrorCode(nebula::error(edgeIndexRet));
Expand Down
2 changes: 1 addition & 1 deletion src/meta/processors/index/CreateTagIndexProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ void CreateTagIndexProcessor::process(const cpp2::CreateTagIndexReq& req) {
}

std::vector<kvstore::KV> data;
auto tagIndexRet = autoIncrementId();
auto tagIndexRet = autoIncrementIdInSpace(space);
if (!nebula::ok(tagIndexRet)) {
LOG(ERROR) << "Create tag index failed : Get tag index ID failed";
handleErrorCode(nebula::error(tagIndexRet));
Expand Down
4 changes: 4 additions & 0 deletions src/meta/processors/parts/DropSpaceProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,10 @@ void DropSpaceProcessor::process(const cpp2::DropSpaceReq& req) {
ftIter->next();
}

// 7. Delete local_id meta data
auto localIdkey = MetaServiceUtils::localIdKey(spaceId);
deleteKeys.emplace_back(localIdkey);

doSyncMultiRemoveAndUpdate(std::move(deleteKeys));
LOG(INFO) << "Drop space " << spaceName << ", id " << spaceId;
}
Expand Down
2 changes: 1 addition & 1 deletion src/meta/processors/schema/CreateEdgeProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ void CreateEdgeProcessor::process(const cpp2::CreateEdgeReq& req) {
}
}

auto edgeTypeRet = autoIncrementId();
auto edgeTypeRet = autoIncrementIdInSpace(spaceId);
if (!nebula::ok(edgeTypeRet)) {
LOG(ERROR) << "Create edge failed : Get edge type id failed";
handleErrorCode(nebula::error(edgeTypeRet));
Expand Down
2 changes: 1 addition & 1 deletion src/meta/processors/schema/CreateTagProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ void CreateTagProcessor::process(const cpp2::CreateTagReq& req) {
}
}

auto tagRet = autoIncrementId();
auto tagRet = autoIncrementIdInSpace(spaceId);
if (!nebula::ok(tagRet)) {
LOG(ERROR) << "Create tag failed : Get tag id failed.";
handleErrorCode(nebula::error(tagRet));
Expand Down
Loading

0 comments on commit 1525aa3

Please sign in to comment.