diff --git a/src/clients/meta/MetaClient.cpp b/src/clients/meta/MetaClient.cpp index 32c5ee7c15b..db75a884c8f 100644 --- a/src/clients/meta/MetaClient.cpp +++ b/src/clients/meta/MetaClient.cpp @@ -64,14 +64,15 @@ DEFINE_validator(failed_login_attempts, &ValidateFailedLoginAttempts); namespace nebula { namespace meta { +Indexes buildIndexes(std::vector indexItemVec); + MetaClient::MetaClient(std::shared_ptr ioThreadPool, std::vector addrs, const MetaClientOptions& options) : ioThreadPool_(ioThreadPool), addrs_(std::move(addrs)), options_(options), - sessionMap_(new SessionMap{}), - killedPlans_(new folly::F14FastSet>{}) { + metadata_(new MetaData()) { CHECK(ioThreadPool_ != nullptr) << "IOThreadPool is required"; CHECK(!addrs_.empty()) << "No meta server address is specified or can be solved. Meta server is required"; @@ -88,8 +89,7 @@ MetaClient::MetaClient(std::shared_ptr ioThreadPool MetaClient::~MetaClient() { notifyStop(); stop(); - delete sessionMap_.load(); - delete killedPlans_.load(); + delete metadata_.load(); VLOG(3) << "~MetaClient"; } @@ -194,35 +194,35 @@ bool MetaClient::loadUsersAndRoles() { userPasswordMap[user.first] = user.second; userNameList.emplace(user.first); } - { - folly::RWSpinLock::WriteHolder holder(localCacheLock_); - userRolesMap_ = std::move(userRolesMap); - userPasswordMap_ = std::move(userPasswordMap); - - // Remove expired users from cache - auto removeExpiredUser = [&](folly::ConcurrentHashMap& userMap, - const std::unordered_set& userList) { - for (auto& ele : userMap) { - if (userList.count(ele.first) == 0) { - userMap.erase(ele.first); - } - } - }; - removeExpiredUser(userPasswordAttemptsRemain_, userNameList); - removeExpiredUser(userLoginLockTime_, userNameList); - - // This method is called periodically by the heartbeat thread, but we don't want to reset the - // failed login attempts every time. - for (const auto& user : userNameList) { - // If the user is not in the map, insert value with the default value - // Do nothing if the account is already in the map - if (userPasswordAttemptsRemain_.find(user) == userPasswordAttemptsRemain_.end()) { - userPasswordAttemptsRemain_.insert(user, FLAGS_failed_login_attempts); - } - if (userLoginLockTime_.find(user) == userLoginLockTime_.end()) { - userLoginLockTime_.insert(user, 0); + + userRolesMap_ = std::move(userRolesMap); + userPasswordMap_ = std::move(userPasswordMap); + + // Remove expired users from cache + auto removeExpiredUser = [&](folly::ConcurrentHashMap& userMap, + const std::unordered_set& userList) { + for (auto iter = userMap.begin(); iter != userMap.end();) { + if (!userList.count(iter->first)) { + iter = userMap.erase(iter); + } else { + ++iter; } } + }; + removeExpiredUser(userPasswordAttemptsRemain_, userNameList); + removeExpiredUser(userLoginLockTime_, userNameList); + + // This method is called periodically by the heartbeat thread, but we don't want to reset the + // failed login attempts every time. + for (const auto& user : userNameList) { + // If the user is not in the map, insert value with the default value + // Do nothing if the account is already in the map + if (userPasswordAttemptsRemain_.find(user) == userPasswordAttemptsRemain_.end()) { + userPasswordAttemptsRemain_.insert(user, FLAGS_failed_login_attempts); + } + if (userLoginLockTime_.find(user) == userLoginLockTime_.end()) { + userLoginLockTime_.insert(user, 0); + } } return true; } @@ -344,7 +344,6 @@ bool MetaClient::loadData() { decltype(localCache_) oldCache; { - folly::RWSpinLock::WriteHolder holder(localCacheLock_); oldCache = std::move(localCache_); localCache_ = std::move(cache); spaceIndexByName_ = std::move(spaceIndexByName); @@ -359,7 +358,37 @@ bool MetaClient::loadData() { } localDataLastUpdateTime_.store(metadLastUpdateTime_.load()); - + auto newMetaData = new MetaData(); + + for (auto& spaceInfo : localCache_) { + GraphSpaceID spaceId = spaceInfo.first; + std::shared_ptr info = spaceInfo.second; + std::shared_ptr infoDeepCopy = std::make_shared(*info); + infoDeepCopy->tagSchemas_ = buildTagSchemas(infoDeepCopy->tagItemVec_, &infoDeepCopy->pool_); + infoDeepCopy->edgeSchemas_ = buildEdgeSchemas(infoDeepCopy->edgeItemVec_, &infoDeepCopy->pool_); + infoDeepCopy->tagIndexes_ = buildIndexes(infoDeepCopy->tagIndexItemVec_); + infoDeepCopy->edgeIndexes_ = buildIndexes(infoDeepCopy->edgeIndexItemVec_); + newMetaData->localCache_[spaceId] = infoDeepCopy; + } + newMetaData->spaceIndexByName_ = spaceIndexByName_; + newMetaData->spaceTagIndexByName_ = spaceTagIndexByName_; + newMetaData->spaceEdgeIndexByName_ = spaceEdgeIndexByName_; + newMetaData->spaceEdgeIndexByType_ = spaceEdgeIndexByType_; + newMetaData->spaceNewestTagVerMap_ = spaceNewestTagVerMap_; + newMetaData->spaceNewestEdgeVerMap_ = spaceNewestEdgeVerMap_; + newMetaData->spaceTagIndexById_ = spaceTagIndexById_; + newMetaData->spaceAllEdgeMap_ = spaceAllEdgeMap_; + + newMetaData->userRolesMap_ = userRolesMap_; + newMetaData->storageHosts_ = storageHosts_; + newMetaData->fulltextIndexMap_ = fulltextIndexMap_; + newMetaData->userPasswordMap_ = userPasswordMap_; + newMetaData->sessionMap_ = std::move(sessionMap_); + newMetaData->killedPlans_ = std::move(killedPlans_); + newMetaData->serviceClientList_ = std::move(serviceClientList_); + auto oldMetaData = metadata_.load(); + metadata_.store(newMetaData); + folly::rcu_retire(oldMetaData); diff(oldCache, localCache_); listenerDiff(oldCache, localCache_); loadRemoteListeners(); @@ -513,7 +542,7 @@ bool MetaClient::loadSchemas(GraphSpaceID spaceId, return true; } -static Indexes buildIndexes(std::vector indexItemVec) { +Indexes buildIndexes(std::vector indexItemVec) { Indexes indexes; for (auto index : indexItemVec) { auto indexName = index.get_index_name(); @@ -582,10 +611,7 @@ bool MetaClient::loadGlobalServiceClients() { LOG(ERROR) << "List services failed, status:" << ret.status(); return false; } - { - folly::RWSpinLock::WriteHolder holder(localCacheLock_); - serviceClientList_ = std::move(ret).value(); - } + serviceClientList_ = std::move(ret).value(); return true; } @@ -595,53 +621,15 @@ bool MetaClient::loadFulltextIndexes() { LOG(ERROR) << "List fulltext indexes failed, status:" << ftRet.status(); return false; } - { - folly::RWSpinLock::WriteHolder holder(localCacheLock_); - fulltextIndexMap_ = std::move(ftRet).value(); - } + fulltextIndexMap_ = std::move(ftRet).value(); return true; } -const MetaClient::ThreadLocalInfo& MetaClient::getThreadLocalInfo() { - ThreadLocalInfo& threadLocalInfo = folly::SingletonThreadLocal::get(); - - if (threadLocalInfo.localLastUpdateTime_ < localDataLastUpdateTime_) { - threadLocalInfo.localLastUpdateTime_ = localDataLastUpdateTime_; - - folly::RWSpinLock::ReadHolder holder(localCacheLock_); - for (auto& spaceInfo : localCache_) { - GraphSpaceID spaceId = spaceInfo.first; - std::shared_ptr info = spaceInfo.second; - std::shared_ptr infoDeepCopy = std::make_shared(*info); - infoDeepCopy->tagSchemas_ = buildTagSchemas(infoDeepCopy->tagItemVec_, &infoDeepCopy->pool_); - infoDeepCopy->edgeSchemas_ = - buildEdgeSchemas(infoDeepCopy->edgeItemVec_, &infoDeepCopy->pool_); - infoDeepCopy->tagIndexes_ = buildIndexes(infoDeepCopy->tagIndexItemVec_); - infoDeepCopy->edgeIndexes_ = buildIndexes(infoDeepCopy->edgeIndexItemVec_); - threadLocalInfo.localCache_[spaceId] = infoDeepCopy; - } - threadLocalInfo.spaceIndexByName_ = spaceIndexByName_; - threadLocalInfo.spaceTagIndexByName_ = spaceTagIndexByName_; - threadLocalInfo.spaceEdgeIndexByName_ = spaceEdgeIndexByName_; - threadLocalInfo.spaceEdgeIndexByType_ = spaceEdgeIndexByType_; - threadLocalInfo.spaceNewestTagVerMap_ = spaceNewestTagVerMap_; - threadLocalInfo.spaceNewestEdgeVerMap_ = spaceNewestEdgeVerMap_; - threadLocalInfo.spaceTagIndexById_ = spaceTagIndexById_; - threadLocalInfo.spaceAllEdgeMap_ = spaceAllEdgeMap_; - - threadLocalInfo.userRolesMap_ = userRolesMap_; - threadLocalInfo.storageHosts_ = storageHosts_; - threadLocalInfo.fulltextIndexMap_ = fulltextIndexMap_; - threadLocalInfo.userPasswordMap_ = userPasswordMap_; - } - - return threadLocalInfo; -} - Status MetaClient::checkTagIndexed(GraphSpaceID spaceId, IndexID indexID) { - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto it = threadLocalInfo.localCache_.find(spaceId); - if (it != threadLocalInfo.localCache_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto it = metadata.localCache_.find(spaceId); + if (it != metadata.localCache_.end()) { auto indexIt = it->second->tagIndexes_.find(indexID); if (indexIt != it->second->tagIndexes_.end()) { return Status::OK(); @@ -653,9 +641,10 @@ Status MetaClient::checkTagIndexed(GraphSpaceID spaceId, IndexID indexID) { } Status MetaClient::checkEdgeIndexed(GraphSpaceID space, IndexID indexID) { - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto it = threadLocalInfo.localCache_.find(space); - if (it != threadLocalInfo.localCache_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto it = metadata.localCache_.find(space); + if (it != metadata.localCache_.end()) { auto indexIt = it->second->edgeIndexes_.find(indexID); if (indexIt != it->second->edgeIndexes_.end()) { return Status::OK(); @@ -1335,9 +1324,10 @@ StatusOr MetaClient::getSpaceIdByNameFromCache(const std::string& if (!ready_) { return Status::Error("Not ready!"); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto it = threadLocalInfo.spaceIndexByName_.find(name); - if (it != threadLocalInfo.spaceIndexByName_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto it = metadata.spaceIndexByName_.find(name); + if (it != metadata.spaceIndexByName_.end()) { return it->second; } return Status::SpaceNotFound(); @@ -1347,9 +1337,10 @@ StatusOr MetaClient::getSpaceNameByIdFromCache(GraphSpaceID spaceId if (!ready_) { return Status::Error("Not ready!"); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto spaceIt = threadLocalInfo.localCache_.find(spaceId); - if (spaceIt == threadLocalInfo.localCache_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto spaceIt = metadata.localCache_.find(spaceId); + if (spaceIt == metadata.localCache_.end()) { LOG(ERROR) << "Space " << spaceId << " not found!"; return Status::Error("Space %d not found", spaceId); } @@ -1361,9 +1352,10 @@ StatusOr MetaClient::getTagIDByNameFromCache(const GraphSpaceID& space, if (!ready_) { return Status::Error("Not ready!"); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto it = threadLocalInfo.spaceTagIndexByName_.find(std::make_pair(space, name)); - if (it == threadLocalInfo.spaceTagIndexByName_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto it = metadata.spaceTagIndexByName_.find(std::make_pair(space, name)); + if (it == metadata.spaceTagIndexByName_.end()) { return Status::Error("TagName `%s' is nonexistent", name.c_str()); } return it->second; @@ -1374,9 +1366,10 @@ StatusOr MetaClient::getTagNameByIdFromCache(const GraphSpaceID& sp if (!ready_) { return Status::Error("Not ready!"); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto it = threadLocalInfo.spaceTagIndexById_.find(std::make_pair(space, tagId)); - if (it == threadLocalInfo.spaceTagIndexById_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto it = metadata.spaceTagIndexById_.find(std::make_pair(space, tagId)); + if (it == metadata.spaceTagIndexById_.end()) { return Status::Error("TagID `%d' is nonexistent", tagId); } return it->second; @@ -1387,9 +1380,10 @@ StatusOr MetaClient::getEdgeTypeByNameFromCache(const GraphSpaceID& sp if (!ready_) { return Status::Error("Not ready!"); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto it = threadLocalInfo.spaceEdgeIndexByName_.find(std::make_pair(space, name)); - if (it == threadLocalInfo.spaceEdgeIndexByName_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto it = metadata.spaceEdgeIndexByName_.find(std::make_pair(space, name)); + if (it == metadata.spaceEdgeIndexByName_.end()) { return Status::Error("EdgeName `%s' is nonexistent", name.c_str()); } return it->second; @@ -1400,9 +1394,10 @@ StatusOr MetaClient::getEdgeNameByTypeFromCache(const GraphSpaceID& if (!ready_) { return Status::Error("Not ready!"); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto it = threadLocalInfo.spaceEdgeIndexByType_.find(std::make_pair(space, edgeType)); - if (it == threadLocalInfo.spaceEdgeIndexByType_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto it = metadata.spaceEdgeIndexByType_.find(std::make_pair(space, edgeType)); + if (it == metadata.spaceEdgeIndexByType_.end()) { return Status::Error("EdgeType `%d' is nonexistent", edgeType); } return it->second; @@ -1412,9 +1407,10 @@ StatusOr> MetaClient::getAllEdgeFromCache(const GraphSp if (!ready_) { return Status::Error("Not ready!"); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto it = threadLocalInfo.spaceAllEdgeMap_.find(space); - if (it == threadLocalInfo.spaceAllEdgeMap_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto it = metadata.spaceAllEdgeMap_.find(space); + if (it == metadata.spaceAllEdgeMap_.end()) { return Status::Error("SpaceId `%d' is nonexistent", space); } return it->second; @@ -1549,14 +1545,16 @@ folly::Future> MetaClient::removeRange(std::string segment, } PartsMap MetaClient::getPartsMapFromCache(const HostAddr& host) { - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - return doGetPartsMap(host, threadLocalInfo.localCache_); + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + return doGetPartsMap(host, metadata.localCache_); } StatusOr MetaClient::getPartHostsFromCache(GraphSpaceID spaceId, PartitionID partId) { - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto it = threadLocalInfo.localCache_.find(spaceId); - if (it == threadLocalInfo.localCache_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto it = metadata.localCache_.find(spaceId); + if (it == metadata.localCache_.end()) { return Status::Error("Space not found, spaceid: %d", spaceId); } auto& cache = it->second; @@ -1574,9 +1572,10 @@ StatusOr MetaClient::getPartHostsFromCache(GraphSpaceID spaceId, Part Status MetaClient::checkPartExistInCache(const HostAddr& host, GraphSpaceID spaceId, PartitionID partId) { - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto it = threadLocalInfo.localCache_.find(spaceId); - if (it != threadLocalInfo.localCache_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto it = metadata.localCache_.find(spaceId); + if (it != metadata.localCache_.end()) { auto partsIt = it->second->partsOnHost_.find(host); if (partsIt != it->second->partsOnHost_.end()) { for (auto& pId : partsIt->second) { @@ -1593,9 +1592,10 @@ Status MetaClient::checkPartExistInCache(const HostAddr& host, } Status MetaClient::checkSpaceExistInCache(const HostAddr& host, GraphSpaceID spaceId) { - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto it = threadLocalInfo.localCache_.find(spaceId); - if (it != threadLocalInfo.localCache_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto it = metadata.localCache_.find(spaceId); + if (it != metadata.localCache_.end()) { auto partsIt = it->second->partsOnHost_.find(host); if (partsIt != it->second->partsOnHost_.end() && !partsIt->second.empty()) { return Status::OK(); @@ -1607,9 +1607,10 @@ Status MetaClient::checkSpaceExistInCache(const HostAddr& host, GraphSpaceID spa } StatusOr MetaClient::partsNum(GraphSpaceID spaceId) { - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto it = threadLocalInfo.localCache_.find(spaceId); - if (it == threadLocalInfo.localCache_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto it = metadata.localCache_.find(spaceId); + if (it == metadata.localCache_.end()) { return Status::Error("Space not found, spaceid: %d", spaceId); } return it->second->partsAlloc_.size(); @@ -2000,9 +2001,10 @@ StatusOr MetaClient::getSpaceVidLen(const GraphSpaceID& spaceId) { if (!ready_) { return Status::Error("Not ready!"); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto spaceIt = threadLocalInfo.localCache_.find(spaceId); - if (spaceIt == threadLocalInfo.localCache_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto spaceIt = metadata.localCache_.find(spaceId); + if (spaceIt == metadata.localCache_.end()) { LOG(ERROR) << "Space " << spaceId << " not found!"; return Status::Error("Space %d not found", spaceId); } @@ -2018,9 +2020,10 @@ StatusOr MetaClient::getSpaceVidType(const GraphSpac if (!ready_) { return Status::Error("Not ready!"); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto spaceIt = threadLocalInfo.localCache_.find(spaceId); - if (spaceIt == threadLocalInfo.localCache_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto spaceIt = metadata.localCache_.find(spaceId); + if (spaceIt == metadata.localCache_.end()) { LOG(ERROR) << "Space " << spaceId << " not found!"; return Status::Error("Space %d not found", spaceId); } @@ -2039,9 +2042,10 @@ StatusOr MetaClient::getSpaceDesc(const GraphSpaceID& space) { if (!ready_) { return Status::Error("Not ready!"); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto spaceIt = threadLocalInfo.localCache_.find(space); - if (spaceIt == threadLocalInfo.localCache_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto spaceIt = metadata.localCache_.find(space); + if (spaceIt == metadata.localCache_.end()) { LOG(ERROR) << "Space " << space << " not found!"; return Status::Error("Space %d not found", space); } @@ -2062,9 +2066,10 @@ StatusOr> MetaClient::getTagSchemaFr if (!ready_) { return Status::Error("Not ready!"); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto spaceIt = threadLocalInfo.localCache_.find(spaceId); - if (spaceIt != threadLocalInfo.localCache_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto spaceIt = metadata.localCache_.find(spaceId); + if (spaceIt != metadata.localCache_.end()) { auto tagIt = spaceIt->second->tagSchemas_.find(tagID); if (tagIt != spaceIt->second->tagSchemas_.end() && !tagIt->second.empty()) { size_t vNum = tagIt->second.size(); @@ -2082,9 +2087,10 @@ StatusOr> MetaClient::getEdgeSchemaF if (!ready_) { return Status::Error("Not ready!"); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto spaceIt = threadLocalInfo.localCache_.find(spaceId); - if (spaceIt != threadLocalInfo.localCache_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto spaceIt = metadata.localCache_.find(spaceId); + if (spaceIt != metadata.localCache_.end()) { auto edgeIt = spaceIt->second->edgeSchemas_.find(edgeType); if (edgeIt != spaceIt->second->edgeSchemas_.end() && !edgeIt->second.empty()) { size_t vNum = edgeIt->second.size(); @@ -2101,9 +2107,10 @@ StatusOr MetaClient::getAllVerTagSchema(GraphSpaceID spaceId) { if (!ready_) { return Status::Error("Not ready!"); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto iter = threadLocalInfo.localCache_.find(spaceId); - if (iter == threadLocalInfo.localCache_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto iter = metadata.localCache_.find(spaceId); + if (iter == metadata.localCache_.end()) { return Status::Error("Space %d not found", spaceId); } return iter->second->tagSchemas_; @@ -2113,9 +2120,10 @@ StatusOr MetaClient::getAllLatestVerTagSchema(const GraphSpaceID& spa if (!ready_) { return Status::Error("Not ready!"); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto iter = threadLocalInfo.localCache_.find(spaceId); - if (iter == threadLocalInfo.localCache_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto iter = metadata.localCache_.find(spaceId); + if (iter == metadata.localCache_.end()) { return Status::Error("Space %d not found", spaceId); } TagSchema tagsSchema; @@ -2131,9 +2139,10 @@ StatusOr MetaClient::getAllVerEdgeSchema(GraphSpaceID spaceId) { if (!ready_) { return Status::Error("Not ready!"); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto iter = threadLocalInfo.localCache_.find(spaceId); - if (iter == threadLocalInfo.localCache_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto iter = metadata.localCache_.find(spaceId); + if (iter == metadata.localCache_.end()) { return Status::Error("Space %d not found", spaceId); } return iter->second->edgeSchemas_; @@ -2143,9 +2152,10 @@ StatusOr MetaClient::getAllLatestVerEdgeSchemaFromCache(const GraphS if (!ready_) { return Status::Error("Not ready!"); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto iter = threadLocalInfo.localCache_.find(spaceId); - if (iter == threadLocalInfo.localCache_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto iter = metadata.localCache_.find(spaceId); + if (iter == metadata.localCache_.end()) { return Status::Error("Space %d not found", spaceId); } EdgeSchema edgesSchema; @@ -2233,9 +2243,10 @@ StatusOr> MetaClient::getTagIndexFromCache(Grap return Status::Error("Not ready!"); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto spaceIt = threadLocalInfo.localCache_.find(spaceId); - if (spaceIt == threadLocalInfo.localCache_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto spaceIt = metadata.localCache_.find(spaceId); + if (spaceIt == metadata.localCache_.end()) { VLOG(3) << "Space " << spaceId << " not found!"; return Status::SpaceNotFound(); } else { @@ -2270,9 +2281,10 @@ StatusOr> MetaClient::getEdgeIndexFromCache(Gra return Status::Error("Not ready!"); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto spaceIt = threadLocalInfo.localCache_.find(spaceId); - if (spaceIt == threadLocalInfo.localCache_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto spaceIt = metadata.localCache_.find(spaceId); + if (spaceIt == metadata.localCache_.end()) { VLOG(3) << "Space " << spaceId << " not found!"; return Status::SpaceNotFound(); } else { @@ -2307,9 +2319,10 @@ StatusOr>> MetaClient::getTagIndexe return Status::Error("Not ready!"); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto spaceIt = threadLocalInfo.localCache_.find(spaceId); - if (spaceIt == threadLocalInfo.localCache_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto spaceIt = metadata.localCache_.find(spaceId); + if (spaceIt == metadata.localCache_.end()) { VLOG(3) << "Space " << spaceId << " not found!"; return Status::SpaceNotFound(); } else { @@ -2330,9 +2343,10 @@ StatusOr>> MetaClient::getEdgeIndex return Status::Error("Not ready!"); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto spaceIt = threadLocalInfo.localCache_.find(spaceId); - if (spaceIt == threadLocalInfo.localCache_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto spaceIt = metadata.localCache_.find(spaceId); + if (spaceIt == metadata.localCache_.end()) { VLOG(3) << "Space " << spaceId << " not found!"; return Status::SpaceNotFound(); } else { @@ -2406,9 +2420,10 @@ std::vector MetaClient::getRolesByUserFromCache(const std::strin if (!ready_) { return std::vector(0); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto iter = threadLocalInfo.userRolesMap_.find(user); - if (iter == threadLocalInfo.userRolesMap_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto iter = metadata.userRolesMap_.find(user); + if (iter == metadata.userRolesMap_.end()) { return std::vector(0); } return iter->second; @@ -2419,14 +2434,12 @@ Status MetaClient::authCheckFromCache(const std::string& account, const std::str if (!ready_) { return Status::Error("Meta Service not ready"); } - - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - // Check user existence - auto iter = threadLocalInfo.userPasswordMap_.find(account); - if (iter == threadLocalInfo.userPasswordMap_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto iter = metadata.userPasswordMap_.find(account); + if (iter == metadata.userPasswordMap_.end()) { return Status::Error("User not exist"); } - auto lockedSince = userLoginLockTime_[account]; auto passwordAttemtRemain = userPasswordAttemptsRemain_[account]; @@ -2487,18 +2500,20 @@ bool MetaClient::checkShadowAccountFromCache(const std::string& account) { if (!ready_) { return false; } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto iter = threadLocalInfo.userPasswordMap_.find(account); - if (iter != threadLocalInfo.userPasswordMap_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto iter = metadata.userPasswordMap_.find(account); + if (iter != metadata.userPasswordMap_.end()) { return true; } return false; } StatusOr MetaClient::getTermFromCache(GraphSpaceID spaceId, PartitionID partId) { - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto spaceInfo = threadLocalInfo.localCache_.find(spaceId); - if (spaceInfo == threadLocalInfo.localCache_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto spaceInfo = metadata.localCache_.find(spaceId); + if (spaceInfo == metadata.localCache_.end()) { return Status::Error("Term not found!"); } @@ -2515,8 +2530,9 @@ StatusOr> MetaClient::getStorageHosts() { return Status::Error("Not ready!"); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - return threadLocalInfo.storageHosts_; + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + return metadata.storageHosts_; } StatusOr MetaClient::getLatestTagVersionFromCache(const GraphSpaceID& space, @@ -2524,9 +2540,10 @@ StatusOr MetaClient::getLatestTagVersionFromCache(const GraphSpaceID& if (!ready_) { return Status::Error("Not ready!"); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto it = threadLocalInfo.spaceNewestTagVerMap_.find(std::make_pair(space, tagId)); - if (it == threadLocalInfo.spaceNewestTagVerMap_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto it = metadata.spaceNewestTagVerMap_.find(std::make_pair(space, tagId)); + if (it == metadata.spaceNewestTagVerMap_.end()) { return Status::TagNotFound(); } return it->second; @@ -2537,9 +2554,10 @@ StatusOr MetaClient::getLatestEdgeVersionFromCache(const GraphSpaceID if (!ready_) { return Status::Error("Not ready!"); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto it = threadLocalInfo.spaceNewestEdgeVerMap_.find(std::make_pair(space, edgeType)); - if (it == threadLocalInfo.spaceNewestEdgeVerMap_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto it = metadata.spaceNewestEdgeVerMap_.find(std::make_pair(space, edgeType)); + if (it == metadata.spaceNewestEdgeVerMap_.end()) { return Status::EdgeNotFound(); } return it->second; @@ -2993,9 +3011,10 @@ MetaClient::getListenersBySpaceHostFromCache(GraphSpaceID spaceId, const HostAdd if (!ready_) { return Status::Error("Not ready!"); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto spaceIt = threadLocalInfo.localCache_.find(spaceId); - if (spaceIt == threadLocalInfo.localCache_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto spaceIt = metadata.localCache_.find(spaceId); + if (spaceIt == metadata.localCache_.end()) { VLOG(3) << "Space " << spaceId << " not found!"; return Status::SpaceNotFound(); } @@ -3012,8 +3031,9 @@ StatusOr MetaClient::getListenersByHostFromCache(const HostAddr& h if (!ready_) { return Status::Error("Not ready!"); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - return doGetListenersMap(host, threadLocalInfo.localCache_); + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + return doGetListenersMap(host, metadata.localCache_); } ListenersMap MetaClient::doGetListenersMap(const HostAddr& host, const LocalCache& localCache) { @@ -3049,9 +3069,10 @@ StatusOr MetaClient::getListenerHostsBySpacePartType(GraphSpaceID spac if (!ready_) { return Status::Error("Not ready!"); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto spaceIt = threadLocalInfo.localCache_.find(spaceId); - if (spaceIt == threadLocalInfo.localCache_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto spaceIt = metadata.localCache_.find(spaceId); + if (spaceIt == metadata.localCache_.end()) { VLOG(3) << "Space " << spaceId << " not found!"; return Status::SpaceNotFound(); } @@ -3070,9 +3091,10 @@ StatusOr> MetaClient::getListenerHostTypeBySpace if (!ready_) { return Status::Error("Not ready!"); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - auto spaceIt = threadLocalInfo.localCache_.find(spaceId); - if (spaceIt == threadLocalInfo.localCache_.end()) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + auto spaceIt = metadata.localCache_.find(spaceId); + if (spaceIt == metadata.localCache_.end()) { VLOG(3) << "Space " << spaceId << " not found!"; return Status::SpaceNotFound(); } @@ -3161,8 +3183,9 @@ void MetaClient::updateNestedGflags(const std::unordered_map optionMap.emplace(value.first, value.second.toString()); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - for (const auto& spaceEntry : threadLocalInfo.localCache_) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + for (const auto& spaceEntry : metadata.localCache_) { listener_->onSpaceOptionUpdated(spaceEntry.first, optionMap); } } @@ -3449,11 +3472,11 @@ StatusOr> MetaClient::getServiceClientsFromCach if (!ready_) { return Status::Error("Not ready!"); } - - folly::RWSpinLock::ReadHolder holder(localCacheLock_); + folly::rcu_reader guard; + auto& metadata = *metadata_.load(); if (type == cpp2::ExternalServiceType::ELASTICSEARCH) { - auto sIter = serviceClientList_.find(type); - if (sIter != serviceClientList_.end()) { + auto sIter = metadata.serviceClientList_.find(type); + if (sIter != metadata.serviceClientList_.end()) { return sIter->second; } } @@ -3515,8 +3538,9 @@ StatusOr> MetaClient::getFTIndexe if (!ready_) { return Status::Error("Not ready!"); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - return threadLocalInfo.fulltextIndexMap_; + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + return metadata.fulltextIndexMap_; } StatusOr> MetaClient::getFTIndexBySpaceFromCache( @@ -3524,9 +3548,10 @@ StatusOr> MetaClient::getFTIndexB if (!ready_) { return Status::Error("Not ready!"); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); std::unordered_map indexes; - for (const auto& it : threadLocalInfo.fulltextIndexMap_) { + for (const auto& it : metadata.fulltextIndexMap_) { if (it.second.get_space_id() == spaceId) { indexes[it.first] = it.second; } @@ -3539,8 +3564,9 @@ StatusOr> MetaClient::getFTIndexBySpaceSch if (!ready_) { return Status::Error("Not ready!"); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - for (auto& it : threadLocalInfo.fulltextIndexMap_) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + for (auto& it : metadata.fulltextIndexMap_) { auto id = it.second.get_depend_schema().getType() == nebula::cpp2::SchemaID::Type::edge_type ? it.second.get_depend_schema().get_edge_type() : it.second.get_depend_schema().get_tag_id(); @@ -3556,12 +3582,13 @@ StatusOr MetaClient::getFTIndexByNameFromCache(GraphSpaceID space if (!ready_) { return Status::Error("Not ready!"); } - const ThreadLocalInfo& threadLocalInfo = getThreadLocalInfo(); - if (threadLocalInfo.fulltextIndexMap_.find(name) != fulltextIndexMap_.end() && - threadLocalInfo.fulltextIndexMap_.at(name).get_space_id() != spaceId) { + folly::rcu_reader guard; + const auto& metadata = *metadata_.load(); + if (metadata.fulltextIndexMap_.find(name) != fulltextIndexMap_.end() && + metadata.fulltextIndexMap_.at(name).get_space_id() != spaceId) { return Status::IndexNotFound(); } - return threadLocalInfo.fulltextIndexMap_.at(name); + return metadata.fulltextIndexMap_.at(name); } folly::Future> MetaClient::createSession( @@ -3713,22 +3740,16 @@ bool MetaClient::loadSessions() { LOG(ERROR) << "List sessions failed, status:" << session_list.status(); return false; } - SessionMap* oldSessionMap = sessionMap_.load(); - SessionMap* newSessionMap = new SessionMap(*oldSessionMap); - auto oldKilledPlan = killedPlans_.load(); - auto newKilledPlan = new folly::F14FastSet>(*oldKilledPlan); + sessionMap_.clear(); + killedPlans_.clear(); for (auto& session : session_list.value().get_sessions()) { - (*newSessionMap)[session.get_session_id()] = session; + sessionMap_[session.get_session_id()] = session; for (auto& query : session.get_queries()) { if (query.second.get_status() == cpp2::QueryStatus::KILLING) { - newKilledPlan->insert({session.get_session_id(), query.first}); + killedPlans_.insert({session.get_session_id(), query.first}); } } } - sessionMap_.store(newSessionMap); - killedPlans_.store(newKilledPlan); - folly::rcu_retire(oldKilledPlan); - folly::rcu_retire(oldSessionMap); return true; } @@ -3737,9 +3758,9 @@ StatusOr MetaClient::getSessionFromCache(const nebula::SessionID& return Status::Error("Not ready!"); } folly::rcu_reader guard; - auto session_map = sessionMap_.load(); - auto it = session_map->find(session_id); - if (it != session_map->end()) { + auto& sessionMap = metadata_.load()->sessionMap_; + auto it = sessionMap.find(session_id); + if (it != sessionMap.end()) { return it->second; } return Status::SessionNotFound(); @@ -3753,7 +3774,7 @@ bool MetaClient::checkIsPlanKilled(SessionID sessionId, ExecutionPlanID planId) return false; } folly::rcu_reader guard; - return killedPlans_.load()->count({sessionId, planId}); + return metadata_.load()->killedPlans_.count({sessionId, planId}); } Status MetaClient::verifyVersion() { diff --git a/src/clients/meta/MetaClient.h b/src/clients/meta/MetaClient.h index e94b58d4c47..8ac7e42ff02 100644 --- a/src/clients/meta/MetaClient.h +++ b/src/clients/meta/MetaClient.h @@ -792,7 +792,8 @@ class MetaClient { // Only report dir info once when started bool dirInfoReported_ = false; - struct ThreadLocalInfo { + + struct MetaData { int64_t localLastUpdateTime_{-2}; LocalCache localCache_; SpaceNameIdMap spaceIndexByName_; @@ -808,9 +809,12 @@ class MetaClient { std::vector storageHosts_; FTIndexMap fulltextIndexMap_; UserPasswordMap userPasswordMap_; - }; - const ThreadLocalInfo& getThreadLocalInfo(); + SessionMap sessionMap_; + folly::F14FastSet> killedPlans_; + + ServiceClientsList serviceClientList_; + }; void addSchemaField(NebulaSchemaProvider* schema, const cpp2::ColumnDef& col, ObjectPool* pool); @@ -839,7 +843,6 @@ class MetaClient { ServiceClientsList serviceClientList_; FTIndexMap fulltextIndexMap_; - mutable folly::RWSpinLock localCacheLock_; // The listener_ is the NebulaStore MetaChangedListener* listener_{nullptr}; // The lock used to protect listener_ @@ -857,8 +860,9 @@ class MetaClient { MetaClientOptions options_; std::vector storageHosts_; int64_t heartbeatTime_; - std::atomic sessionMap_; - std::atomic>*> killedPlans_; + SessionMap sessionMap_; + folly::F14FastSet> killedPlans_; + std::atomic metadata_; }; } // namespace meta diff --git a/src/storage/test/ChainTestUtils.h b/src/storage/test/ChainTestUtils.h index d94f30b2a74..0fd04ca00ee 100644 --- a/src/storage/test/ChainTestUtils.h +++ b/src/storage/test/ChainTestUtils.h @@ -226,14 +226,14 @@ class MetaClientTestUpdater { static void addLocalCache(meta::MetaClient& mClient, GraphSpaceID spaceId, std::shared_ptr spInfoCache) { - mClient.localCache_[spaceId] = spInfoCache; + mClient.metadata_.load()->localCache_[spaceId] = spInfoCache; } static meta::SpaceInfoCache* getLocalCache(meta::MetaClient* mClient, GraphSpaceID spaceId) { - if (mClient->localCache_.count(spaceId) == 0) { + if (mClient->metadata_.load()->localCache_.count(spaceId) == 0) { return nullptr; } - return mClient->localCache_[spaceId].get(); + return mClient->metadata_.load()->localCache_[spaceId].get(); } static void addPartTerm(meta::MetaClient* mClient, diff --git a/src/storage/test/KillQueryTest.cpp b/src/storage/test/KillQueryTest.cpp index 815ddad74a5..8516244c5bb 100644 --- a/src/storage/test/KillQueryTest.cpp +++ b/src/storage/test/KillQueryTest.cpp @@ -19,7 +19,7 @@ class KillQueryMetaWrapper { public: explicit KillQueryMetaWrapper(MetaClient* client) : client_(client) {} void killQuery(SessionID session_id, ExecutionPlanID plan_id) { - client_->killedPlans_.load()->emplace(session_id, plan_id); + client_->metadata_.load()->killedPlans_.emplace(session_id, plan_id); } private: