From 9b7a58a9fda6808afa21abb0af298aaf80970607 Mon Sep 17 00:00:00 2001 From: "hs.zhang" <22708345+cangfengzhs@users.noreply.github.com> Date: Thu, 23 Dec 2021 18:29:33 +0800 Subject: [PATCH] use rcu replace thread local --- src/clients/meta/MetaClient.cpp | 364 ++++++++++++++++++-------------- src/clients/meta/MetaClient.h | 6 +- 2 files changed, 206 insertions(+), 164 deletions(-) diff --git a/src/clients/meta/MetaClient.cpp b/src/clients/meta/MetaClient.cpp index aab1d30cf9b..dfe4fa69304 100644 --- a/src/clients/meta/MetaClient.cpp +++ b/src/clients/meta/MetaClient.cpp @@ -41,6 +41,8 @@ DEFINE_int32(check_plan_killed_frequency, 8, "check plan killed every 1< indexItemVec); + MetaClient::MetaClient(std::shared_ptr ioThreadPool, std::vector addrs, const MetaClientOptions& options) @@ -48,7 +50,8 @@ MetaClient::MetaClient(std::shared_ptr ioThreadPool addrs_(std::move(addrs)), options_(options), sessionMap_(new SessionMap{}), - killedPlans_(new folly::F14FastSet>{}) { + killedPlans_(new folly::F14FastSet>{}), + metadata_(new MetaData()) { CHECK(ioThreadPool_ != nullptr) << "IOThreadPool is required"; CHECK(!addrs_.empty()) << "No meta server address is specified or can be solved. Meta server is required"; @@ -66,6 +69,7 @@ MetaClient::~MetaClient() { stop(); delete sessionMap_.load(); delete killedPlans_.load(); + delete metadata_.load(); VLOG(3) << "~MetaClient"; } @@ -305,6 +309,37 @@ bool MetaClient::loadData() { diff(oldCache, localCache_); listenerDiff(oldCache, localCache_); loadRemoteListeners(); + + auto newMetaData = new MetaData(); + + folly::RWSpinLock::ReadHolder holder(localCacheLock_); + for (auto& spaceInfo : localCache_) { + GraphSpaceID spaceId = spaceInfo.first; + std::shared_ptr info = spaceInfo.second; + std::shared_ptr infoDeepCopy = std::make_shared(*info); + infoDeepCopy->tagSchemas_ = buildTagSchemas(infoDeepCopy->tagItemVec_, &infoDeepCopy->pool_); + infoDeepCopy->edgeSchemas_ = buildEdgeSchemas(infoDeepCopy->edgeItemVec_, &infoDeepCopy->pool_); + infoDeepCopy->tagIndexes_ = buildIndexes(infoDeepCopy->tagIndexItemVec_); + infoDeepCopy->edgeIndexes_ = buildIndexes(infoDeepCopy->edgeIndexItemVec_); + newMetaData->localCache_[spaceId] = infoDeepCopy; + } + newMetaData->spaceIndexByName_ = spaceIndexByName_; + newMetaData->spaceTagIndexByName_ = spaceTagIndexByName_; + newMetaData->spaceEdgeIndexByName_ = spaceEdgeIndexByName_; + newMetaData->spaceEdgeIndexByType_ = spaceEdgeIndexByType_; + newMetaData->spaceNewestTagVerMap_ = spaceNewestTagVerMap_; + newMetaData->spaceNewestEdgeVerMap_ = spaceNewestEdgeVerMap_; + newMetaData->spaceTagIndexById_ = spaceTagIndexById_; + newMetaData->spaceAllEdgeMap_ = spaceAllEdgeMap_; + + newMetaData->userRolesMap_ = userRolesMap_; + newMetaData->storageHosts_ = storageHosts_; + newMetaData->fulltextIndexMap_ = fulltextIndexMap_; + newMetaData->userPasswordMap_ = userPasswordMap_; + + auto oldMetaData = metadata_.load(); + metadata_.store(newMetaData); + folly::rcu_retire(oldMetaData); ready_ = true; return true; } @@ -455,7 +490,7 @@ bool MetaClient::loadSchemas(GraphSpaceID spaceId, return true; } -static Indexes buildIndexes(std::vector indexItemVec) { +Indexes buildIndexes(std::vector indexItemVec) { Indexes indexes; for (auto index : indexItemVec) { auto indexName = index.get_index_name(); @@ -544,46 +579,11 @@ bool MetaClient::loadFulltextIndexes() { return true; } -const MetaClient::ThreadLocalInfo& MetaClient::getThreadLocalInfo() { - ThreadLocalInfo& threadLocalInfo = folly::SingletonThreadLocal::get(); - - if (threadLocalInfo.localLastUpdateTime_ < localDataLastUpdateTime_) { - threadLocalInfo.localLastUpdateTime_ = localDataLastUpdateTime_; - - folly::RWSpinLock::ReadHolder holder(localCacheLock_); - for (auto& spaceInfo : localCache_) { - GraphSpaceID spaceId = spaceInfo.first; - std::shared_ptr info = spaceInfo.second; - std::shared_ptr infoDeepCopy = std::make_shared(*info); - infoDeepCopy->tagSchemas_ = buildTagSchemas(infoDeepCopy->tagItemVec_, &infoDeepCopy->pool_); - infoDeepCopy->edgeSchemas_ = - buildEdgeSchemas(infoDeepCopy->edgeItemVec_, &infoDeepCopy->pool_); - infoDeepCopy->tagIndexes_ = buildIndexes(infoDeepCopy->tagIndexItemVec_); - infoDeepCopy->edgeIndexes_ = buildIndexes(infoDeepCopy->edgeIndexItemVec_); - threadLocalInfo.localCache_[spaceId] = infoDeepCopy; - } - threadLocalInfo.spaceIndexByName_ = spaceIndexByName_; - threadLocalInfo.spaceTagIndexByName_ = spaceTagIndexByName_; - threadLocalInfo.spaceEdgeIndexByName_ = spaceEdgeIndexByName_; - threadLocalInfo.spaceEdgeIndexByType_ = spaceEdgeIndexByType_; - threadLocalInfo.spaceNewestTagVerMap_ = spaceNewestTagVerMap_; - threadLocalInfo.spaceNewestEdgeVerMap_ = spaceNewestEdgeVerMap_; - threadLocalInfo.spaceTagIndexById_ = spaceTagIndexById_; - threadLocalInfo.spaceAllEdgeMap_ = spaceAllEdgeMap_; - - threadLocalInfo.userRolesMap_ = userRolesMap_; - threadLocalInfo.storageHosts_ = storageHosts_; - threadLocalInfo.fulltextIndexMap_ = fulltextIndexMap_; - threadLocalInfo.userPasswordMap_ = userPasswordMap_; - } - - return threadLocalInfo; -} - Status MetaClient::checkTagIndexed(GraphSpaceID spaceId, IndexID indexID) { - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto it = threadLocalInfo.localCache_.find(spaceId); - if (it != threadLocalInfo.localCache_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto it = metadata.localCache_.find(spaceId); + if (it != metadata.localCache_.end()) { auto indexIt = it->second->tagIndexes_.find(indexID); if (indexIt != it->second->tagIndexes_.end()) { return Status::OK(); @@ -595,9 +595,10 @@ Status MetaClient::checkTagIndexed(GraphSpaceID spaceId, IndexID indexID) { } Status MetaClient::checkEdgeIndexed(GraphSpaceID space, IndexID indexID) { - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto it = threadLocalInfo.localCache_.find(space); - if (it != threadLocalInfo.localCache_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto it = metadata.localCache_.find(space); + if (it != metadata.localCache_.end()) { auto indexIt = it->second->edgeIndexes_.find(indexID); if (indexIt != it->second->edgeIndexes_.end()) { return Status::OK(); @@ -1257,9 +1258,10 @@ StatusOr MetaClient::getSpaceIdByNameFromCache(const std::string& if (!ready_) { return Status::Error("Not ready!"); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto it = threadLocalInfo.spaceIndexByName_.find(name); - if (it != threadLocalInfo.spaceIndexByName_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto it = metadata.spaceIndexByName_.find(name); + if (it != metadata.spaceIndexByName_.end()) { return it->second; } return Status::SpaceNotFound(); @@ -1269,9 +1271,10 @@ StatusOr MetaClient::getSpaceNameByIdFromCache(GraphSpaceID spaceId if (!ready_) { return Status::Error("Not ready!"); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto spaceIt = threadLocalInfo.localCache_.find(spaceId); - if (spaceIt == threadLocalInfo.localCache_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto spaceIt = metadata.localCache_.find(spaceId); + if (spaceIt == metadata.localCache_.end()) { LOG(ERROR) << "Space " << spaceId << " not found!"; return Status::Error("Space %d not found", spaceId); } @@ -1283,9 +1286,10 @@ StatusOr MetaClient::getTagIDByNameFromCache(const GraphSpaceID& space, if (!ready_) { return Status::Error("Not ready!"); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto it = threadLocalInfo.spaceTagIndexByName_.find(std::make_pair(space, name)); - if (it == threadLocalInfo.spaceTagIndexByName_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto it = metadata.spaceTagIndexByName_.find(std::make_pair(space, name)); + if (it == metadata.spaceTagIndexByName_.end()) { return Status::Error("TagName `%s' is nonexistent", name.c_str()); } return it->second; @@ -1296,9 +1300,10 @@ StatusOr MetaClient::getTagNameByIdFromCache(const GraphSpaceID& sp if (!ready_) { return Status::Error("Not ready!"); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto it = threadLocalInfo.spaceTagIndexById_.find(std::make_pair(space, tagId)); - if (it == threadLocalInfo.spaceTagIndexById_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto it = metadata.spaceTagIndexById_.find(std::make_pair(space, tagId)); + if (it == metadata.spaceTagIndexById_.end()) { return Status::Error("TagID `%d' is nonexistent", tagId); } return it->second; @@ -1309,9 +1314,10 @@ StatusOr MetaClient::getEdgeTypeByNameFromCache(const GraphSpaceID& sp if (!ready_) { return Status::Error("Not ready!"); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto it = threadLocalInfo.spaceEdgeIndexByName_.find(std::make_pair(space, name)); - if (it == threadLocalInfo.spaceEdgeIndexByName_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto it = metadata.spaceEdgeIndexByName_.find(std::make_pair(space, name)); + if (it == metadata.spaceEdgeIndexByName_.end()) { return Status::Error("EdgeName `%s' is nonexistent", name.c_str()); } return it->second; @@ -1322,9 +1328,10 @@ StatusOr MetaClient::getEdgeNameByTypeFromCache(const GraphSpaceID& if (!ready_) { return Status::Error("Not ready!"); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto it = threadLocalInfo.spaceEdgeIndexByType_.find(std::make_pair(space, edgeType)); - if (it == threadLocalInfo.spaceEdgeIndexByType_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto it = metadata.spaceEdgeIndexByType_.find(std::make_pair(space, edgeType)); + if (it == metadata.spaceEdgeIndexByType_.end()) { return Status::Error("EdgeType `%d' is nonexistent", edgeType); } return it->second; @@ -1334,9 +1341,10 @@ StatusOr> MetaClient::getAllEdgeFromCache(const GraphSp if (!ready_) { return Status::Error("Not ready!"); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto it = threadLocalInfo.spaceAllEdgeMap_.find(space); - if (it == threadLocalInfo.spaceAllEdgeMap_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto it = metadata.spaceAllEdgeMap_.find(space); + if (it == metadata.spaceAllEdgeMap_.end()) { return Status::Error("SpaceId `%d' is nonexistent", space); } return it->second; @@ -1469,14 +1477,16 @@ folly::Future> MetaClient::removeRange(std::string segment, } PartsMap MetaClient::getPartsMapFromCache(const HostAddr& host) { - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - return doGetPartsMap(host, threadLocalInfo.localCache_); + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + return doGetPartsMap(host, metadata.localCache_); } StatusOr MetaClient::getPartHostsFromCache(GraphSpaceID spaceId, PartitionID partId) { - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto it = threadLocalInfo.localCache_.find(spaceId); - if (it == threadLocalInfo.localCache_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto it = metadata.localCache_.find(spaceId); + if (it == metadata.localCache_.end()) { return Status::Error("Space not found, spaceid: %d", spaceId); } auto& cache = it->second; @@ -1494,9 +1504,10 @@ StatusOr MetaClient::getPartHostsFromCache(GraphSpaceID spaceId, Part Status MetaClient::checkPartExistInCache(const HostAddr& host, GraphSpaceID spaceId, PartitionID partId) { - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto it = threadLocalInfo.localCache_.find(spaceId); - if (it != threadLocalInfo.localCache_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto it = metadata.localCache_.find(spaceId); + if (it != metadata.localCache_.end()) { auto partsIt = it->second->partsOnHost_.find(host); if (partsIt != it->second->partsOnHost_.end()) { for (auto& pId : partsIt->second) { @@ -1513,9 +1524,10 @@ Status MetaClient::checkPartExistInCache(const HostAddr& host, } Status MetaClient::checkSpaceExistInCache(const HostAddr& host, GraphSpaceID spaceId) { - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto it = threadLocalInfo.localCache_.find(spaceId); - if (it != threadLocalInfo.localCache_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto it = metadata.localCache_.find(spaceId); + if (it != metadata.localCache_.end()) { auto partsIt = it->second->partsOnHost_.find(host); if (partsIt != it->second->partsOnHost_.end() && !partsIt->second.empty()) { return Status::OK(); @@ -1527,9 +1539,10 @@ Status MetaClient::checkSpaceExistInCache(const HostAddr& host, GraphSpaceID spa } StatusOr MetaClient::partsNum(GraphSpaceID spaceId) { - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto it = threadLocalInfo.localCache_.find(spaceId); - if (it == threadLocalInfo.localCache_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto it = metadata.localCache_.find(spaceId); + if (it == metadata.localCache_.end()) { return Status::Error("Space not found, spaceid: %d", spaceId); } return it->second->partsAlloc_.size(); @@ -1920,9 +1933,10 @@ StatusOr MetaClient::getSpaceVidLen(const GraphSpaceID& spaceId) { if (!ready_) { return Status::Error("Not ready!"); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto spaceIt = threadLocalInfo.localCache_.find(spaceId); - if (spaceIt == threadLocalInfo.localCache_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto spaceIt = metadata.localCache_.find(spaceId); + if (spaceIt == metadata.localCache_.end()) { LOG(ERROR) << "Space " << spaceId << " not found!"; return Status::Error("Space %d not found", spaceId); } @@ -1938,9 +1952,10 @@ StatusOr MetaClient::getSpaceVidType(const GraphSpac if (!ready_) { return Status::Error("Not ready!"); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto spaceIt = threadLocalInfo.localCache_.find(spaceId); - if (spaceIt == threadLocalInfo.localCache_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto spaceIt = metadata.localCache_.find(spaceId); + if (spaceIt == metadata.localCache_.end()) { LOG(ERROR) << "Space " << spaceId << " not found!"; return Status::Error("Space %d not found", spaceId); } @@ -1959,9 +1974,10 @@ StatusOr MetaClient::getSpaceDesc(const GraphSpaceID& space) { if (!ready_) { return Status::Error("Not ready!"); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto spaceIt = threadLocalInfo.localCache_.find(space); - if (spaceIt == threadLocalInfo.localCache_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto spaceIt = metadata.localCache_.find(space); + if (spaceIt == metadata.localCache_.end()) { LOG(ERROR) << "Space " << space << " not found!"; return Status::Error("Space %d not found", space); } @@ -1982,9 +1998,10 @@ StatusOr> MetaClient::getTagSchemaFr if (!ready_) { return Status::Error("Not ready!"); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto spaceIt = threadLocalInfo.localCache_.find(spaceId); - if (spaceIt != threadLocalInfo.localCache_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto spaceIt = metadata.localCache_.find(spaceId); + if (spaceIt != metadata.localCache_.end()) { auto tagIt = spaceIt->second->tagSchemas_.find(tagID); if (tagIt != spaceIt->second->tagSchemas_.end() && !tagIt->second.empty()) { size_t vNum = tagIt->second.size(); @@ -2002,9 +2019,10 @@ StatusOr> MetaClient::getEdgeSchemaF if (!ready_) { return Status::Error("Not ready!"); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto spaceIt = threadLocalInfo.localCache_.find(spaceId); - if (spaceIt != threadLocalInfo.localCache_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto spaceIt = metadata.localCache_.find(spaceId); + if (spaceIt != metadata.localCache_.end()) { auto edgeIt = spaceIt->second->edgeSchemas_.find(edgeType); if (edgeIt != spaceIt->second->edgeSchemas_.end() && !edgeIt->second.empty()) { size_t vNum = edgeIt->second.size(); @@ -2021,9 +2039,10 @@ StatusOr MetaClient::getAllVerTagSchema(GraphSpaceID spaceId) { if (!ready_) { return Status::Error("Not ready!"); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto iter = threadLocalInfo.localCache_.find(spaceId); - if (iter == threadLocalInfo.localCache_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto iter = metadata.localCache_.find(spaceId); + if (iter == metadata.localCache_.end()) { return Status::Error("Space %d not found", spaceId); } return iter->second->tagSchemas_; @@ -2033,9 +2052,10 @@ StatusOr MetaClient::getAllLatestVerTagSchema(const GraphSpaceID& spa if (!ready_) { return Status::Error("Not ready!"); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto iter = threadLocalInfo.localCache_.find(spaceId); - if (iter == threadLocalInfo.localCache_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto iter = metadata.localCache_.find(spaceId); + if (iter == metadata.localCache_.end()) { return Status::Error("Space %d not found", spaceId); } TagSchema tagsSchema; @@ -2051,9 +2071,10 @@ StatusOr MetaClient::getAllVerEdgeSchema(GraphSpaceID spaceId) { if (!ready_) { return Status::Error("Not ready!"); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto iter = threadLocalInfo.localCache_.find(spaceId); - if (iter == threadLocalInfo.localCache_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto iter = metadata.localCache_.find(spaceId); + if (iter == metadata.localCache_.end()) { return Status::Error("Space %d not found", spaceId); } return iter->second->edgeSchemas_; @@ -2063,9 +2084,10 @@ StatusOr MetaClient::getAllLatestVerEdgeSchemaFromCache(const GraphS if (!ready_) { return Status::Error("Not ready!"); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto iter = threadLocalInfo.localCache_.find(spaceId); - if (iter == threadLocalInfo.localCache_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto iter = metadata.localCache_.find(spaceId); + if (iter == metadata.localCache_.end()) { return Status::Error("Space %d not found", spaceId); } EdgeSchema edgesSchema; @@ -2153,9 +2175,10 @@ StatusOr> MetaClient::getTagIndexFromCache(Grap return Status::Error("Not ready!"); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto spaceIt = threadLocalInfo.localCache_.find(spaceId); - if (spaceIt == threadLocalInfo.localCache_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto spaceIt = metadata.localCache_.find(spaceId); + if (spaceIt == metadata.localCache_.end()) { VLOG(3) << "Space " << spaceId << " not found!"; return Status::SpaceNotFound(); } else { @@ -2190,9 +2213,10 @@ StatusOr> MetaClient::getEdgeIndexFromCache(Gra return Status::Error("Not ready!"); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto spaceIt = threadLocalInfo.localCache_.find(spaceId); - if (spaceIt == threadLocalInfo.localCache_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto spaceIt = metadata.localCache_.find(spaceId); + if (spaceIt == metadata.localCache_.end()) { VLOG(3) << "Space " << spaceId << " not found!"; return Status::SpaceNotFound(); } else { @@ -2227,9 +2251,10 @@ StatusOr>> MetaClient::getTagIndexe return Status::Error("Not ready!"); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto spaceIt = threadLocalInfo.localCache_.find(spaceId); - if (spaceIt == threadLocalInfo.localCache_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto spaceIt = metadata.localCache_.find(spaceId); + if (spaceIt == metadata.localCache_.end()) { VLOG(3) << "Space " << spaceId << " not found!"; return Status::SpaceNotFound(); } else { @@ -2250,9 +2275,10 @@ StatusOr>> MetaClient::getEdgeIndex return Status::Error("Not ready!"); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto spaceIt = threadLocalInfo.localCache_.find(spaceId); - if (spaceIt == threadLocalInfo.localCache_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto spaceIt = metadata.localCache_.find(spaceId); + if (spaceIt == metadata.localCache_.end()) { VLOG(3) << "Space " << spaceId << " not found!"; return Status::SpaceNotFound(); } else { @@ -2326,9 +2352,10 @@ std::vector MetaClient::getRolesByUserFromCache(const std::strin if (!ready_) { return std::vector(0); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto iter = threadLocalInfo.userRolesMap_.find(user); - if (iter == threadLocalInfo.userRolesMap_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto iter = metadata.userRolesMap_.find(user); + if (iter == metadata.userRolesMap_.end()) { return std::vector(0); } return iter->second; @@ -2338,9 +2365,10 @@ bool MetaClient::authCheckFromCache(const std::string& account, const std::strin if (!ready_) { return false; } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto iter = threadLocalInfo.userPasswordMap_.find(account); - if (iter == threadLocalInfo.userPasswordMap_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto iter = metadata.userPasswordMap_.find(account); + if (iter == metadata.userPasswordMap_.end()) { return false; } return iter->second == password; @@ -2350,18 +2378,20 @@ bool MetaClient::checkShadowAccountFromCache(const std::string& account) { if (!ready_) { return false; } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto iter = threadLocalInfo.userPasswordMap_.find(account); - if (iter != threadLocalInfo.userPasswordMap_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto iter = metadata.userPasswordMap_.find(account); + if (iter != metadata.userPasswordMap_.end()) { return true; } return false; } StatusOr MetaClient::getTermFromCache(GraphSpaceID spaceId, PartitionID partId) { - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto spaceInfo = threadLocalInfo.localCache_.find(spaceId); - if (spaceInfo == threadLocalInfo.localCache_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto spaceInfo = metadata.localCache_.find(spaceId); + if (spaceInfo == metadata.localCache_.end()) { return Status::Error("Term not found!"); } @@ -2378,8 +2408,9 @@ StatusOr> MetaClient::getStorageHosts() { return Status::Error("Not ready!"); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - return threadLocalInfo.storageHosts_; + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + return metadata.storageHosts_; } StatusOr MetaClient::getLatestTagVersionFromCache(const GraphSpaceID& space, @@ -2387,9 +2418,10 @@ StatusOr MetaClient::getLatestTagVersionFromCache(const GraphSpaceID& if (!ready_) { return Status::Error("Not ready!"); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto it = threadLocalInfo.spaceNewestTagVerMap_.find(std::make_pair(space, tagId)); - if (it == threadLocalInfo.spaceNewestTagVerMap_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto it = metadata.spaceNewestTagVerMap_.find(std::make_pair(space, tagId)); + if (it == metadata.spaceNewestTagVerMap_.end()) { return Status::TagNotFound(); } return it->second; @@ -2400,9 +2432,10 @@ StatusOr MetaClient::getLatestEdgeVersionFromCache(const GraphSpaceID if (!ready_) { return Status::Error("Not ready!"); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto it = threadLocalInfo.spaceNewestEdgeVerMap_.find(std::make_pair(space, edgeType)); - if (it == threadLocalInfo.spaceNewestEdgeVerMap_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto it = metadata.spaceNewestEdgeVerMap_.find(std::make_pair(space, edgeType)); + if (it == metadata.spaceNewestEdgeVerMap_.end()) { return Status::EdgeNotFound(); } return it->second; @@ -2856,9 +2889,10 @@ MetaClient::getListenersBySpaceHostFromCache(GraphSpaceID spaceId, const HostAdd if (!ready_) { return Status::Error("Not ready!"); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto spaceIt = threadLocalInfo.localCache_.find(spaceId); - if (spaceIt == threadLocalInfo.localCache_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto spaceIt = metadata.localCache_.find(spaceId); + if (spaceIt == metadata.localCache_.end()) { VLOG(3) << "Space " << spaceId << " not found!"; return Status::SpaceNotFound(); } @@ -2875,8 +2909,9 @@ StatusOr MetaClient::getListenersByHostFromCache(const HostAddr& h if (!ready_) { return Status::Error("Not ready!"); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - return doGetListenersMap(host, threadLocalInfo.localCache_); + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + return doGetListenersMap(host, metadata.localCache_); } ListenersMap MetaClient::doGetListenersMap(const HostAddr& host, const LocalCache& localCache) { @@ -2912,9 +2947,10 @@ StatusOr MetaClient::getListenerHostsBySpacePartType(GraphSpaceID spac if (!ready_) { return Status::Error("Not ready!"); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto spaceIt = threadLocalInfo.localCache_.find(spaceId); - if (spaceIt == threadLocalInfo.localCache_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto spaceIt = metadata.localCache_.find(spaceId); + if (spaceIt == metadata.localCache_.end()) { VLOG(3) << "Space " << spaceId << " not found!"; return Status::SpaceNotFound(); } @@ -2933,9 +2969,10 @@ StatusOr> MetaClient::getListenerHostTypeBySpace if (!ready_) { return Status::Error("Not ready!"); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto spaceIt = threadLocalInfo.localCache_.find(spaceId); - if (spaceIt == threadLocalInfo.localCache_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto spaceIt = metadata.localCache_.find(spaceId); + if (spaceIt == metadata.localCache_.end()) { VLOG(3) << "Space " << spaceId << " not found!"; return Status::SpaceNotFound(); } @@ -3024,8 +3061,9 @@ void MetaClient::updateNestedGflags(const std::unordered_map optionMap.emplace(value.first, value.second.toString()); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - for (const auto& spaceEntry : threadLocalInfo.localCache_) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + for (const auto& spaceEntry : metadata.localCache_) { listener_->onSpaceOptionUpdated(spaceEntry.first, optionMap); } } @@ -3366,8 +3404,9 @@ StatusOr> MetaClient::getFTIndexe if (!ready_) { return Status::Error("Not ready!"); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - return threadLocalInfo.fulltextIndexMap_; + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + return metadata.fulltextIndexMap_; } StatusOr> MetaClient::getFTIndexBySpaceFromCache( @@ -3375,9 +3414,10 @@ StatusOr> MetaClient::getFTIndexB if (!ready_) { return Status::Error("Not ready!"); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); std::unordered_map indexes; - for (const auto& it : threadLocalInfo.fulltextIndexMap_) { + for (const auto& it : metadata.fulltextIndexMap_) { if (it.second.get_space_id() == spaceId) { indexes[it.first] = it.second; } @@ -3390,8 +3430,9 @@ StatusOr> MetaClient::getFTIndexBySpaceSch if (!ready_) { return Status::Error("Not ready!"); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - for (auto& it : threadLocalInfo.fulltextIndexMap_) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + for (auto& it : metadata.fulltextIndexMap_) { auto id = it.second.get_depend_schema().getType() == nebula::cpp2::SchemaID::Type::edge_type ? it.second.get_depend_schema().get_edge_type() : it.second.get_depend_schema().get_tag_id(); @@ -3407,12 +3448,13 @@ StatusOr MetaClient::getFTIndexByNameFromCache(GraphSpaceID space if (!ready_) { return Status::Error("Not ready!"); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - if (threadLocalInfo.fulltextIndexMap_.find(name) != fulltextIndexMap_.end() && - threadLocalInfo.fulltextIndexMap_.at(name).get_space_id() != spaceId) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + if (metadata.fulltextIndexMap_.find(name) != fulltextIndexMap_.end() && + metadata.fulltextIndexMap_.at(name).get_space_id() != spaceId) { return Status::IndexNotFound(); } - return threadLocalInfo.fulltextIndexMap_.at(name); + return metadata.fulltextIndexMap_.at(name); } folly::Future> MetaClient::createSession( diff --git a/src/clients/meta/MetaClient.h b/src/clients/meta/MetaClient.h index f81701c8feb..c6c96819ead 100644 --- a/src/clients/meta/MetaClient.h +++ b/src/clients/meta/MetaClient.h @@ -771,7 +771,8 @@ class MetaClient { // Only report dir info once when started bool dirInfoReported_ = false; - struct ThreadLocalInfo { + + struct MetaData { int64_t localLastUpdateTime_{-2}; LocalCache localCache_; SpaceNameIdMap spaceIndexByName_; @@ -789,8 +790,6 @@ class MetaClient { UserPasswordMap userPasswordMap_; }; - const ThreadLocalInfo& getThreadLocalInfo(); - void addSchemaField(NebulaSchemaProvider* schema, const cpp2::ColumnDef& col, ObjectPool* pool); TagSchemas buildTagSchemas(std::vector tagItemVec, ObjectPool* pool); @@ -834,6 +833,7 @@ class MetaClient { int64_t heartbeatTime_; std::atomic sessionMap_; std::atomic>*> killedPlans_; + std::atomic metadata_; }; } // namespace meta