Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dss part1: use local id in the space range for tag edge index #2550

Merged
merged 2 commits into from
Aug 25, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion src/meta/MetaServiceUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ static const std::unordered_map<
{"stats", {"__stats__", MetaServiceUtils::parseStatsSpace}},
{"balance_task", {"__balance_task__", nullptr}},
{"balance_plan", {"__balance_plan__", nullptr}},
{"ft_index", {"__ft_index__", nullptr}}};
{"ft_index", {"__ft_index__", nullptr}},
{"local_id", {"__local_id__", MetaServiceUtils::parseLocalIdSpace}}};

// clang-format off
static const std::string kSpacesTable = tableMaps.at("spaces").first; // NOLINT
Expand Down Expand Up @@ -80,6 +81,7 @@ static const std::string kListenerTable = tableMaps.at("listener").first;
static const std::string kStatsTable = tableMaps.at("stats").first; // NOLINT
static const std::string kBalanceTaskTable = tableMaps.at("balance_task").first; // NOLINT
static const std::string kBalancePlanTable = tableMaps.at("balance_plan").first; // NOLINT
static const std::string kLocalIdTable = tableMaps.at("local_id").first; // NOLINT

const std::string kFTIndexTable = tableMaps.at("ft_index").first; // NOLINT
const std::string kFTServiceTable = systemTableMaps.at("ft_service").first; // NOLINT
Expand Down Expand Up @@ -1403,5 +1405,18 @@ cpp2::FTIndex MetaServiceUtils::parsefulltextIndex(folly::StringPiece val) {

std::string MetaServiceUtils::fulltextIndexPrefix() { return kFTIndexTable; }

std::string MetaServiceUtils::localIdKey(GraphSpaceID spaceId) {
std::string key;
key.reserve(kLocalIdTable.size() + sizeof(GraphSpaceID));
key.append(kLocalIdTable.data(), kLocalIdTable.size())
.append(reinterpret_cast<const char*>(&spaceId), sizeof(GraphSpaceID));
return key;
}

GraphSpaceID MetaServiceUtils::parseLocalIdSpace(folly::StringPiece rawData) {
auto offset = kLocalIdTable.size();
return *reinterpret_cast<const GraphSpaceID*>(rawData.data() + offset);
}

} // namespace meta
} // namespace nebula
4 changes: 4 additions & 0 deletions src/meta/MetaServiceUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,10 @@ class MetaServiceUtils final {
const std::unordered_set<GraphSpaceID>& spaces,
const std::string& backupName,
const std::vector<std::string>* spaceName);

static std::string localIdKey(GraphSpaceID spaceId);

static GraphSpaceID parseLocalIdSpace(folly::StringPiece rawData);
};

} // namespace meta
Expand Down
63 changes: 63 additions & 0 deletions src/meta/processors/BaseProcessor-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,69 @@ ErrorOr<nebula::cpp2::ErrorCode, int32_t> BaseProcessor<RESP>::autoIncrementId()
}
}

template <typename RESP>
ErrorOr<nebula::cpp2::ErrorCode, int32_t> BaseProcessor<RESP>::getAvailableGolbalId() {
// A read lock has been added before call
static const std::string kIdKey = "__id__";
int32_t id;
std::string val;
auto ret = kvstore_->get(kDefaultSpaceId, kDefaultPartId, kIdKey, &val);
if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) {
if (ret != nebula::cpp2::ErrorCode::E_KEY_NOT_FOUND) {
return ret;
}
id = 1;
} else {
id = *reinterpret_cast<const int32_t*>(val.c_str()) + 1;
}

return id;
}

template <typename RESP>
ErrorOr<nebula::cpp2::ErrorCode, int32_t> BaseProcessor<RESP>::autoIncrementIdInSpace(
GraphSpaceID spaceId) {
folly::SharedMutex::WriteHolder wHolder(LockUtils::localIdLock());
folly::SharedMutex::ReadHolder rHolder(LockUtils::idLock());
auto globalIdRet = getAvailableGolbalId();
if (!nebula::ok(globalIdRet)) {
return nebula::error(globalIdRet);
}
auto globalId = nebula::value(globalIdRet);

auto localIdkey = MetaServiceUtils::localIdKey(spaceId);
int32_t id;
std::string val;
auto ret = kvstore_->get(kDefaultSpaceId, kDefaultPartId, localIdkey, &val);
if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) {
if (ret != nebula::cpp2::ErrorCode::E_KEY_NOT_FOUND) {
return ret;
}

// In order to be compatible with the existing old schema, and simple to implement,
// when the local_id record does not exist in space, directly use the smallest
// id available globally.
id = globalId;
panda-sheep marked this conversation as resolved.
Show resolved Hide resolved
} else {
id = *reinterpret_cast<const int32_t*>(val.c_str()) + 1;
}

std::vector<kvstore::KV> data;
data.emplace_back(localIdkey, std::string(reinterpret_cast<const char*>(&id), sizeof(id)));
folly::Baton<true, std::atomic> baton;
kvstore_->asyncMultiPut(
kDefaultSpaceId, kDefaultPartId, std::move(data), [&](nebula::cpp2::ErrorCode code) {
ret = code;
baton.post();
});
baton.wait();
if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) {
return ret;
} else {
return id;
}
}

template <typename RESP>
nebula::cpp2::ErrorCode BaseProcessor<RESP>::spaceExist(GraphSpaceID spaceId) {
folly::SharedMutex::ReadHolder rHolder(LockUtils::spaceLock());
Expand Down
10 changes: 10 additions & 0 deletions src/meta/processors/BaseProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,16 @@ class BaseProcessor {
* */
ErrorOr<nebula::cpp2::ErrorCode, int32_t> autoIncrementId();

/**
* Get the current available global id
**/
ErrorOr<nebula::cpp2::ErrorCode, int32_t> getAvailableGolbalId();

/**
* Get one auto-increment Id in spaceId.
* */
ErrorOr<nebula::cpp2::ErrorCode, int32_t> autoIncrementIdInSpace(GraphSpaceID spaceId);

/**
* Check spaceId exist or not.
* */
Expand Down
1 change: 1 addition & 0 deletions src/meta/processors/Common.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ class LockUtils {
GENERATE_LOCK(lastUpdateTime);
GENERATE_LOCK(space);
GENERATE_LOCK(id);
GENERATE_LOCK(localId);
GENERATE_LOCK(tag);
GENERATE_LOCK(edge);
GENERATE_LOCK(tagIndex);
Expand Down
2 changes: 1 addition & 1 deletion src/meta/processors/index/CreateEdgeIndexProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ void CreateEdgeIndexProcessor::process(const cpp2::CreateEdgeIndexReq& req) {
}

std::vector<kvstore::KV> data;
auto edgeIndexRet = autoIncrementId();
auto edgeIndexRet = autoIncrementIdInSpace(space);
if (!nebula::ok(edgeIndexRet)) {
LOG(ERROR) << "Create edge index failed: Get edge index ID failed";
handleErrorCode(nebula::error(edgeIndexRet));
Expand Down
2 changes: 1 addition & 1 deletion src/meta/processors/index/CreateTagIndexProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ void CreateTagIndexProcessor::process(const cpp2::CreateTagIndexReq& req) {
}

std::vector<kvstore::KV> data;
auto tagIndexRet = autoIncrementId();
auto tagIndexRet = autoIncrementIdInSpace(space);
if (!nebula::ok(tagIndexRet)) {
LOG(ERROR) << "Create tag index failed : Get tag index ID failed";
handleErrorCode(nebula::error(tagIndexRet));
Expand Down
4 changes: 4 additions & 0 deletions src/meta/processors/parts/DropSpaceProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,10 @@ void DropSpaceProcessor::process(const cpp2::DropSpaceReq& req) {
ftIter->next();
}

// 7. Delete local_id meta data
auto localIdkey = MetaServiceUtils::localIdKey(spaceId);
deleteKeys.emplace_back(localIdkey);

doSyncMultiRemoveAndUpdate(std::move(deleteKeys));
LOG(INFO) << "Drop space " << spaceName << ", id " << spaceId;
}
Expand Down
2 changes: 1 addition & 1 deletion src/meta/processors/schema/CreateEdgeProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ void CreateEdgeProcessor::process(const cpp2::CreateEdgeReq& req) {
}
}

auto edgeTypeRet = autoIncrementId();
auto edgeTypeRet = autoIncrementIdInSpace(spaceId);
if (!nebula::ok(edgeTypeRet)) {
LOG(ERROR) << "Create edge failed : Get edge type id failed";
handleErrorCode(nebula::error(edgeTypeRet));
Expand Down
2 changes: 1 addition & 1 deletion src/meta/processors/schema/CreateTagProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ void CreateTagProcessor::process(const cpp2::CreateTagReq& req) {
}
}

auto tagRet = autoIncrementId();
auto tagRet = autoIncrementIdInSpace(spaceId);
if (!nebula::ok(tagRet)) {
LOG(ERROR) << "Create tag failed : Get tag id failed.";
handleErrorCode(nebula::error(tagRet));
Expand Down
Loading