Skip to content

Commit

Permalink
increase heartbeat lock to avoid blocking
Browse files Browse the repository at this point in the history
  • Loading branch information
darionyaphet committed Feb 24, 2022
1 parent 1d98cbd commit 8ef4b4b
Show file tree
Hide file tree
Showing 13 changed files with 22 additions and 5 deletions.
10 changes: 10 additions & 0 deletions src/meta/processors/Common.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,16 @@ class LockUtils {
static folly::SharedMutex lock;
return lock;
}

static folly::SharedMutex& heartbeatLock() {
static folly::SharedMutex heartbeatLock;
return heartbeatLock;
}

static folly::SharedMutex& snapshotLock() {
static folly::SharedMutex snapshotLock;
return snapshotLock;
}
};

} // namespace meta
Expand Down
2 changes: 1 addition & 1 deletion src/meta/processors/admin/AgentHBProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ void AgentHBProcessor::process(const cpp2::AgentHBReq& req) {
HostAddr agentAddr((*req.host_ref()).host, (*req.host_ref()).port);
LOG(INFO) << "Receive heartbeat from " << agentAddr << ", role = AGENT";

folly::SharedMutex::WriteHolder holder(LockUtils::lock());
folly::SharedMutex::WriteHolder holder(LockUtils::heartbeatLock());
nebula::cpp2::ErrorCode ret = nebula::cpp2::ErrorCode::SUCCEEDED;
do {
// update agent host info
Expand Down
2 changes: 1 addition & 1 deletion src/meta/processors/admin/CreateBackupProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ void CreateBackupProcessor::process(const cpp2::CreateBackupReq& req) {
return;
}

folly::SharedMutex::WriteHolder holder(LockUtils::lock());
folly::SharedMutex::WriteHolder holder(LockUtils::snapshotLock());
// get active storage host list
auto activeHostsRet = ActiveHostsMan::getActiveHosts(kvstore_);
if (!nebula::ok(activeHostsRet)) {
Expand Down
2 changes: 1 addition & 1 deletion src/meta/processors/admin/CreateSnapshotProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ void CreateSnapshotProcessor::process(const cpp2::CreateSnapshotReq&) {
}

auto snapshot = folly::sformat("SNAPSHOT_{}", MetaKeyUtils::genTimestampStr());
folly::SharedMutex::WriteHolder holder(LockUtils::lock());
folly::SharedMutex::WriteHolder holder(LockUtils::snapshotLock());

auto activeHostsRet = ActiveHostsMan::getActiveHosts(kvstore_);
if (!nebula::ok(activeHostsRet)) {
Expand Down
2 changes: 1 addition & 1 deletion src/meta/processors/admin/DropSnapshotProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ namespace meta {

void DropSnapshotProcessor::process(const cpp2::DropSnapshotReq& req) {
auto& snapshot = req.get_name();
folly::SharedMutex::WriteHolder holder(LockUtils::lock());
folly::SharedMutex::WriteHolder holder(LockUtils::snapshotLock());

// Check snapshot is exists
auto key = MetaKeyUtils::snapshotKey(snapshot);
Expand Down
2 changes: 1 addition & 1 deletion src/meta/processors/admin/HBProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ void HBProcessor::process(const cpp2::HBReq& req) {
auto role = req.get_role();
LOG(INFO) << "Receive heartbeat from " << host
<< ", role = " << apache::thrift::util::enumNameSafe(role);
folly::SharedMutex::WriteHolder holder(LockUtils::lock());
folly::SharedMutex::WriteHolder holder(LockUtils::heartbeatLock());
if (role == cpp2::HostRole::STORAGE) {
if (!ActiveHostsMan::machineRegisted(kvstore_, host)) {
LOG(INFO) << "Machine " << host << " is not registed";
Expand Down
1 change: 1 addition & 0 deletions src/meta/processors/index/CreateEdgeIndexProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ void CreateEdgeIndexProcessor::process(const cpp2::CreateEdgeIndexReq& req) {
return;
}

folly::SharedMutex::ReadHolder rHolder(LockUtils::snapshotLock());
folly::SharedMutex::WriteHolder holder(LockUtils::lock());
auto ret = getIndexID(space, indexName);
if (nebula::ok(ret)) {
Expand Down
1 change: 1 addition & 0 deletions src/meta/processors/index/CreateTagIndexProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ void CreateTagIndexProcessor::process(const cpp2::CreateTagIndexReq& req) {
return;
}

folly::SharedMutex::ReadHolder rHolder(LockUtils::snapshotLock());
folly::SharedMutex::WriteHolder holder(LockUtils::lock());
auto ret = getIndexID(space, indexName);
if (nebula::ok(ret)) {
Expand Down
1 change: 1 addition & 0 deletions src/meta/processors/parts/DropSpaceProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ namespace nebula {
namespace meta {

void DropSpaceProcessor::process(const cpp2::DropSpaceReq& req) {
folly::SharedMutex::ReadHolder rHolder(LockUtils::snapshotLock());
folly::SharedMutex::WriteHolder holder(LockUtils::lock());
const auto& spaceName = req.get_space_name();
auto spaceRet = getSpaceId(spaceName);
Expand Down
1 change: 1 addition & 0 deletions src/meta/processors/schema/AlterEdgeProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ void AlterEdgeProcessor::process(const cpp2::AlterEdgeReq& req) {
CHECK_SPACE_ID_AND_RETURN(spaceId);
const auto& edgeName = req.get_edge_name();

folly::SharedMutex::ReadHolder rHolder(LockUtils::snapshotLock());
folly::SharedMutex::WriteHolder holder(LockUtils::lock());
auto ret = getEdgeType(spaceId, edgeName);
if (!nebula::ok(ret)) {
Expand Down
1 change: 1 addition & 0 deletions src/meta/processors/schema/AlterTagProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ void AlterTagProcessor::process(const cpp2::AlterTagReq& req) {
CHECK_SPACE_ID_AND_RETURN(spaceId);
const auto& tagName = req.get_tag_name();

folly::SharedMutex::ReadHolder rHolder(LockUtils::snapshotLock());
folly::SharedMutex::WriteHolder holder(LockUtils::lock());
auto ret = getTagId(spaceId, tagName);
if (!nebula::ok(ret)) {
Expand Down
1 change: 1 addition & 0 deletions src/meta/processors/schema/DropEdgeProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ void DropEdgeProcessor::process(const cpp2::DropEdgeReq& req) {
GraphSpaceID spaceId = req.get_space_id();
CHECK_SPACE_ID_AND_RETURN(spaceId);

folly::SharedMutex::ReadHolder rHolder(LockUtils::snapshotLock());
folly::SharedMutex::WriteHolder holder(LockUtils::lock());
const auto& edgeName = req.get_edge_name();

Expand Down
1 change: 1 addition & 0 deletions src/meta/processors/schema/DropTagProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ void DropTagProcessor::process(const cpp2::DropTagReq& req) {
GraphSpaceID spaceId = req.get_space_id();
CHECK_SPACE_ID_AND_RETURN(spaceId);

folly::SharedMutex::ReadHolder rHolder(LockUtils::snapshotLock());
folly::SharedMutex::WriteHolder holder(LockUtils::lock());
const auto& tagName = req.get_tag_name();

Expand Down

0 comments on commit 8ef4b4b

Please sign in to comment.