Skip to content
This repository has been archived by the owner on Dec 1, 2022. It is now read-only.

Commit

Permalink
use local id in the space range for tag edge index
Browse files Browse the repository at this point in the history
  • Loading branch information
panda-sheep committed Aug 23, 2021
1 parent 307528f commit a9b707d
Show file tree
Hide file tree
Showing 13 changed files with 520 additions and 20 deletions.
17 changes: 16 additions & 1 deletion src/meta/MetaServiceUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ static const std::unordered_map<
{"statis", {"__statis__", MetaServiceUtils::parseStatisSpace}},
{"balance_task", {"__balance_task__", nullptr}},
{"balance_plan", {"__balance_plan__", nullptr}},
{"ft_index", {"__ft_index__", nullptr}}};
{"ft_index", {"__ft_index__", nullptr}},
{"local_id", {"__local_id__", MetaServiceUtils::parseLocalIdSpace}}};

static const std::string kSpacesTable = tableMaps.at("spaces").first; // NOLINT
static const std::string kPartsTable = tableMaps.at("parts").first; // NOLINT
Expand All @@ -77,6 +78,7 @@ static const std::string kListenerTable = tableMaps.at("listener").first;
static const std::string kStatisTable = tableMaps.at("statis").first; // NOLINT
static const std::string kBalanceTaskTable = tableMaps.at("balance_task").first; // NOLINT
static const std::string kBalancePlanTable = tableMaps.at("balance_plan").first; // NOLINT
static const std::string kLocalIdTable = tableMaps.at("local_id").first; // NOLINT

const std::string kFTIndexTable = tableMaps.at("ft_index").first; // NOLINT
const std::string kFTServiceTable = systemTableMaps.at("ft_service").first; // NOLINT
Expand Down Expand Up @@ -1447,5 +1449,18 @@ std::string MetaServiceUtils::fulltextIndexPrefix() {
return kFTIndexTable;
}

std::string MetaServiceUtils::localIdKey(GraphSpaceID spaceId) {
std::string key;
key.reserve(kLocalIdTable.size() + sizeof(GraphSpaceID));
key.append(kLocalIdTable.data(), kLocalIdTable.size())
.append(reinterpret_cast<const char*>(&spaceId), sizeof(GraphSpaceID));
return key;
}

GraphSpaceID MetaServiceUtils::parseLocalIdSpace(folly::StringPiece rawData) {
auto offset = kLocalIdTable.size();
return *reinterpret_cast<const GraphSpaceID*>(rawData.data() + offset);
}

} // namespace meta
} // namespace nebula
4 changes: 4 additions & 0 deletions src/meta/MetaServiceUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,10 @@ class MetaServiceUtils final {
const std::unordered_set<GraphSpaceID>& spaces,
const std::string& backupName,
const std::vector<std::string>* spaceName);

static std::string localIdKey(GraphSpaceID spaceId);

static GraphSpaceID parseLocalIdSpace(folly::StringPiece rawData);
};

} // namespace meta
Expand Down
10 changes: 10 additions & 0 deletions src/meta/processors/BaseProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,16 @@ class BaseProcessor {
* */
ErrorOr<nebula::cpp2::ErrorCode, int32_t> autoIncrementId();

/**
* Get the current available global id
**/
ErrorOr<nebula::cpp2::ErrorCode, int32_t> getAvailableGolbalId();

/**
* Get one auto-increment Id in spaceId.
* */
ErrorOr<nebula::cpp2::ErrorCode, int32_t> autoIncrementIdInSpace(GraphSpaceID spaceId);

/**
* Check spaceId exist or not.
* */
Expand Down
68 changes: 68 additions & 0 deletions src/meta/processors/BaseProcessor.inl
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,74 @@ ErrorOr<nebula::cpp2::ErrorCode, int32_t> BaseProcessor<RESP>::autoIncrementId()
}


template<typename RESP>
ErrorOr<nebula::cpp2::ErrorCode, int32_t> BaseProcessor<RESP>::getAvailableGolbalId() {
// A read lock has been added before call
static const std::string kIdKey = "__id__";
int32_t id;
std::string val;
auto ret = kvstore_->get(kDefaultSpaceId, kDefaultPartId, kIdKey, &val);
if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) {
if (ret != nebula::cpp2::ErrorCode::E_KEY_NOT_FOUND) {
return ret;
}
id = 1;
} else {
id = *reinterpret_cast<const int32_t*>(val.c_str()) + 1;
}

return id;
}


template<typename RESP>
ErrorOr<nebula::cpp2::ErrorCode, int32_t>
BaseProcessor<RESP>::autoIncrementIdInSpace(GraphSpaceID spaceId) {
folly::SharedMutex::WriteHolder wHolder(LockUtils::localIdLock());
folly::SharedMutex::ReadHolder rHolder(LockUtils::idLock());
auto globalIdRet = getAvailableGolbalId();
if (!nebula::ok(globalIdRet)) {
return nebula::error(globalIdRet);
}
auto globalId = nebula::value(globalIdRet);

auto localIdkey = MetaServiceUtils::localIdKey(spaceId);
int32_t id;
std::string val;
auto ret = kvstore_->get(kDefaultSpaceId, kDefaultPartId, localIdkey, &val);
if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) {
if (ret != nebula::cpp2::ErrorCode::E_KEY_NOT_FOUND) {
return ret;
}

// In order to be compatible with the existing old schema, and simple to implement,
// when the local_id record does not exist in space, directly use the smallest
// id available globally.
id = globalId;
} else {
id = *reinterpret_cast<const int32_t*>(val.c_str()) + 1;
}

std::vector<kvstore::KV> data;
data.emplace_back(localIdkey,
std::string(reinterpret_cast<const char*>(&id), sizeof(id)));
folly::Baton<true, std::atomic> baton;
kvstore_->asyncMultiPut(kDefaultSpaceId,
kDefaultPartId,
std::move(data),
[&] (nebula::cpp2::ErrorCode code) {
ret = code;
baton.post();
});
baton.wait();
if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) {
return ret;
} else {
return id;
}
}


template<typename RESP>
nebula::cpp2::ErrorCode BaseProcessor<RESP>::spaceExist(GraphSpaceID spaceId) {
folly::SharedMutex::ReadHolder rHolder(LockUtils::spaceLock());
Expand Down
1 change: 1 addition & 0 deletions src/meta/processors/Common.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ class LockUtils {
GENERATE_LOCK(lastUpdateTime);
GENERATE_LOCK(space);
GENERATE_LOCK(id);
GENERATE_LOCK(localId);
GENERATE_LOCK(tag);
GENERATE_LOCK(edge);
GENERATE_LOCK(tagIndex);
Expand Down
2 changes: 1 addition & 1 deletion src/meta/processors/indexMan/CreateEdgeIndexProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ void CreateEdgeIndexProcessor::process(const cpp2::CreateEdgeIndexReq& req) {
}

std::vector<kvstore::KV> data;
auto edgeIndexRet = autoIncrementId();
auto edgeIndexRet = autoIncrementIdInSpace(space);
if (!nebula::ok(edgeIndexRet)) {
LOG(ERROR) << "Create edge index failed: Get edge index ID failed";
handleErrorCode(nebula::error(edgeIndexRet));
Expand Down
2 changes: 1 addition & 1 deletion src/meta/processors/indexMan/CreateTagIndexProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ void CreateTagIndexProcessor::process(const cpp2::CreateTagIndexReq& req) {
}

std::vector<kvstore::KV> data;
auto tagIndexRet = autoIncrementId();
auto tagIndexRet = autoIncrementIdInSpace(space);
if (!nebula::ok(tagIndexRet)) {
LOG(ERROR) << "Create tag index failed : Get tag index ID failed";
handleErrorCode(nebula::error(tagIndexRet));
Expand Down
4 changes: 4 additions & 0 deletions src/meta/processors/partsMan/DropSpaceProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,10 @@ void DropSpaceProcessor::process(const cpp2::DropSpaceReq& req) {
ftIter->next();
}

// 7. Delete local_id meta data
auto localIdkey = MetaServiceUtils::localIdKey(spaceId);
deleteKeys.emplace_back(localIdkey);

doSyncMultiRemoveAndUpdate(std::move(deleteKeys));
LOG(INFO) << "Drop space " << spaceName << ", id " << spaceId;
}
Expand Down
2 changes: 1 addition & 1 deletion src/meta/processors/schemaMan/CreateEdgeProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ void CreateEdgeProcessor::process(const cpp2::CreateEdgeReq& req) {
}
}

auto edgeTypeRet = autoIncrementId();
auto edgeTypeRet = autoIncrementIdInSpace(spaceId);;
if (!nebula::ok(edgeTypeRet)) {
LOG(ERROR) << "Create edge failed : Get edge type id failed";
handleErrorCode(nebula::error(edgeTypeRet));
Expand Down
2 changes: 1 addition & 1 deletion src/meta/processors/schemaMan/CreateTagProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ void CreateTagProcessor::process(const cpp2::CreateTagReq& req) {
}
}

auto tagRet = autoIncrementId();
auto tagRet = autoIncrementIdInSpace(spaceId);
if (!nebula::ok(tagRet)) {
LOG(ERROR) << "Create tag failed : Get tag id failed.";
handleErrorCode(nebula::error(tagRet));
Expand Down
Loading

0 comments on commit a9b707d

Please sign in to comment.