From 720f8e027c2340ed5ebe050d33cff4c99fb09420 Mon Sep 17 00:00:00 2001 From: nebula-bots <88429921+nebula-bots@users.noreply.github.com> Date: Wed, 12 Jan 2022 17:38:21 +0800 Subject: [PATCH] Issue3373 storage exit crash (#3553) (#488) * use rcu replace thread local fix storage exit crash format address some comment * fix bug * fix bug fix bug fix bug Co-authored-by: hs.zhang <22708345+cangfengzhs@users.noreply.github.com> --- src/clients/meta/MetaClient.cpp | 518 +++++++++++++++-------------- src/clients/meta/MetaClient.h | 15 +- src/storage/test/ChainTestUtils.h | 6 +- src/storage/test/KillQueryTest.cpp | 2 +- 4 files changed, 282 insertions(+), 259 deletions(-) diff --git a/src/clients/meta/MetaClient.cpp b/src/clients/meta/MetaClient.cpp index bb89b97a00d..5e127418fd5 100644 --- a/src/clients/meta/MetaClient.cpp +++ b/src/clients/meta/MetaClient.cpp @@ -64,14 +64,15 @@ DEFINE_validator(failed_login_attempts, &ValidateFailedLoginAttempts); namespace nebula { namespace meta { +Indexes buildIndexes(std::vector indexItemVec); + MetaClient::MetaClient(std::shared_ptr ioThreadPool, std::vector addrs, const MetaClientOptions& options) : ioThreadPool_(ioThreadPool), addrs_(std::move(addrs)), options_(options), - sessionMap_(new SessionMap{}), - killedPlans_(new folly::F14FastSet>{}) { + metadata_(new MetaData()) { CHECK(ioThreadPool_ != nullptr) << "IOThreadPool is required"; CHECK(!addrs_.empty()) << "No meta server address is specified or can be solved. Meta server is required"; @@ -88,8 +89,7 @@ MetaClient::MetaClient(std::shared_ptr ioThreadPool MetaClient::~MetaClient() { notifyStop(); stop(); - delete sessionMap_.load(); - delete killedPlans_.load(); + delete metadata_.load(); VLOG(3) << "~MetaClient"; } @@ -207,38 +207,35 @@ bool MetaClient::loadUsersAndRoles() { userPasswordMap[user.first] = user.second; userNameList.emplace(user.first); } - { - folly::RWSpinLock::WriteHolder holder(localCacheLock_); - userRolesMap_ = std::move(userRolesMap); - userPasswordMap_ = std::move(userPasswordMap); - userIpWhitelistMap_ = std::move(userIpWhitelistMap); - - // Remove expired users from cache - auto removeExpiredUser = [&](folly::ConcurrentHashMap& userMap, - const std::unordered_set& userList) { - for (auto iter = userMap.begin(); iter != userMap.end();) { - if (!userList.count(iter->first)) { - iter = userMap.erase(iter); - } else { - ++iter; - } - } - }; - removeExpiredUser(userPasswordAttemptsRemain_, userNameList); - removeExpiredUser(userLoginLockTime_, userNameList); - - // This method is called periodically by the heartbeat thread, but we don't want to reset the - // failed login attempts every time. - for (const auto& user : userNameList) { - // If the user is not in the map, insert value with the default value - // Do nothing if the account is already in the map - if (userPasswordAttemptsRemain_.find(user) == userPasswordAttemptsRemain_.end()) { - userPasswordAttemptsRemain_.insert(user, FLAGS_failed_login_attempts); - } - if (userLoginLockTime_.find(user) == userLoginLockTime_.end()) { - userLoginLockTime_.insert(user, 0); + userRolesMap_ = std::move(userRolesMap); + userPasswordMap_ = std::move(userPasswordMap); + userIpWhitelistMap_ = std::move(userIpWhitelistMap); + + // Remove expired users from cache + auto removeExpiredUser = [&](folly::ConcurrentHashMap& userMap, + const std::unordered_set& userList) { + for (auto iter = userMap.begin(); iter != userMap.end();) { + if (!userList.count(iter->first)) { + iter = userMap.erase(iter); + } else { + ++iter; } } + }; + removeExpiredUser(userPasswordAttemptsRemain_, userNameList); + removeExpiredUser(userLoginLockTime_, userNameList); + + // This method is called periodically by the heartbeat thread, but we don't want to reset the + // failed login attempts every time. + for (const auto& user : userNameList) { + // If the user is not in the map, insert value with the default value + // Do nothing if the account is already in the map + if (userPasswordAttemptsRemain_.find(user) == userPasswordAttemptsRemain_.end()) { + userPasswordAttemptsRemain_.insert(user, FLAGS_failed_login_attempts); + } + if (userLoginLockTime_.find(user) == userLoginLockTime_.end()) { + userLoginLockTime_.insert(user, 0); + } } return true; } @@ -376,7 +373,6 @@ bool MetaClient::loadData() { decltype(localCache_) oldCache; { - folly::RWSpinLock::WriteHolder holder(localCacheLock_); oldCache = std::move(localCache_); localCache_ = std::move(cache); spaceIndexByName_ = std::move(spaceIndexByName); @@ -392,7 +388,39 @@ bool MetaClient::loadData() { } localDataLastUpdateTime_.store(metadLastUpdateTime_.load()); - + auto newMetaData = new MetaData(); + + for (auto& spaceInfo : localCache_) { + GraphSpaceID spaceId = spaceInfo.first; + std::shared_ptr info = spaceInfo.second; + std::shared_ptr infoDeepCopy = std::make_shared(*info); + infoDeepCopy->tagSchemas_ = buildTagSchemas(infoDeepCopy->tagItemVec_, &infoDeepCopy->pool_); + infoDeepCopy->edgeSchemas_ = buildEdgeSchemas(infoDeepCopy->edgeItemVec_, &infoDeepCopy->pool_); + infoDeepCopy->tagIndexes_ = buildIndexes(infoDeepCopy->tagIndexItemVec_); + infoDeepCopy->edgeIndexes_ = buildIndexes(infoDeepCopy->edgeIndexItemVec_); + newMetaData->localCache_[spaceId] = infoDeepCopy; + } + newMetaData->spaceIndexByName_ = spaceIndexByName_; + newMetaData->spaceTagIndexByName_ = spaceTagIndexByName_; + newMetaData->spaceEdgeIndexByName_ = spaceEdgeIndexByName_; + newMetaData->spaceEdgeIndexByType_ = spaceEdgeIndexByType_; + newMetaData->spaceNewestTagVerMap_ = spaceNewestTagVerMap_; + newMetaData->spaceNewestEdgeVerMap_ = spaceNewestEdgeVerMap_; + newMetaData->spaceTagIndexById_ = spaceTagIndexById_; + newMetaData->spaceAllEdgeMap_ = spaceAllEdgeMap_; + + newMetaData->userRolesMap_ = userRolesMap_; + newMetaData->storageHosts_ = storageHosts_; + newMetaData->fulltextIndexMap_ = fulltextIndexMap_; + newMetaData->userPasswordMap_ = userPasswordMap_; + newMetaData->sessionMap_ = std::move(sessionMap_); + newMetaData->killedPlans_ = std::move(killedPlans_); + newMetaData->serviceClientList_ = std::move(serviceClientList_); + newMetaData->userIpWhitelistMap_ = std::move(userIpWhitelistMap_); + newMetaData->metaListeners_ = std::move(metaListeners_); + auto oldMetaData = metadata_.load(); + metadata_.store(newMetaData); + folly::rcu_retire(oldMetaData); diff(oldCache, localCache_); listenerDiff(oldCache, localCache_); loadRemoteListeners(); @@ -546,7 +574,7 @@ bool MetaClient::loadSchemas(GraphSpaceID spaceId, return true; } -static Indexes buildIndexes(std::vector indexItemVec) { +Indexes buildIndexes(std::vector indexItemVec) { Indexes indexes; for (auto index : indexItemVec) { auto indexName = index.get_index_name(); @@ -662,10 +690,7 @@ bool MetaClient::loadGlobalServiceClients() { LOG(ERROR) << "List services failed, status:" << ret.status(); return false; } - { - folly::RWSpinLock::WriteHolder holder(localCacheLock_); - serviceClientList_ = std::move(ret).value(); - } + serviceClientList_ = std::move(ret).value(); return true; } @@ -675,55 +700,15 @@ bool MetaClient::loadFulltextIndexes() { LOG(ERROR) << "List fulltext indexes failed, status:" << ftRet.status(); return false; } - { - folly::RWSpinLock::WriteHolder holder(localCacheLock_); - fulltextIndexMap_ = std::move(ftRet).value(); - } + fulltextIndexMap_ = std::move(ftRet).value(); return true; } -const MetaClient::ThreadLocalInfo& MetaClient::getThreadLocalInfo() { - ThreadLocalInfo& threadLocalInfo = folly::SingletonThreadLocal::get(); - - if (threadLocalInfo.localLastUpdateTime_ < localDataLastUpdateTime_) { - threadLocalInfo.localLastUpdateTime_ = localDataLastUpdateTime_; - - folly::RWSpinLock::ReadHolder holder(localCacheLock_); - for (auto& spaceInfo : localCache_) { - GraphSpaceID spaceId = spaceInfo.first; - std::shared_ptr info = spaceInfo.second; - std::shared_ptr infoDeepCopy = std::make_shared(*info); - infoDeepCopy->tagSchemas_ = buildTagSchemas(infoDeepCopy->tagItemVec_, &infoDeepCopy->pool_); - infoDeepCopy->edgeSchemas_ = - buildEdgeSchemas(infoDeepCopy->edgeItemVec_, &infoDeepCopy->pool_); - infoDeepCopy->tagIndexes_ = buildIndexes(infoDeepCopy->tagIndexItemVec_); - infoDeepCopy->edgeIndexes_ = buildIndexes(infoDeepCopy->edgeIndexItemVec_); - threadLocalInfo.localCache_[spaceId] = infoDeepCopy; - } - threadLocalInfo.spaceIndexByName_ = spaceIndexByName_; - threadLocalInfo.spaceTagIndexByName_ = spaceTagIndexByName_; - threadLocalInfo.spaceEdgeIndexByName_ = spaceEdgeIndexByName_; - threadLocalInfo.spaceEdgeIndexByType_ = spaceEdgeIndexByType_; - threadLocalInfo.spaceNewestTagVerMap_ = spaceNewestTagVerMap_; - threadLocalInfo.spaceNewestEdgeVerMap_ = spaceNewestEdgeVerMap_; - threadLocalInfo.spaceTagIndexById_ = spaceTagIndexById_; - threadLocalInfo.spaceAllEdgeMap_ = spaceAllEdgeMap_; - threadLocalInfo.metaListeners_ = metaListeners_; - - threadLocalInfo.userRolesMap_ = userRolesMap_; - threadLocalInfo.storageHosts_ = storageHosts_; - threadLocalInfo.fulltextIndexMap_ = fulltextIndexMap_; - threadLocalInfo.userPasswordMap_ = userPasswordMap_; - threadLocalInfo.userIpWhitelistMap_ = userIpWhitelistMap_; - } - - return threadLocalInfo; -} - Status MetaClient::checkTagIndexed(GraphSpaceID spaceId, IndexID indexID) { - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto it = threadLocalInfo.localCache_.find(spaceId); - if (it != threadLocalInfo.localCache_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto it = metadata.localCache_.find(spaceId); + if (it != metadata.localCache_.end()) { auto indexIt = it->second->tagIndexes_.find(indexID); if (indexIt != it->second->tagIndexes_.end()) { return Status::OK(); @@ -735,9 +720,10 @@ Status MetaClient::checkTagIndexed(GraphSpaceID spaceId, IndexID indexID) { } Status MetaClient::checkEdgeIndexed(GraphSpaceID space, IndexID indexID) { - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto it = threadLocalInfo.localCache_.find(space); - if (it != threadLocalInfo.localCache_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto it = metadata.localCache_.find(space); + if (it != metadata.localCache_.end()) { auto indexIt = it->second->edgeIndexes_.find(indexID); if (indexIt != it->second->edgeIndexes_.end()) { return Status::OK(); @@ -1431,9 +1417,10 @@ StatusOr MetaClient::getSpaceIdByNameFromCache(const std::string& if (!ready_) { return Status::Error("Not ready!"); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto it = threadLocalInfo.spaceIndexByName_.find(name); - if (it != threadLocalInfo.spaceIndexByName_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto it = metadata.spaceIndexByName_.find(name); + if (it != metadata.spaceIndexByName_.end()) { return it->second; } return Status::SpaceNotFound(); @@ -1443,9 +1430,10 @@ StatusOr MetaClient::getSpaceNameByIdFromCache(GraphSpaceID spaceId if (!ready_) { return Status::Error("Not ready!"); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto spaceIt = threadLocalInfo.localCache_.find(spaceId); - if (spaceIt == threadLocalInfo.localCache_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto spaceIt = metadata.localCache_.find(spaceId); + if (spaceIt == metadata.localCache_.end()) { LOG(ERROR) << "Space " << spaceId << " not found!"; return Status::Error("Space %d not found", spaceId); } @@ -1457,9 +1445,10 @@ StatusOr MetaClient::getTagIDByNameFromCache(const GraphSpaceID& space, if (!ready_) { return Status::Error("Not ready!"); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto it = threadLocalInfo.spaceTagIndexByName_.find(std::make_pair(space, name)); - if (it == threadLocalInfo.spaceTagIndexByName_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto it = metadata.spaceTagIndexByName_.find(std::make_pair(space, name)); + if (it == metadata.spaceTagIndexByName_.end()) { return Status::Error("TagName `%s' is nonexistent", name.c_str()); } return it->second; @@ -1470,9 +1459,10 @@ StatusOr MetaClient::getTagNameByIdFromCache(const GraphSpaceID& sp if (!ready_) { return Status::Error("Not ready!"); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto it = threadLocalInfo.spaceTagIndexById_.find(std::make_pair(space, tagId)); - if (it == threadLocalInfo.spaceTagIndexById_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto it = metadata.spaceTagIndexById_.find(std::make_pair(space, tagId)); + if (it == metadata.spaceTagIndexById_.end()) { return Status::Error("TagID `%d' is nonexistent", tagId); } return it->second; @@ -1483,9 +1473,10 @@ StatusOr MetaClient::getEdgeTypeByNameFromCache(const GraphSpaceID& sp if (!ready_) { return Status::Error("Not ready!"); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto it = threadLocalInfo.spaceEdgeIndexByName_.find(std::make_pair(space, name)); - if (it == threadLocalInfo.spaceEdgeIndexByName_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto it = metadata.spaceEdgeIndexByName_.find(std::make_pair(space, name)); + if (it == metadata.spaceEdgeIndexByName_.end()) { return Status::Error("EdgeName `%s' is nonexistent", name.c_str()); } return it->second; @@ -1496,9 +1487,10 @@ StatusOr MetaClient::getEdgeNameByTypeFromCache(const GraphSpaceID& if (!ready_) { return Status::Error("Not ready!"); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto it = threadLocalInfo.spaceEdgeIndexByType_.find(std::make_pair(space, edgeType)); - if (it == threadLocalInfo.spaceEdgeIndexByType_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto it = metadata.spaceEdgeIndexByType_.find(std::make_pair(space, edgeType)); + if (it == metadata.spaceEdgeIndexByType_.end()) { return Status::Error("EdgeType `%d' is nonexistent", edgeType); } return it->second; @@ -1508,9 +1500,10 @@ StatusOr> MetaClient::getAllEdgeFromCache(const GraphSp if (!ready_) { return Status::Error("Not ready!"); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto it = threadLocalInfo.spaceAllEdgeMap_.find(space); - if (it == threadLocalInfo.spaceAllEdgeMap_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto it = metadata.spaceAllEdgeMap_.find(space); + if (it == metadata.spaceAllEdgeMap_.end()) { return Status::Error("SpaceId `%d' is nonexistent", space); } return it->second; @@ -1645,14 +1638,16 @@ folly::Future> MetaClient::removeRange(std::string segment, } PartsMap MetaClient::getPartsMapFromCache(const HostAddr& host) { - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - return doGetPartsMap(host, threadLocalInfo.localCache_); + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + return doGetPartsMap(host, metadata.localCache_); } StatusOr MetaClient::getPartHostsFromCache(GraphSpaceID spaceId, PartitionID partId) { - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto it = threadLocalInfo.localCache_.find(spaceId); - if (it == threadLocalInfo.localCache_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto it = metadata.localCache_.find(spaceId); + if (it == metadata.localCache_.end()) { return Status::Error("Space not found, spaceid: %d", spaceId); } auto& cache = it->second; @@ -1670,9 +1665,10 @@ StatusOr MetaClient::getPartHostsFromCache(GraphSpaceID spaceId, Part Status MetaClient::checkPartExistInCache(const HostAddr& host, GraphSpaceID spaceId, PartitionID partId) { - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto it = threadLocalInfo.localCache_.find(spaceId); - if (it != threadLocalInfo.localCache_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto it = metadata.localCache_.find(spaceId); + if (it != metadata.localCache_.end()) { auto partsIt = it->second->partsOnHost_.find(host); if (partsIt != it->second->partsOnHost_.end()) { for (auto& pId : partsIt->second) { @@ -1689,9 +1685,10 @@ Status MetaClient::checkPartExistInCache(const HostAddr& host, } Status MetaClient::checkSpaceExistInCache(const HostAddr& host, GraphSpaceID spaceId) { - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto it = threadLocalInfo.localCache_.find(spaceId); - if (it != threadLocalInfo.localCache_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto it = metadata.localCache_.find(spaceId); + if (it != metadata.localCache_.end()) { auto partsIt = it->second->partsOnHost_.find(host); if (partsIt != it->second->partsOnHost_.end() && !partsIt->second.empty()) { return Status::OK(); @@ -1703,9 +1700,10 @@ Status MetaClient::checkSpaceExistInCache(const HostAddr& host, GraphSpaceID spa } StatusOr MetaClient::partsNum(GraphSpaceID spaceId) { - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto it = threadLocalInfo.localCache_.find(spaceId); - if (it == threadLocalInfo.localCache_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto it = metadata.localCache_.find(spaceId); + if (it == metadata.localCache_.end()) { return Status::Error("Space not found, spaceid: %d", spaceId); } return it->second->partsAlloc_.size(); @@ -2096,9 +2094,10 @@ StatusOr MetaClient::getSpaceVidLen(const GraphSpaceID& spaceId) { if (!ready_) { return Status::Error("Not ready!"); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto spaceIt = threadLocalInfo.localCache_.find(spaceId); - if (spaceIt == threadLocalInfo.localCache_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto spaceIt = metadata.localCache_.find(spaceId); + if (spaceIt == metadata.localCache_.end()) { LOG(ERROR) << "Space " << spaceId << " not found!"; return Status::Error("Space %d not found", spaceId); } @@ -2114,9 +2113,10 @@ StatusOr MetaClient::getSpaceVidType(const GraphSpac if (!ready_) { return Status::Error("Not ready!"); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto spaceIt = threadLocalInfo.localCache_.find(spaceId); - if (spaceIt == threadLocalInfo.localCache_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto spaceIt = metadata.localCache_.find(spaceId); + if (spaceIt == metadata.localCache_.end()) { LOG(ERROR) << "Space " << spaceId << " not found!"; return Status::Error("Space %d not found", spaceId); } @@ -2135,9 +2135,10 @@ StatusOr MetaClient::getSpaceDesc(const GraphSpaceID& space) { if (!ready_) { return Status::Error("Not ready!"); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto spaceIt = threadLocalInfo.localCache_.find(space); - if (spaceIt == threadLocalInfo.localCache_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto spaceIt = metadata.localCache_.find(space); + if (spaceIt == metadata.localCache_.end()) { LOG(ERROR) << "Space " << space << " not found!"; return Status::Error("Space %d not found", space); } @@ -2158,9 +2159,10 @@ StatusOr> MetaClient::getTagSchemaFr if (!ready_) { return Status::Error("Not ready!"); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto spaceIt = threadLocalInfo.localCache_.find(spaceId); - if (spaceIt != threadLocalInfo.localCache_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto spaceIt = metadata.localCache_.find(spaceId); + if (spaceIt != metadata.localCache_.end()) { auto tagIt = spaceIt->second->tagSchemas_.find(tagID); if (tagIt != spaceIt->second->tagSchemas_.end() && !tagIt->second.empty()) { size_t vNum = tagIt->second.size(); @@ -2178,9 +2180,10 @@ StatusOr> MetaClient::getEdgeSchemaF if (!ready_) { return Status::Error("Not ready!"); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto spaceIt = threadLocalInfo.localCache_.find(spaceId); - if (spaceIt != threadLocalInfo.localCache_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto spaceIt = metadata.localCache_.find(spaceId); + if (spaceIt != metadata.localCache_.end()) { auto edgeIt = spaceIt->second->edgeSchemas_.find(edgeType); if (edgeIt != spaceIt->second->edgeSchemas_.end() && !edgeIt->second.empty()) { size_t vNum = edgeIt->second.size(); @@ -2197,9 +2200,10 @@ StatusOr MetaClient::getAllVerTagSchema(GraphSpaceID spaceId) { if (!ready_) { return Status::Error("Not ready!"); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto iter = threadLocalInfo.localCache_.find(spaceId); - if (iter == threadLocalInfo.localCache_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto iter = metadata.localCache_.find(spaceId); + if (iter == metadata.localCache_.end()) { return Status::Error("Space %d not found", spaceId); } return iter->second->tagSchemas_; @@ -2209,9 +2213,10 @@ StatusOr MetaClient::getAllLatestVerTagSchema(const GraphSpaceID& spa if (!ready_) { return Status::Error("Not ready!"); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto iter = threadLocalInfo.localCache_.find(spaceId); - if (iter == threadLocalInfo.localCache_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto iter = metadata.localCache_.find(spaceId); + if (iter == metadata.localCache_.end()) { return Status::Error("Space %d not found", spaceId); } TagSchema tagsSchema; @@ -2227,9 +2232,10 @@ StatusOr MetaClient::getAllVerEdgeSchema(GraphSpaceID spaceId) { if (!ready_) { return Status::Error("Not ready!"); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto iter = threadLocalInfo.localCache_.find(spaceId); - if (iter == threadLocalInfo.localCache_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto iter = metadata.localCache_.find(spaceId); + if (iter == metadata.localCache_.end()) { return Status::Error("Space %d not found", spaceId); } return iter->second->edgeSchemas_; @@ -2239,9 +2245,10 @@ StatusOr MetaClient::getAllLatestVerEdgeSchemaFromCache(const GraphS if (!ready_) { return Status::Error("Not ready!"); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto iter = threadLocalInfo.localCache_.find(spaceId); - if (iter == threadLocalInfo.localCache_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto iter = metadata.localCache_.find(spaceId); + if (iter == metadata.localCache_.end()) { return Status::Error("Space %d not found", spaceId); } EdgeSchema edgesSchema; @@ -2329,9 +2336,10 @@ StatusOr> MetaClient::getTagIndexFromCache(Grap return Status::Error("Not ready!"); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto spaceIt = threadLocalInfo.localCache_.find(spaceId); - if (spaceIt == threadLocalInfo.localCache_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto spaceIt = metadata.localCache_.find(spaceId); + if (spaceIt == metadata.localCache_.end()) { VLOG(3) << "Space " << spaceId << " not found!"; return Status::SpaceNotFound(); } else { @@ -2366,9 +2374,10 @@ StatusOr> MetaClient::getEdgeIndexFromCache(Gra return Status::Error("Not ready!"); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto spaceIt = threadLocalInfo.localCache_.find(spaceId); - if (spaceIt == threadLocalInfo.localCache_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto spaceIt = metadata.localCache_.find(spaceId); + if (spaceIt == metadata.localCache_.end()) { VLOG(3) << "Space " << spaceId << " not found!"; return Status::SpaceNotFound(); } else { @@ -2403,9 +2412,10 @@ StatusOr>> MetaClient::getTagIndexe return Status::Error("Not ready!"); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto spaceIt = threadLocalInfo.localCache_.find(spaceId); - if (spaceIt == threadLocalInfo.localCache_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto spaceIt = metadata.localCache_.find(spaceId); + if (spaceIt == metadata.localCache_.end()) { VLOG(3) << "Space " << spaceId << " not found!"; return Status::SpaceNotFound(); } else { @@ -2426,9 +2436,10 @@ StatusOr>> MetaClient::getEdgeIndex return Status::Error("Not ready!"); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto spaceIt = threadLocalInfo.localCache_.find(spaceId); - if (spaceIt == threadLocalInfo.localCache_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto spaceIt = metadata.localCache_.find(spaceId); + if (spaceIt == metadata.localCache_.end()) { VLOG(3) << "Space " << spaceId << " not found!"; return Status::SpaceNotFound(); } else { @@ -2502,9 +2513,10 @@ std::vector MetaClient::getRolesByUserFromCache(const std::strin if (!ready_) { return std::vector(0); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto iter = threadLocalInfo.userRolesMap_.find(user); - if (iter == threadLocalInfo.userRolesMap_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto iter = metadata.userRolesMap_.find(user); + if (iter == metadata.userRolesMap_.end()) { return std::vector(0); } return iter->second; @@ -2515,14 +2527,12 @@ Status MetaClient::authCheckFromCache(const std::string& account, const std::str if (!ready_) { return Status::Error("Meta Service not ready"); } - - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - // Check user existence - auto iter = threadLocalInfo.userPasswordMap_.find(account); - if (iter == threadLocalInfo.userPasswordMap_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto iter = metadata.userPasswordMap_.find(account); + if (iter == metadata.userPasswordMap_.end()) { return Status::Error("User not exist"); } - auto lockedSince = userLoginLockTime_[account]; auto passwordAttemtRemain = userPasswordAttemptsRemain_[account]; @@ -2584,9 +2594,10 @@ Status MetaClient::checkIpWhitelistFromCache(const std::string& account, if (!ready_) { return Status::Error("Meta service is not ready"); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto iter = threadLocalInfo.userIpWhitelistMap_.find(account); - if (iter == threadLocalInfo.userIpWhitelistMap_.end() || iter->second.empty()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto iter = metadata.userIpWhitelistMap_.find(account); + if (iter == metadata.userIpWhitelistMap_.end() || iter->second.empty()) { return Status::OK(); } return iter->second.find(clientIp) != iter->second.end() @@ -2598,18 +2609,20 @@ bool MetaClient::checkShadowAccountFromCache(const std::string& account) { if (!ready_) { return false; } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto iter = threadLocalInfo.userPasswordMap_.find(account); - if (iter != threadLocalInfo.userPasswordMap_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto iter = metadata.userPasswordMap_.find(account); + if (iter != metadata.userPasswordMap_.end()) { return true; } return false; } StatusOr MetaClient::getTermFromCache(GraphSpaceID spaceId, PartitionID partId) { - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto spaceInfo = threadLocalInfo.localCache_.find(spaceId); - if (spaceInfo == threadLocalInfo.localCache_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto spaceInfo = metadata.localCache_.find(spaceId); + if (spaceInfo == metadata.localCache_.end()) { return Status::Error("Term not found!"); } @@ -2626,8 +2639,9 @@ StatusOr> MetaClient::getStorageHosts() { return Status::Error("Not ready!"); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - return threadLocalInfo.storageHosts_; + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + return metadata.storageHosts_; } StatusOr MetaClient::getLatestTagVersionFromCache(const GraphSpaceID& space, @@ -2635,9 +2649,10 @@ StatusOr MetaClient::getLatestTagVersionFromCache(const GraphSpaceID& if (!ready_) { return Status::Error("Not ready!"); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto it = threadLocalInfo.spaceNewestTagVerMap_.find(std::make_pair(space, tagId)); - if (it == threadLocalInfo.spaceNewestTagVerMap_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto it = metadata.spaceNewestTagVerMap_.find(std::make_pair(space, tagId)); + if (it == metadata.spaceNewestTagVerMap_.end()) { return Status::TagNotFound(); } return it->second; @@ -2648,9 +2663,10 @@ StatusOr MetaClient::getLatestEdgeVersionFromCache(const GraphSpaceID if (!ready_) { return Status::Error("Not ready!"); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto it = threadLocalInfo.spaceNewestEdgeVerMap_.find(std::make_pair(space, edgeType)); - if (it == threadLocalInfo.spaceNewestEdgeVerMap_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto it = metadata.spaceNewestEdgeVerMap_.find(std::make_pair(space, edgeType)); + if (it == metadata.spaceNewestEdgeVerMap_.end()) { return Status::EdgeNotFound(); } return it->second; @@ -3211,9 +3227,10 @@ MetaClient::getListenersBySpaceHostFromCache(GraphSpaceID spaceId, const HostAdd if (!ready_) { return Status::Error("Not ready!"); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto spaceIt = threadLocalInfo.localCache_.find(spaceId); - if (spaceIt == threadLocalInfo.localCache_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto spaceIt = metadata.localCache_.find(spaceId); + if (spaceIt == metadata.localCache_.end()) { VLOG(3) << "Space " << spaceId << " not found!"; return Status::SpaceNotFound(); } @@ -3231,9 +3248,10 @@ MetaClient::getMetaListenerInfoFromCache(HostAddr host) { if (!ready_) { return Status::Error("Not ready!"); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto it = threadLocalInfo.metaListeners_.find(host); - if (it == threadLocalInfo.metaListeners_.end()) { + folly::rcu_reader guard; + auto& metadata = *metadata_.load(); + auto it = metadata.metaListeners_.find(host); + if (it == metadata.metaListeners_.end()) { VLOG(3) << "Meta listener not found!"; return Status::ListenerNotFound(); } @@ -3245,10 +3263,10 @@ StatusOr MetaClient::getMetaListenerDrainerOnSpaceFromC if (!ready_) { return Status::Error("Not ready!"); } - - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto spaceIt = threadLocalInfo.localCache_.find(space); - if (spaceIt == threadLocalInfo.localCache_.end()) { + folly::rcu_reader guard; + auto& metadata = *metadata_.load(); + auto spaceIt = metadata.localCache_.find(space); + if (spaceIt == metadata.localCache_.end()) { VLOG(3) << "Space " << space << " not found!"; return Status::SpaceNotFound(); } @@ -3260,8 +3278,9 @@ StatusOr MetaClient::getListenersByHostFromCache(const HostAddr& h if (!ready_) { return Status::Error("Not ready!"); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - return doGetListenersMap(host, threadLocalInfo.localCache_); + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + return doGetListenersMap(host, metadata.localCache_); } ListenersMap MetaClient::doGetListenersMap(const HostAddr& host, const LocalCache& localCache) { @@ -3297,9 +3316,10 @@ StatusOr MetaClient::getListenerHostsBySpacePartType(GraphSpaceID spac if (!ready_) { return Status::Error("Not ready!"); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto spaceIt = threadLocalInfo.localCache_.find(spaceId); - if (spaceIt == threadLocalInfo.localCache_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto spaceIt = metadata.localCache_.find(spaceId); + if (spaceIt == metadata.localCache_.end()) { VLOG(3) << "Space " << spaceId << " not found!"; return Status::SpaceNotFound(); } @@ -3318,9 +3338,10 @@ StatusOr> MetaClient::getListenerHostTypeBySpace if (!ready_) { return Status::Error("Not ready!"); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto spaceIt = threadLocalInfo.localCache_.find(spaceId); - if (spaceIt == threadLocalInfo.localCache_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto spaceIt = metadata.localCache_.find(spaceId); + if (spaceIt == metadata.localCache_.end()) { VLOG(3) << "Space " << spaceId << " not found!"; return Status::SpaceNotFound(); } @@ -3457,8 +3478,9 @@ void MetaClient::updateNestedGflags(const std::unordered_map optionMap.emplace(value.first, value.second.toString()); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - for (const auto& spaceEntry : threadLocalInfo.localCache_) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + for (const auto& spaceEntry : metadata.localCache_) { listener_->onSpaceOptionUpdated(spaceEntry.first, optionMap); } } @@ -3745,11 +3767,11 @@ StatusOr> MetaClient::getServiceClientsFromCach if (!ready_) { return Status::Error("Not ready!"); } - - folly::RWSpinLock::ReadHolder holder(localCacheLock_); + folly::rcu_reader guard; + auto& metadata = *metadata_.load(); if (type == cpp2::ExternalServiceType::ELASTICSEARCH) { - auto sIter = serviceClientList_.find(type); - if (sIter != serviceClientList_.end()) { + auto sIter = metadata.serviceClientList_.find(type); + if (sIter != metadata.serviceClientList_.end()) { return sIter->second; } } @@ -3817,10 +3839,10 @@ StatusOr MetaClient::getDrainerClientFromCache(GraphSpa if (!ready_) { return Status::Error("Not ready!"); } - - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto spaceIt = threadLocalInfo.localCache_.find(spaceId); - if (spaceIt == threadLocalInfo.localCache_.end()) { + folly::rcu_reader guard; + auto& metadata = *metadata_.load(); + auto spaceIt = metadata.localCache_.find(spaceId); + if (spaceIt == metadata.localCache_.end()) { VLOG(3) << "Space " << spaceId << " not found!"; return Status::SpaceNotFound(); } @@ -3837,10 +3859,10 @@ StatusOr> MetaClient::getDrainerFromCache(GraphSp if (!ready_) { return Status::Error("Not ready!"); } - - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto spaceIt = threadLocalInfo.localCache_.find(spaceId); - if (spaceIt == threadLocalInfo.localCache_.end()) { + folly::rcu_reader guard; + auto& metadata = *metadata_.load(); + auto spaceIt = metadata.localCache_.find(spaceId); + if (spaceIt == metadata.localCache_.end()) { VLOG(3) << "Space " << spaceId << " not found!"; return Status::SpaceNotFound(); } @@ -3908,8 +3930,9 @@ StatusOr> MetaClient::getFTIndexe if (!ready_) { return Status::Error("Not ready!"); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - return threadLocalInfo.fulltextIndexMap_; + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + return metadata.fulltextIndexMap_; } StatusOr> MetaClient::getFTIndexBySpaceFromCache( @@ -3917,9 +3940,10 @@ StatusOr> MetaClient::getFTIndexB if (!ready_) { return Status::Error("Not ready!"); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); std::unordered_map indexes; - for (const auto& it : threadLocalInfo.fulltextIndexMap_) { + for (const auto& it : metadata.fulltextIndexMap_) { if (it.second.get_space_id() == spaceId) { indexes[it.first] = it.second; } @@ -3932,8 +3956,9 @@ StatusOr> MetaClient::getFTIndexBySpaceSch if (!ready_) { return Status::Error("Not ready!"); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - for (auto& it : threadLocalInfo.fulltextIndexMap_) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + for (auto& it : metadata.fulltextIndexMap_) { auto id = it.second.get_depend_schema().getType() == nebula::cpp2::SchemaID::Type::edge_type ? it.second.get_depend_schema().get_edge_type() : it.second.get_depend_schema().get_tag_id(); @@ -3949,12 +3974,13 @@ StatusOr MetaClient::getFTIndexByNameFromCache(GraphSpaceID space if (!ready_) { return Status::Error("Not ready!"); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - if (threadLocalInfo.fulltextIndexMap_.find(name) != fulltextIndexMap_.end() && - threadLocalInfo.fulltextIndexMap_.at(name).get_space_id() != spaceId) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + if (metadata.fulltextIndexMap_.find(name) != fulltextIndexMap_.end() && + metadata.fulltextIndexMap_.at(name).get_space_id() != spaceId) { return Status::IndexNotFound(); } - return threadLocalInfo.fulltextIndexMap_.at(name); + return metadata.fulltextIndexMap_.at(name); } folly::Future> MetaClient::createSession( @@ -4106,22 +4132,16 @@ bool MetaClient::loadSessions() { LOG(ERROR) << "List sessions failed, status:" << session_list.status(); return false; } - SessionMap* oldSessionMap = sessionMap_.load(); - SessionMap* newSessionMap = new SessionMap(*oldSessionMap); - auto oldKilledPlan = killedPlans_.load(); - auto newKilledPlan = new folly::F14FastSet>(*oldKilledPlan); + sessionMap_.clear(); + killedPlans_.clear(); for (auto& session : session_list.value().get_sessions()) { - (*newSessionMap)[session.get_session_id()] = session; + sessionMap_[session.get_session_id()] = session; for (auto& query : session.get_queries()) { if (query.second.get_status() == cpp2::QueryStatus::KILLING) { - newKilledPlan->insert({session.get_session_id(), query.first}); + killedPlans_.insert({session.get_session_id(), query.first}); } } } - sessionMap_.store(newSessionMap); - killedPlans_.store(newKilledPlan); - folly::rcu_retire(oldKilledPlan); - folly::rcu_retire(oldSessionMap); return true; } @@ -4130,9 +4150,9 @@ StatusOr MetaClient::getSessionFromCache(const nebula::SessionID& return Status::Error("Not ready!"); } folly::rcu_reader guard; - auto session_map = sessionMap_.load(); - auto it = session_map->find(session_id); - if (it != session_map->end()) { + auto& sessionMap = metadata_.load()->sessionMap_; + auto it = sessionMap.find(session_id); + if (it != sessionMap.end()) { return it->second; } return Status::SessionNotFound(); @@ -4146,7 +4166,7 @@ bool MetaClient::checkIsPlanKilled(SessionID sessionId, ExecutionPlanID planId) return false; } folly::rcu_reader guard; - return killedPlans_.load()->count({sessionId, planId}); + return metadata_.load()->killedPlans_.count({sessionId, planId}); } Status MetaClient::verifyVersion() { @@ -4196,10 +4216,10 @@ bool MetaClient::currentSpaceReadOnly(GraphSpaceID spaceId) { if (!ready_) { return false; } - - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto spaceIt = threadLocalInfo.localCache_.find(spaceId); - if (spaceIt == threadLocalInfo.localCache_.end()) { + folly::rcu_reader guard; + auto& metadata = *metadata_.load(); + auto spaceIt = metadata.localCache_.find(spaceId); + if (spaceIt == metadata.localCache_.end()) { VLOG(3) << "Space " << spaceId << " not found!"; return false; } diff --git a/src/clients/meta/MetaClient.h b/src/clients/meta/MetaClient.h index dfa7d76db6b..b06854860df 100644 --- a/src/clients/meta/MetaClient.h +++ b/src/clients/meta/MetaClient.h @@ -896,7 +896,8 @@ class MetaClient { // Only report dir info once when started bool dirInfoReported_ = false; - struct ThreadLocalInfo { + + struct MetaData { int64_t localLastUpdateTime_{-2}; LocalCache localCache_; SpaceNameIdMap spaceIndexByName_; @@ -914,9 +915,11 @@ class MetaClient { FTIndexMap fulltextIndexMap_; UserPasswordMap userPasswordMap_; UserIpWhitelistMap userIpWhitelistMap_; - }; + SessionMap sessionMap_; + folly::F14FastSet> killedPlans_; - const ThreadLocalInfo& getThreadLocalInfo(); + ServiceClientsList serviceClientList_; + }; void addSchemaField(NebulaSchemaProvider* schema, const cpp2::ColumnDef& col, ObjectPool* pool); @@ -948,7 +951,6 @@ class MetaClient { ServiceClientsList serviceClientList_; FTIndexMap fulltextIndexMap_; - mutable folly::RWSpinLock localCacheLock_; // The listener_ is the NebulaStore MetaChangedListener* listener_{nullptr}; // The lock used to protect listener_ @@ -966,8 +968,9 @@ class MetaClient { MetaClientOptions options_; std::vector storageHosts_; int64_t heartbeatTime_; - std::atomic sessionMap_; - std::atomic>*> killedPlans_; + SessionMap sessionMap_; + folly::F14FastSet> killedPlans_; + std::atomic metadata_; }; } // namespace meta diff --git a/src/storage/test/ChainTestUtils.h b/src/storage/test/ChainTestUtils.h index d94f30b2a74..0fd04ca00ee 100644 --- a/src/storage/test/ChainTestUtils.h +++ b/src/storage/test/ChainTestUtils.h @@ -226,14 +226,14 @@ class MetaClientTestUpdater { static void addLocalCache(meta::MetaClient& mClient, GraphSpaceID spaceId, std::shared_ptr spInfoCache) { - mClient.localCache_[spaceId] = spInfoCache; + mClient.metadata_.load()->localCache_[spaceId] = spInfoCache; } static meta::SpaceInfoCache* getLocalCache(meta::MetaClient* mClient, GraphSpaceID spaceId) { - if (mClient->localCache_.count(spaceId) == 0) { + if (mClient->metadata_.load()->localCache_.count(spaceId) == 0) { return nullptr; } - return mClient->localCache_[spaceId].get(); + return mClient->metadata_.load()->localCache_[spaceId].get(); } static void addPartTerm(meta::MetaClient* mClient, diff --git a/src/storage/test/KillQueryTest.cpp b/src/storage/test/KillQueryTest.cpp index 815ddad74a5..8516244c5bb 100644 --- a/src/storage/test/KillQueryTest.cpp +++ b/src/storage/test/KillQueryTest.cpp @@ -19,7 +19,7 @@ class KillQueryMetaWrapper { public: explicit KillQueryMetaWrapper(MetaClient* client) : client_(client) {} void killQuery(SessionID session_id, ExecutionPlanID plan_id) { - client_->killedPlans_.load()->emplace(session_id, plan_id); + client_->metadata_.load()->killedPlans_.emplace(session_id, plan_id); } private: