Skip to content

Commit

Permalink
increase heartbeat lock to avoid blocking
Browse files Browse the repository at this point in the history
  • Loading branch information
darionyaphet committed Feb 25, 2022
1 parent 1d98cbd commit 2041892
Show file tree
Hide file tree
Showing 12 changed files with 26 additions and 9 deletions.
10 changes: 10 additions & 0 deletions src/meta/processors/Common.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,16 @@ class LockUtils {
static folly::SharedMutex lock;
return lock;
}

static folly::SharedMutex& snapshotLock() {
static folly::SharedMutex snapshotLock;
return snapshotLock;
}

static folly::SharedMutex& sessionLock() {
static folly::SharedMutex sessionLock;
return sessionLock;
}
};

} // namespace meta
Expand Down
2 changes: 1 addition & 1 deletion src/meta/processors/admin/CreateBackupProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ void CreateBackupProcessor::process(const cpp2::CreateBackupReq& req) {
return;
}

folly::SharedMutex::WriteHolder holder(LockUtils::lock());
folly::SharedMutex::WriteHolder holder(LockUtils::snapshotLock());
// get active storage host list
auto activeHostsRet = ActiveHostsMan::getActiveHosts(kvstore_);
if (!nebula::ok(activeHostsRet)) {
Expand Down
2 changes: 1 addition & 1 deletion src/meta/processors/admin/CreateSnapshotProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ void CreateSnapshotProcessor::process(const cpp2::CreateSnapshotReq&) {
}

auto snapshot = folly::sformat("SNAPSHOT_{}", MetaKeyUtils::genTimestampStr());
folly::SharedMutex::WriteHolder holder(LockUtils::lock());
folly::SharedMutex::WriteHolder holder(LockUtils::snapshotLock());

auto activeHostsRet = ActiveHostsMan::getActiveHosts(kvstore_);
if (!nebula::ok(activeHostsRet)) {
Expand Down
2 changes: 1 addition & 1 deletion src/meta/processors/admin/DropSnapshotProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ namespace meta {

void DropSnapshotProcessor::process(const cpp2::DropSnapshotReq& req) {
auto& snapshot = req.get_name();
folly::SharedMutex::WriteHolder holder(LockUtils::lock());
folly::SharedMutex::WriteHolder holder(LockUtils::snapshotLock());

// Check snapshot is exists
auto key = MetaKeyUtils::snapshotKey(snapshot);
Expand Down
1 change: 1 addition & 0 deletions src/meta/processors/index/CreateEdgeIndexProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ void CreateEdgeIndexProcessor::process(const cpp2::CreateEdgeIndexReq& req) {
return;
}

folly::SharedMutex::ReadHolder rHolder(LockUtils::snapshotLock());
folly::SharedMutex::WriteHolder holder(LockUtils::lock());
auto ret = getIndexID(space, indexName);
if (nebula::ok(ret)) {
Expand Down
1 change: 1 addition & 0 deletions src/meta/processors/index/CreateTagIndexProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ void CreateTagIndexProcessor::process(const cpp2::CreateTagIndexReq& req) {
return;
}

folly::SharedMutex::ReadHolder rHolder(LockUtils::snapshotLock());
folly::SharedMutex::WriteHolder holder(LockUtils::lock());
auto ret = getIndexID(space, indexName);
if (nebula::ok(ret)) {
Expand Down
1 change: 1 addition & 0 deletions src/meta/processors/parts/DropSpaceProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ namespace nebula {
namespace meta {

void DropSpaceProcessor::process(const cpp2::DropSpaceReq& req) {
folly::SharedMutex::ReadHolder rHolder(LockUtils::snapshotLock());
folly::SharedMutex::WriteHolder holder(LockUtils::lock());
const auto& spaceName = req.get_space_name();
auto spaceRet = getSpaceId(spaceName);
Expand Down
1 change: 1 addition & 0 deletions src/meta/processors/schema/AlterEdgeProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ void AlterEdgeProcessor::process(const cpp2::AlterEdgeReq& req) {
CHECK_SPACE_ID_AND_RETURN(spaceId);
const auto& edgeName = req.get_edge_name();

folly::SharedMutex::ReadHolder rHolder(LockUtils::snapshotLock());
folly::SharedMutex::WriteHolder holder(LockUtils::lock());
auto ret = getEdgeType(spaceId, edgeName);
if (!nebula::ok(ret)) {
Expand Down
1 change: 1 addition & 0 deletions src/meta/processors/schema/AlterTagProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ void AlterTagProcessor::process(const cpp2::AlterTagReq& req) {
CHECK_SPACE_ID_AND_RETURN(spaceId);
const auto& tagName = req.get_tag_name();

folly::SharedMutex::ReadHolder rHolder(LockUtils::snapshotLock());
folly::SharedMutex::WriteHolder holder(LockUtils::lock());
auto ret = getTagId(spaceId, tagName);
if (!nebula::ok(ret)) {
Expand Down
1 change: 1 addition & 0 deletions src/meta/processors/schema/DropEdgeProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ void DropEdgeProcessor::process(const cpp2::DropEdgeReq& req) {
GraphSpaceID spaceId = req.get_space_id();
CHECK_SPACE_ID_AND_RETURN(spaceId);

folly::SharedMutex::ReadHolder rHolder(LockUtils::snapshotLock());
folly::SharedMutex::WriteHolder holder(LockUtils::lock());
const auto& edgeName = req.get_edge_name();

Expand Down
1 change: 1 addition & 0 deletions src/meta/processors/schema/DropTagProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ void DropTagProcessor::process(const cpp2::DropTagReq& req) {
GraphSpaceID spaceId = req.get_space_id();
CHECK_SPACE_ID_AND_RETURN(spaceId);

folly::SharedMutex::ReadHolder rHolder(LockUtils::snapshotLock());
folly::SharedMutex::WriteHolder holder(LockUtils::lock());
const auto& tagName = req.get_tag_name();

Expand Down
12 changes: 6 additions & 6 deletions src/meta/processors/session/SessionManagerProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ namespace nebula {
namespace meta {

void CreateSessionProcessor::process(const cpp2::CreateSessionReq& req) {
folly::SharedMutex::WriteHolder holder(LockUtils::lock());
folly::SharedMutex::WriteHolder holder(LockUtils::sessionLock());
const auto& user = req.get_user();
auto ret = userExist(user);
if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) {
Expand Down Expand Up @@ -42,7 +42,7 @@ void CreateSessionProcessor::process(const cpp2::CreateSessionReq& req) {
}

void UpdateSessionsProcessor::process(const cpp2::UpdateSessionsReq& req) {
folly::SharedMutex::WriteHolder holder(LockUtils::lock());
folly::SharedMutex::WriteHolder holder(LockUtils::sessionLock());
std::vector<kvstore::KV> data;
std::unordered_map<nebula::SessionID,
std::unordered_map<nebula::ExecutionPlanID, cpp2::QueryDesc>>
Expand Down Expand Up @@ -108,7 +108,7 @@ void UpdateSessionsProcessor::process(const cpp2::UpdateSessionsReq& req) {
}

void ListSessionsProcessor::process(const cpp2::ListSessionsReq&) {
folly::SharedMutex::ReadHolder holder(LockUtils::lock());
folly::SharedMutex::ReadHolder holder(LockUtils::sessionLock());
auto& prefix = MetaKeyUtils::sessionPrefix();
auto ret = doPrefix(prefix);
if (!nebula::ok(ret)) {
Expand All @@ -134,7 +134,7 @@ void ListSessionsProcessor::process(const cpp2::ListSessionsReq&) {
}

void GetSessionProcessor::process(const cpp2::GetSessionReq& req) {
folly::SharedMutex::ReadHolder holder(LockUtils::lock());
folly::SharedMutex::ReadHolder holder(LockUtils::sessionLock());
auto sessionId = req.get_session_id();
auto sessionKey = MetaKeyUtils::sessionKey(sessionId);
auto ret = doGet(sessionKey);
Expand All @@ -156,7 +156,7 @@ void GetSessionProcessor::process(const cpp2::GetSessionReq& req) {
}

void RemoveSessionProcessor::process(const cpp2::RemoveSessionReq& req) {
folly::SharedMutex::WriteHolder holder(LockUtils::lock());
folly::SharedMutex::WriteHolder holder(LockUtils::sessionLock());
auto sessionId = req.get_session_id();
auto sessionKey = MetaKeyUtils::sessionKey(sessionId);
auto ret = doGet(sessionKey);
Expand All @@ -176,7 +176,7 @@ void RemoveSessionProcessor::process(const cpp2::RemoveSessionReq& req) {
}

void KillQueryProcessor::process(const cpp2::KillQueryReq& req) {
folly::SharedMutex::WriteHolder holder(LockUtils::lock());
folly::SharedMutex::WriteHolder holder(LockUtils::sessionLock());
auto& killQueries = req.get_kill_queries();

std::vector<kvstore::KV> data;
Expand Down

0 comments on commit 2041892

Please sign in to comment.