Skip to content

Commit

Permalink
fix lock
Browse files Browse the repository at this point in the history
  • Loading branch information
darionyaphet committed Feb 23, 2022
1 parent d3dbc84 commit 6c47886
Show file tree
Hide file tree
Showing 75 changed files with 174 additions and 251 deletions.
2 changes: 1 addition & 1 deletion src/clients/meta/MetaClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -220,8 +220,8 @@ class MetaClient {
FRIEND_TEST(MetaClientTest, RetryUntilLimitTest);
FRIEND_TEST(MetaClientTest, RocksdbOptionsTest);
FRIEND_TEST(MetaClientTest, VerifyClientTest);
friend class KillQueryMetaWrapper;
FRIEND_TEST(ChainAddEdgesTest, AddEdgesLocalTest);
friend class KillQueryMetaWrapper;
friend class storage::MetaClientTestUpdater;

public:
Expand Down
2 changes: 0 additions & 2 deletions src/meta/ActiveHostsMan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ nebula::cpp2::ErrorCode ActiveHostsMan::updateHostInfo(kvstore::KVStore* kv,
bool hasUpdate = !data.empty();
data.emplace_back(MetaKeyUtils::hostKey(hostAddr.host, hostAddr.port), HostInfo::encodeV2(info));

folly::SharedMutex::WriteHolder wHolder(LockUtils::spaceLock());
folly::Baton<true, std::atomic> baton;
nebula::cpp2::ErrorCode ret;
kv->asyncMultiPut(
Expand Down Expand Up @@ -266,7 +265,6 @@ nebula::cpp2::ErrorCode LastUpdateTimeMan::update(kvstore::KVStore* kv,
data.emplace_back(MetaKeyUtils::lastUpdateTimeKey(),
MetaKeyUtils::lastUpdateTimeVal(timeInMilliSec));

folly::SharedMutex::WriteHolder wHolder(LockUtils::lastUpdateTimeLock());
folly::Baton<true, std::atomic> baton;
nebula::cpp2::ErrorCode ret;
kv->asyncMultiPut(
Expand Down
4 changes: 2 additions & 2 deletions src/meta/http/MetaHttpReplaceHostHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ void MetaHttpReplaceHostHandler::onError(ProxygenError error) noexcept {
}

bool MetaHttpReplaceHostHandler::replaceHostInPart(std::string ipv4From, std::string ipv4To) {
folly::SharedMutex::WriteHolder wHolder(LockUtils::spaceLock());
folly::SharedMutex::WriteHolder holder(LockUtils::lock());
const auto& spacePrefix = MetaKeyUtils::spacePrefix();
std::unique_ptr<kvstore::KVIterator> iter;
auto kvRet = kvstore_->prefix(kDefaultSpaceId, kDefaultPartId, spacePrefix, &iter);
Expand Down Expand Up @@ -163,7 +163,7 @@ bool MetaHttpReplaceHostHandler::replaceHostInPart(std::string ipv4From, std::st
}

bool MetaHttpReplaceHostHandler::replaceHostInZone(std::string ipv4From, std::string ipv4To) {
folly::SharedMutex::WriteHolder wHolder(LockUtils::spaceLock());
folly::SharedMutex::WriteHolder holder(LockUtils::lock());
const auto& zonePrefix = MetaKeyUtils::zonePrefix();
std::unique_ptr<kvstore::KVIterator> iter;
auto kvRet = kvstore_->prefix(kDefaultSpaceId, kDefaultPartId, zonePrefix, &iter);
Expand Down
6 changes: 0 additions & 6 deletions src/meta/processors/BaseProcessor-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,6 @@ ErrorOr<nebula::cpp2::ErrorCode, std::vector<std::string>> BaseProcessor<RESP>::

template <typename RESP>
ErrorOr<nebula::cpp2::ErrorCode, int32_t> BaseProcessor<RESP>::autoIncrementId() {
folly::SharedMutex::WriteHolder holder(LockUtils::idLock());
const std::string kIdKey = MetaKeyUtils::idKey();
int32_t id;
std::string val;
Expand Down Expand Up @@ -186,9 +185,6 @@ ErrorOr<nebula::cpp2::ErrorCode, int32_t> BaseProcessor<RESP>::getAvailableGloba
template <typename RESP>
ErrorOr<nebula::cpp2::ErrorCode, int32_t> BaseProcessor<RESP>::autoIncrementIdInSpace(
GraphSpaceID spaceId) {
folly::SharedMutex::WriteHolder wHolder(LockUtils::localIdLock());
folly::SharedMutex::ReadHolder rHolder(LockUtils::idLock());

auto localIdkey = MetaKeyUtils::localIdKey(spaceId);
int32_t id;
std::string val;
Expand Down Expand Up @@ -228,7 +224,6 @@ ErrorOr<nebula::cpp2::ErrorCode, int32_t> BaseProcessor<RESP>::autoIncrementIdIn

template <typename RESP>
nebula::cpp2::ErrorCode BaseProcessor<RESP>::spaceExist(GraphSpaceID spaceId) {
folly::SharedMutex::ReadHolder rHolder(LockUtils::spaceLock());
auto spaceKey = MetaKeyUtils::spaceKey(spaceId);
auto ret = doGet(std::move(spaceKey));
if (nebula::ok(ret)) {
Expand Down Expand Up @@ -617,7 +612,6 @@ ErrorOr<nebula::cpp2::ErrorCode, ZoneID> BaseProcessor<RESP>::getZoneId(
template <typename RESP>
nebula::cpp2::ErrorCode BaseProcessor<RESP>::listenerExist(GraphSpaceID space,
cpp2::ListenerType type) {
folly::SharedMutex::ReadHolder rHolder(LockUtils::listenerLock());
const auto& prefix = MetaKeyUtils::listenerPrefix(space, type);
auto ret = doPrefix(prefix);
if (!nebula::ok(ret)) {
Expand Down
29 changes: 4 additions & 25 deletions src/meta/processors/Common.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,32 +14,11 @@ namespace meta {
class LockUtils {
public:
LockUtils() = delete;
#define GENERATE_LOCK(Entry) \
static folly::SharedMutex& Entry##Lock() { \
static folly::SharedMutex l; \
return l; \
}

GENERATE_LOCK(lastUpdateTime);
GENERATE_LOCK(space);
GENERATE_LOCK(id);
GENERATE_LOCK(workerId);
GENERATE_LOCK(localId);
GENERATE_LOCK(tagAndEdge);
GENERATE_LOCK(tagIndex);
GENERATE_LOCK(edgeIndex);
GENERATE_LOCK(service);
GENERATE_LOCK(fulltextIndex);
GENERATE_LOCK(user);
GENERATE_LOCK(config);
GENERATE_LOCK(snapshot);
GENERATE_LOCK(group);
GENERATE_LOCK(zone);
GENERATE_LOCK(listener);
GENERATE_LOCK(session);
GENERATE_LOCK(machine);

#undef GENERATE_LOCK
static folly::SharedMutex& lock() {
static folly::SharedMutex lock;
return lock;
}
};

} // namespace meta
Expand Down
1 change: 1 addition & 0 deletions src/meta/processors/admin/AgentHBProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ void AgentHBProcessor::process(const cpp2::AgentHBReq& req) {
HostAddr agentAddr((*req.host_ref()).host, (*req.host_ref()).port);
LOG(INFO) << "Receive heartbeat from " << agentAddr << ", role = AGENT";

folly::SharedMutex::WriteHolder holder(LockUtils::lock());
nebula::cpp2::ErrorCode ret = nebula::cpp2::ErrorCode::SUCCEEDED;
do {
// update agent host info
Expand Down
10 changes: 2 additions & 8 deletions src/meta/processors/admin/CreateBackupProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ namespace meta {

ErrorOr<nebula::cpp2::ErrorCode, std::unordered_set<GraphSpaceID>>
CreateBackupProcessor::spaceNameToId(const std::vector<std::string>* backupSpaces) {
folly::SharedMutex::ReadHolder rHolder(LockUtils::spaceLock());
std::unordered_set<GraphSpaceID> spaces;

bool allSpaces = backupSpaces == nullptr || backupSpaces->empty();
Expand Down Expand Up @@ -98,8 +97,7 @@ void CreateBackupProcessor::process(const cpp2::CreateBackupReq& req) {
return;
}

folly::SharedMutex::WriteHolder wHolder(LockUtils::snapshotLock());

folly::SharedMutex::WriteHolder holder(LockUtils::lock());
// get active storage host list
auto activeHostsRet = ActiveHostsMan::getActiveHosts(kvstore_);
if (!nebula::ok(activeHostsRet)) {
Expand Down Expand Up @@ -239,11 +237,7 @@ void CreateBackupProcessor::process(const cpp2::CreateBackupReq& req) {
backup.backup_name_ref() = std::move(backupName);
backup.full_ref() = true;
bool allSpaces = backupSpaces == nullptr || backupSpaces->empty();
if (allSpaces) {
backup.all_spaces_ref() = true;
} else {
backup.all_spaces_ref() = false;
}
backup.all_spaces_ref() = allSpaces;
backup.create_time_ref() = time::WallClock::fastNowInMilliSec();

handleErrorCode(nebula::cpp2::ErrorCode::SUCCEEDED);
Expand Down
2 changes: 1 addition & 1 deletion src/meta/processors/admin/CreateSnapshotProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ void CreateSnapshotProcessor::process(const cpp2::CreateSnapshotReq&) {
}

auto snapshot = folly::sformat("SNAPSHOT_{}", MetaKeyUtils::genTimestampStr());
folly::SharedMutex::WriteHolder wHolder(LockUtils::snapshotLock());
folly::SharedMutex::WriteHolder holder(LockUtils::lock());

auto activeHostsRet = ActiveHostsMan::getActiveHosts(kvstore_);
if (!nebula::ok(activeHostsRet)) {
Expand Down
10 changes: 3 additions & 7 deletions src/meta/processors/admin/DropSnapshotProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ namespace meta {

void DropSnapshotProcessor::process(const cpp2::DropSnapshotReq& req) {
auto& snapshot = req.get_name();
folly::SharedMutex::WriteHolder wHolder(LockUtils::snapshotLock());
folly::SharedMutex::WriteHolder holder(LockUtils::lock());

// Check snapshot is exists
auto key = MetaKeyUtils::snapshotKey(snapshot);
Expand Down Expand Up @@ -53,9 +53,7 @@ void DropSnapshotProcessor::process(const cpp2::DropSnapshotReq& req) {
MetaKeyUtils::snapshotVal(cpp2::SnapshotStatus::INVALID, hosts));
auto putRet = doSyncPut(std::move(data));
if (putRet != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(INFO) << "Update snapshot status error. "
"snapshot : "
<< snapshot;
LOG(INFO) << "Update snapshot status error. snapshot: " << snapshot;
}
handleErrorCode(putRet);
onFinished();
Expand All @@ -72,9 +70,7 @@ void DropSnapshotProcessor::process(const cpp2::DropSnapshotReq& req) {
MetaKeyUtils::snapshotVal(cpp2::SnapshotStatus::INVALID, hosts));
auto putRet = doSyncPut(std::move(data));
if (putRet != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(INFO) << "Update snapshot status error. "
"snapshot : "
<< snapshot;
LOG(INFO) << "Update snapshot status error. snapshot: " << snapshot;
}
handleErrorCode(putRet);
onFinished();
Expand Down
2 changes: 1 addition & 1 deletion src/meta/processors/admin/HBProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ void HBProcessor::process(const cpp2::HBReq& req) {
auto role = req.get_role();
LOG(INFO) << "Receive heartbeat from " << host
<< ", role = " << apache::thrift::util::enumNameSafe(role);

folly::SharedMutex::WriteHolder holder(LockUtils::lock());
if (role == cpp2::HostRole::STORAGE) {
if (!ActiveHostsMan::machineRegisted(kvstore_, host)) {
LOG(INFO) << "Machine " << host << " is not registed";
Expand Down
3 changes: 2 additions & 1 deletion src/meta/processors/admin/ListClusterInfoProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,11 @@ void ListClusterInfoProcessor::process(const cpp2::ListClusterInfoReq& req) {
std::unordered_map<std::string, std::vector<cpp2::ServiceInfo>> hostServices;

// non-meta services, may include inactive services
folly::SharedMutex::ReadHolder holder(LockUtils::lock());
const auto& hostPrefix = MetaKeyUtils::hostPrefix();
auto iterRet = doPrefix(hostPrefix);
if (!nebula::ok(iterRet)) {
LOG(INFO) << "get host prefix failed:"
LOG(INFO) << "get host prefix failed: "
<< apache::thrift::util::enumNameSafe(nebula::error(iterRet));
handleErrorCode(nebula::cpp2::ErrorCode::E_LIST_CLUSTER_FAILURE);
onFinished();
Expand Down
3 changes: 2 additions & 1 deletion src/meta/processors/admin/ListSnapshotsProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ namespace nebula {
namespace meta {

void ListSnapshotsProcessor::process(const cpp2::ListSnapshotsReq&) {
folly::SharedMutex::ReadHolder holder(LockUtils::lock());
const auto& prefix = MetaKeyUtils::snapshotPrefix();
auto iterRet = doPrefix(prefix);
if (!nebula::ok(iterRet)) {
Expand All @@ -20,8 +21,8 @@ void ListSnapshotsProcessor::process(const cpp2::ListSnapshotsReq&) {
onFinished();
return;
}
auto iter = nebula::value(iterRet).get();

auto iter = nebula::value(iterRet).get();
std::vector<nebula::meta::cpp2::Snapshot> snapshots;
while (iter->valid()) {
auto val = iter->val();
Expand Down
4 changes: 2 additions & 2 deletions src/meta/processors/admin/RestoreProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ namespace meta {
nebula::cpp2::ErrorCode RestoreProcessor::replaceHostInPartition(const HostAddr& ipv4From,
const HostAddr& ipv4To,
bool direct) {
folly::SharedMutex::WriteHolder wHolder(LockUtils::spaceLock());
folly::SharedMutex::WriteHolder holder(LockUtils::lock());
auto retCode = nebula::cpp2::ErrorCode::SUCCEEDED;
const auto& spacePrefix = MetaKeyUtils::spacePrefix();
auto iterRet = doPrefix(spacePrefix, direct);
Expand Down Expand Up @@ -83,7 +83,7 @@ nebula::cpp2::ErrorCode RestoreProcessor::replaceHostInPartition(const HostAddr&
nebula::cpp2::ErrorCode RestoreProcessor::replaceHostInZone(const HostAddr& ipv4From,
const HostAddr& ipv4To,
bool direct) {
folly::SharedMutex::WriteHolder wHolder(LockUtils::spaceLock());
folly::SharedMutex::WriteHolder holder(LockUtils::lock());
auto retCode = nebula::cpp2::ErrorCode::SUCCEEDED;
const auto& zonePrefix = MetaKeyUtils::zonePrefix();
auto iterRet = doPrefix(zonePrefix, direct);
Expand Down
2 changes: 1 addition & 1 deletion src/meta/processors/admin/SnapShot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ nebula::cpp2::ErrorCode Snapshot::blockingWrites(storage::cpp2::EngineSignType s
for (const auto& [host, spaces] : hostSpaces) {
LOG(INFO) << "will block write host: " << host;
auto result = client_->blockingWrites(spaces, sign, host).get();
LOG(INFO) << "after block write host";
if (!result.ok()) {
LOG(INFO) << "Send blocking sign error on host " << host
<< ", errorcode: " << result.status().toString();
Expand All @@ -120,7 +121,6 @@ nebula::cpp2::ErrorCode Snapshot::blockingWrites(storage::cpp2::EngineSignType s

ErrorOr<nebula::cpp2::ErrorCode, std::map<HostAddr, std::set<GraphSpaceID>>>
Snapshot::getHostSpaces() {
folly::SharedMutex::ReadHolder rHolder(LockUtils::spaceLock());
const auto& prefix = MetaKeyUtils::partPrefix();
std::unique_ptr<kvstore::KVIterator> iter;
auto retCode = kv_->prefix(kDefaultSpaceId, kDefaultPartId, prefix, &iter);
Expand Down
2 changes: 1 addition & 1 deletion src/meta/processors/config/GetConfigProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ void GetConfigProcessor::process(const cpp2::GetConfigReq& req) {
auto code = nebula::cpp2::ErrorCode::SUCCEEDED;

do {
folly::SharedMutex::ReadHolder rHolder(LockUtils::configLock());
folly::SharedMutex::ReadHolder holder(LockUtils::lock());
if (module != cpp2::ConfigModule::ALL) {
code = getOneConfig(module, name, items);
if (code != nebula::cpp2::ErrorCode::SUCCEEDED) {
Expand Down
4 changes: 2 additions & 2 deletions src/meta/processors/config/ListConfigsProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ namespace nebula {
namespace meta {

void ListConfigsProcessor::process(const cpp2::ListConfigsReq& req) {
folly::SharedMutex::ReadHolder rHolder(LockUtils::configLock());
folly::SharedMutex::ReadHolder holder(LockUtils::lock());

const auto& prefix = MetaKeyUtils::configKeyPrefix(req.get_module());
auto iterRet = doPrefix(prefix);
Expand All @@ -20,8 +20,8 @@ void ListConfigsProcessor::process(const cpp2::ListConfigsReq& req) {
onFinished();
return;
}
auto iter = nebula::value(iterRet).get();

auto iter = nebula::value(iterRet).get();
std::vector<cpp2::ConfigItem> items;
while (iter->valid()) {
auto key = iter->key();
Expand Down
2 changes: 1 addition & 1 deletion src/meta/processors/config/RegConfigProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ namespace meta {
void RegConfigProcessor::process(const cpp2::RegConfigReq& req) {
std::vector<kvstore::KV> data;
{
folly::SharedMutex::WriteHolder wHolder(LockUtils::configLock());
folly::SharedMutex::WriteHolder holder(LockUtils::lock());
for (const auto& item : req.get_items()) {
auto module = item.get_module();
auto name = item.get_name();
Expand Down
4 changes: 2 additions & 2 deletions src/meta/processors/config/SetConfigProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ void SetConfigProcessor::process(const cpp2::SetConfigReq& req) {
auto name = req.get_item().get_name();
auto value = req.get_item().get_value();

folly::SharedMutex::WriteHolder wHolder(LockUtils::configLock());
auto code = nebula::cpp2::ErrorCode::SUCCEEDED;
folly::SharedMutex::WriteHolder holder(LockUtils::lock());
nebula::cpp2::ErrorCode code = nebula::cpp2::ErrorCode::SUCCEEDED;
do {
if (module != cpp2::ConfigModule::ALL) {
// When we set config of a specified module, check if it exists.
Expand Down
20 changes: 4 additions & 16 deletions src/meta/processors/id/GetWorkerIdProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ void GetWorkerIdProcessor::process(const cpp2::GetWorkerIdReq& req) {
return;
}

folly::SharedMutex::WriteHolder wHolder(LockUtils::workerIdLock());
folly::SharedMutex::WriteHolder holder(LockUtils::lock());
auto newResult = doGet(kIdKey);
if (!nebula::ok(newResult)) {
handleErrorCode(nebula::cpp2::ErrorCode::E_WORKER_ID_FAILED);
Expand All @@ -29,25 +29,13 @@ void GetWorkerIdProcessor::process(const cpp2::GetWorkerIdReq& req) {
}

int64_t workerId = std::stoi(std::move(nebula::value(newResult)));
resp_.workerid_ref() = workerId;
handleErrorCode(nebula::cpp2::ErrorCode::SUCCEEDED);

// TODO: (jackwener) limit worker, add LOG ERROR
doPut(std::vector<kvstore::KV>{{ipAddr, std::to_string(workerId + 1)}});

handleErrorCode(nebula::cpp2::ErrorCode::SUCCEEDED);
resp_.workerid_ref() = workerId;
onFinished();
}

void GetWorkerIdProcessor::doPut(std::vector<kvstore::KV> data) {
folly::Baton<true, std::atomic> baton;
kvstore_->asyncMultiPut(kDefaultSpaceId,
kDefaultPartId,
std::move(data),
[this, &baton](nebula::cpp2::ErrorCode code) {
this->handleErrorCode(code);
baton.post();
});
baton.wait();
}

} // namespace meta
} // namespace nebula
2 changes: 0 additions & 2 deletions src/meta/processors/id/GetWorkerIdProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,6 @@ class GetWorkerIdProcessor : public BaseProcessor<cpp2::GetWorkerIdResp> {
UNUSED(once);
}

void doPut(std::vector<kvstore::KV> data);

inline static const string kIdKey = "snowflake_worker_id";
};

Expand Down
3 changes: 1 addition & 2 deletions src/meta/processors/index/CreateEdgeIndexProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,7 @@ void CreateEdgeIndexProcessor::process(const cpp2::CreateEdgeIndexReq& req) {
return;
}

folly::SharedMutex::ReadHolder rHolder(LockUtils::snapshotLock());
folly::SharedMutex::WriteHolder wHolder(LockUtils::edgeIndexLock());
folly::SharedMutex::WriteHolder holder(LockUtils::lock());
auto ret = getIndexID(space, indexName);
if (nebula::ok(ret)) {
if (req.get_if_not_exists()) {
Expand Down
3 changes: 1 addition & 2 deletions src/meta/processors/index/CreateTagIndexProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,7 @@ void CreateTagIndexProcessor::process(const cpp2::CreateTagIndexReq& req) {
return;
}

folly::SharedMutex::ReadHolder rHolder(LockUtils::snapshotLock());
folly::SharedMutex::WriteHolder wHolder(LockUtils::tagIndexLock());
folly::SharedMutex::WriteHolder holder(LockUtils::lock());
auto ret = getIndexID(space, indexName);
if (nebula::ok(ret)) {
if (req.get_if_not_exists()) {
Expand Down
2 changes: 1 addition & 1 deletion src/meta/processors/index/DropEdgeIndexProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ void DropEdgeIndexProcessor::process(const cpp2::DropEdgeIndexReq& req) {
auto spaceID = req.get_space_id();
const auto& indexName = req.get_index_name();
CHECK_SPACE_ID_AND_RETURN(spaceID);
folly::SharedMutex::WriteHolder wHolder(LockUtils::edgeIndexLock());
folly::SharedMutex::WriteHolder holder(LockUtils::lock());

auto edgeIndexIDRet = getIndexID(spaceID, indexName);
if (!nebula::ok(edgeIndexIDRet)) {
Expand Down
Loading

0 comments on commit 6c47886

Please sign in to comment.