From db7a5a3c3889f6062f673da72b452433cd430e63 Mon Sep 17 00:00:00 2001 From: jakevin <30525741+jackwener@users.noreply.github.com> Date: Tue, 18 Jan 2022 12:43:16 +0800 Subject: [PATCH] Fix race of create tag and edge (#3735) * fix rece * polish * fix review --- src/codec/RowReaderV2.h | 2 +- src/graph/planner/match/MatchPlanner.h | 2 +- src/meta/processors/Common.h | 3 +- .../processors/index/FTIndexProcessor.cpp | 2 +- .../parts/CreateSpaceAsProcessor.cpp | 6 +-- .../processors/schema/AlterEdgeProcessor.cpp | 4 +- .../processors/schema/AlterTagProcessor.cpp | 4 +- .../processors/schema/CreateEdgeProcessor.cpp | 38 ++++++++---------- .../processors/schema/CreateTagProcessor.cpp | 39 +++++++++---------- .../processors/schema/DropEdgeProcessor.cpp | 4 +- .../processors/schema/DropTagProcessor.cpp | 4 +- .../processors/schema/GetEdgeProcessor.cpp | 4 +- .../processors/schema/GetTagProcessor.cpp | 4 +- .../processors/schema/ListEdgesProcessor.cpp | 2 +- .../processors/schema/ListTagsProcessor.cpp | 2 +- 15 files changed, 56 insertions(+), 64 deletions(-) diff --git a/src/codec/RowReaderV2.h b/src/codec/RowReaderV2.h index 1785ad8dab9..9bad0bf7f05 100644 --- a/src/codec/RowReaderV2.h +++ b/src/codec/RowReaderV2.h @@ -26,7 +26,7 @@ class RowReaderV2 : public RowReader { FRIEND_TEST(ScanEdgePropBench, ProcessEdgeProps); public: - virtual ~RowReaderV2() = default; + ~RowReaderV2() override = default; Value getValueByName(const std::string& prop) const noexcept override; Value getValueByIndex(const int64_t index) const noexcept override; diff --git a/src/graph/planner/match/MatchPlanner.h b/src/graph/planner/match/MatchPlanner.h index dfe55c43362..0017ee66db3 100644 --- a/src/graph/planner/match/MatchPlanner.h +++ b/src/graph/planner/match/MatchPlanner.h @@ -14,7 +14,7 @@ namespace graph { class MatchPlanner final : public Planner { public: static std::unique_ptr make() { - return std::unique_ptr(new MatchPlanner()); + return std::make_unique(); } static bool match(AstContext* astCtx); diff --git a/src/meta/processors/Common.h b/src/meta/processors/Common.h index a4a436dcda1..1c7ab8c1a1d 100644 --- a/src/meta/processors/Common.h +++ b/src/meta/processors/Common.h @@ -25,8 +25,7 @@ class LockUtils { GENERATE_LOCK(id); GENERATE_LOCK(workerId); GENERATE_LOCK(localId); - GENERATE_LOCK(tag); - GENERATE_LOCK(edge); + GENERATE_LOCK(tagAndEdge); GENERATE_LOCK(tagIndex); GENERATE_LOCK(edgeIndex); GENERATE_LOCK(service); diff --git a/src/meta/processors/index/FTIndexProcessor.cpp b/src/meta/processors/index/FTIndexProcessor.cpp index 2df5241d791..6928acd7844 100644 --- a/src/meta/processors/index/FTIndexProcessor.cpp +++ b/src/meta/processors/index/FTIndexProcessor.cpp @@ -15,8 +15,8 @@ void CreateFTIndexProcessor::process(const cpp2::CreateFTIndexReq& req) { const auto& index = req.get_index(); const std::string& name = req.get_fulltext_index_name(); CHECK_SPACE_ID_AND_RETURN(index.get_space_id()); + folly::SharedMutex::ReadHolder rHolder(LockUtils::tagAndEdgeLock()); auto isEdge = index.get_depend_schema().getType() == nebula::cpp2::SchemaID::Type::edge_type; - folly::SharedMutex::ReadHolder rHolder(isEdge ? LockUtils::edgeLock() : LockUtils::tagLock()); auto schemaPrefix = isEdge ? MetaKeyUtils::schemaEdgePrefix( index.get_space_id(), index.get_depend_schema().get_edge_type()) : MetaKeyUtils::schemaTagPrefix( diff --git a/src/meta/processors/parts/CreateSpaceAsProcessor.cpp b/src/meta/processors/parts/CreateSpaceAsProcessor.cpp index b61ab6fd7c7..5a7f1dd03f4 100644 --- a/src/meta/processors/parts/CreateSpaceAsProcessor.cpp +++ b/src/meta/processors/parts/CreateSpaceAsProcessor.cpp @@ -132,7 +132,7 @@ ErrorOr> CreateSpaceAsProcesso ErrorOr> CreateSpaceAsProcessor::makeNewTags( GraphSpaceID oldSpaceId, GraphSpaceID newSpaceId) { - folly::SharedMutex::ReadHolder rHolder(LockUtils::tagLock()); + folly::SharedMutex::ReadHolder rHolder(LockUtils::tagAndEdgeLock()); auto prefix = MetaKeyUtils::schemaTagsPrefix(oldSpaceId); auto tagPrefix = doPrefix(prefix); if (!nebula::ok(tagPrefix)) { @@ -164,7 +164,7 @@ ErrorOr> CreateSpaceAsProcesso ErrorOr> CreateSpaceAsProcessor::makeNewEdges( GraphSpaceID oldSpaceId, GraphSpaceID newSpaceId) { - folly::SharedMutex::ReadHolder rHolder(LockUtils::edgeLock()); + folly::SharedMutex::ReadHolder rHolder(LockUtils::tagAndEdgeLock()); auto prefix = MetaKeyUtils::schemaEdgesPrefix(oldSpaceId); auto edgePrefix = doPrefix(prefix); if (!nebula::ok(edgePrefix)) { @@ -196,7 +196,7 @@ ErrorOr> CreateSpaceAsProcesso ErrorOr> CreateSpaceAsProcessor::makeNewIndexes( GraphSpaceID oldSpaceId, GraphSpaceID newSpaceId) { - folly::SharedMutex::ReadHolder rHolder(LockUtils::edgeLock()); + folly::SharedMutex::ReadHolder rHolder(LockUtils::tagAndEdgeLock()); auto prefix = MetaKeyUtils::indexPrefix(oldSpaceId); auto indexPrefix = doPrefix(prefix); if (!nebula::ok(indexPrefix)) { diff --git a/src/meta/processors/schema/AlterEdgeProcessor.cpp b/src/meta/processors/schema/AlterEdgeProcessor.cpp index e2a552f1fdf..c2b6031cf53 100644 --- a/src/meta/processors/schema/AlterEdgeProcessor.cpp +++ b/src/meta/processors/schema/AlterEdgeProcessor.cpp @@ -13,10 +13,10 @@ namespace meta { void AlterEdgeProcessor::process(const cpp2::AlterEdgeReq& req) { GraphSpaceID spaceId = req.get_space_id(); CHECK_SPACE_ID_AND_RETURN(spaceId); - auto edgeName = req.get_edge_name(); + const auto& edgeName = req.get_edge_name(); folly::SharedMutex::ReadHolder rHolder(LockUtils::snapshotLock()); - folly::SharedMutex::WriteHolder wHolder(LockUtils::edgeLock()); + folly::SharedMutex::WriteHolder wHolder(LockUtils::tagAndEdgeLock()); auto ret = getEdgeType(spaceId, edgeName); if (!nebula::ok(ret)) { auto retCode = nebula::error(ret); diff --git a/src/meta/processors/schema/AlterTagProcessor.cpp b/src/meta/processors/schema/AlterTagProcessor.cpp index 31f0ea41173..2284710ec92 100644 --- a/src/meta/processors/schema/AlterTagProcessor.cpp +++ b/src/meta/processors/schema/AlterTagProcessor.cpp @@ -13,10 +13,10 @@ namespace meta { void AlterTagProcessor::process(const cpp2::AlterTagReq& req) { GraphSpaceID spaceId = req.get_space_id(); CHECK_SPACE_ID_AND_RETURN(spaceId); - auto tagName = req.get_tag_name(); + const auto& tagName = req.get_tag_name(); folly::SharedMutex::ReadHolder rHolder(LockUtils::snapshotLock()); - folly::SharedMutex::WriteHolder wHolder(LockUtils::tagLock()); + folly::SharedMutex::WriteHolder wHolder(LockUtils::tagAndEdgeLock()); auto ret = getTagId(spaceId, tagName); if (!nebula::ok(ret)) { auto retCode = nebula::error(ret); diff --git a/src/meta/processors/schema/CreateEdgeProcessor.cpp b/src/meta/processors/schema/CreateEdgeProcessor.cpp index 671d5ddab16..3b9a65fe7a5 100644 --- a/src/meta/processors/schema/CreateEdgeProcessor.cpp +++ b/src/meta/processors/schema/CreateEdgeProcessor.cpp @@ -13,28 +13,25 @@ namespace meta { void CreateEdgeProcessor::process(const cpp2::CreateEdgeReq& req) { GraphSpaceID spaceId = req.get_space_id(); CHECK_SPACE_ID_AND_RETURN(spaceId); - auto edgeName = req.get_edge_name(); - { - // if there is an tag of the same name - // TODO: there exists race condition, we should address it in the future - folly::SharedMutex::ReadHolder rHolder(LockUtils::tagLock()); - auto conflictRet = getTagId(spaceId, edgeName); - if (nebula::ok(conflictRet)) { - LOG(ERROR) << "Failed to create edge `" << edgeName - << "': some tag with the same name already exists."; - resp_.id_ref() = to(nebula::value(conflictRet), EntryType::EDGE); - handleErrorCode(nebula::cpp2::ErrorCode::E_CONFLICT); + const auto& edgeName = req.get_edge_name(); + folly::SharedMutex::WriteHolder holder(LockUtils::tagAndEdgeLock()); + // Check if the tag with same name exists + auto conflictRet = getTagId(spaceId, edgeName); + if (nebula::ok(conflictRet)) { + LOG(ERROR) << "Failed to create edge `" << edgeName + << "': some tag with the same name already exists."; + resp_.id_ref() = to(nebula::value(conflictRet), EntryType::EDGE); + handleErrorCode(nebula::cpp2::ErrorCode::E_CONFLICT); + onFinished(); + return; + } else { + auto retCode = nebula::error(conflictRet); + if (retCode != nebula::cpp2::ErrorCode::E_TAG_NOT_FOUND) { + LOG(ERROR) << "Failed to create edge " << edgeName << " error " + << apache::thrift::util::enumNameSafe(retCode); + handleErrorCode(retCode); onFinished(); return; - } else { - auto retCode = nebula::error(conflictRet); - if (retCode != nebula::cpp2::ErrorCode::E_TAG_NOT_FOUND) { - LOG(ERROR) << "Failed to create edge " << edgeName << " error " - << apache::thrift::util::enumNameSafe(retCode); - handleErrorCode(retCode); - onFinished(); - return; - } } } @@ -49,7 +46,6 @@ void CreateEdgeProcessor::process(const cpp2::CreateEdgeReq& req) { schema.columns_ref() = std::move(columns); schema.schema_prop_ref() = req.get_schema().get_schema_prop(); - folly::SharedMutex::WriteHolder wHolder(LockUtils::edgeLock()); auto ret = getEdgeType(spaceId, edgeName); if (nebula::ok(ret)) { if (req.get_if_not_exists()) { diff --git a/src/meta/processors/schema/CreateTagProcessor.cpp b/src/meta/processors/schema/CreateTagProcessor.cpp index b5f3d39124b..dd1cd80f37b 100644 --- a/src/meta/processors/schema/CreateTagProcessor.cpp +++ b/src/meta/processors/schema/CreateTagProcessor.cpp @@ -13,28 +13,26 @@ namespace meta { void CreateTagProcessor::process(const cpp2::CreateTagReq& req) { GraphSpaceID spaceId = req.get_space_id(); CHECK_SPACE_ID_AND_RETURN(spaceId); - auto tagName = req.get_tag_name(); - { - // if there is an edge of the same name - // TODO: there exists race condition, we should address it in the future - folly::SharedMutex::ReadHolder rHolder(LockUtils::edgeLock()); - auto conflictRet = getEdgeType(spaceId, tagName); - if (nebula::ok(conflictRet)) { - LOG(ERROR) << "Failed to create tag `" << tagName - << "': some edge with the same name already exists."; - resp_.id_ref() = to(nebula::value(conflictRet), EntryType::TAG); - handleErrorCode(nebula::cpp2::ErrorCode::E_CONFLICT); + const auto& tagName = req.get_tag_name(); + folly::SharedMutex::WriteHolder holder(LockUtils::tagAndEdgeLock()); + + // Check if the edge with same name exists + auto conflictRet = getEdgeType(spaceId, tagName); + if (nebula::ok(conflictRet)) { + LOG(ERROR) << "Failed to create tag `" << tagName + << "': some edge with the same name already exists."; + resp_.id_ref() = to(nebula::value(conflictRet), EntryType::TAG); + handleErrorCode(nebula::cpp2::ErrorCode::E_CONFLICT); + onFinished(); + return; + } else { + auto retCode = nebula::error(conflictRet); + if (retCode != nebula::cpp2::ErrorCode::E_EDGE_NOT_FOUND) { + LOG(ERROR) << "Failed to create tag " << tagName << " error " + << apache::thrift::util::enumNameSafe(retCode); + handleErrorCode(retCode); onFinished(); return; - } else { - auto retCode = nebula::error(conflictRet); - if (retCode != nebula::cpp2::ErrorCode::E_EDGE_NOT_FOUND) { - LOG(ERROR) << "Failed to create tag " << tagName << " error " - << apache::thrift::util::enumNameSafe(retCode); - handleErrorCode(retCode); - onFinished(); - return; - } } } @@ -49,7 +47,6 @@ void CreateTagProcessor::process(const cpp2::CreateTagReq& req) { schema.columns_ref() = std::move(columns); schema.schema_prop_ref() = req.get_schema().get_schema_prop(); - folly::SharedMutex::WriteHolder wHolder(LockUtils::tagLock()); auto ret = getTagId(spaceId, tagName); if (nebula::ok(ret)) { if (req.get_if_not_exists()) { diff --git a/src/meta/processors/schema/DropEdgeProcessor.cpp b/src/meta/processors/schema/DropEdgeProcessor.cpp index 1315681bbb3..ca42af470ab 100644 --- a/src/meta/processors/schema/DropEdgeProcessor.cpp +++ b/src/meta/processors/schema/DropEdgeProcessor.cpp @@ -13,8 +13,8 @@ void DropEdgeProcessor::process(const cpp2::DropEdgeReq& req) { CHECK_SPACE_ID_AND_RETURN(spaceId); folly::SharedMutex::ReadHolder rHolder(LockUtils::snapshotLock()); - folly::SharedMutex::WriteHolder wHolder(LockUtils::edgeLock()); - auto edgeName = req.get_edge_name(); + folly::SharedMutex::WriteHolder wHolder(LockUtils::tagAndEdgeLock()); + const auto& edgeName = req.get_edge_name(); EdgeType edgeType; auto indexKey = MetaKeyUtils::indexEdgeKey(spaceId, edgeName); diff --git a/src/meta/processors/schema/DropTagProcessor.cpp b/src/meta/processors/schema/DropTagProcessor.cpp index 08cc3ff5331..8fcf9cf74d3 100644 --- a/src/meta/processors/schema/DropTagProcessor.cpp +++ b/src/meta/processors/schema/DropTagProcessor.cpp @@ -13,8 +13,8 @@ void DropTagProcessor::process(const cpp2::DropTagReq& req) { CHECK_SPACE_ID_AND_RETURN(spaceId); folly::SharedMutex::ReadHolder rHolder(LockUtils::snapshotLock()); - folly::SharedMutex::WriteHolder wHolder(LockUtils::tagLock()); - auto tagName = req.get_tag_name(); + folly::SharedMutex::WriteHolder wHolder(LockUtils::tagAndEdgeLock()); + const auto& tagName = req.get_tag_name(); TagID tagId; auto indexKey = MetaKeyUtils::indexTagKey(spaceId, tagName); diff --git a/src/meta/processors/schema/GetEdgeProcessor.cpp b/src/meta/processors/schema/GetEdgeProcessor.cpp index 0ae0e3a5443..b8162cdcd58 100644 --- a/src/meta/processors/schema/GetEdgeProcessor.cpp +++ b/src/meta/processors/schema/GetEdgeProcessor.cpp @@ -11,10 +11,10 @@ namespace meta { void GetEdgeProcessor::process(const cpp2::GetEdgeReq& req) { GraphSpaceID spaceId = req.get_space_id(); CHECK_SPACE_ID_AND_RETURN(spaceId); - auto edgeName = req.get_edge_name(); + const auto& edgeName = req.get_edge_name(); auto ver = req.get_version(); - folly::SharedMutex::ReadHolder rHolder(LockUtils::edgeLock()); + folly::SharedMutex::ReadHolder rHolder(LockUtils::tagAndEdgeLock()); auto edgeTypeRet = getEdgeType(spaceId, edgeName); if (!nebula::ok(edgeTypeRet)) { LOG(ERROR) << "Get edge " << edgeName << " failed."; diff --git a/src/meta/processors/schema/GetTagProcessor.cpp b/src/meta/processors/schema/GetTagProcessor.cpp index 82e726e34ad..e085d2b9d9f 100644 --- a/src/meta/processors/schema/GetTagProcessor.cpp +++ b/src/meta/processors/schema/GetTagProcessor.cpp @@ -11,10 +11,10 @@ namespace meta { void GetTagProcessor::process(const cpp2::GetTagReq& req) { GraphSpaceID spaceId = req.get_space_id(); CHECK_SPACE_ID_AND_RETURN(spaceId); - auto tagName = req.get_tag_name(); + const auto& tagName = req.get_tag_name(); auto ver = req.get_version(); - folly::SharedMutex::ReadHolder rHolder(LockUtils::tagLock()); + folly::SharedMutex::ReadHolder rHolder(LockUtils::tagAndEdgeLock()); auto tagIdRet = getTagId(spaceId, tagName); if (!nebula::ok(tagIdRet)) { LOG(ERROR) << "Get tag " << tagName << " failed."; diff --git a/src/meta/processors/schema/ListEdgesProcessor.cpp b/src/meta/processors/schema/ListEdgesProcessor.cpp index 3e5e2691762..ebe41692dae 100644 --- a/src/meta/processors/schema/ListEdgesProcessor.cpp +++ b/src/meta/processors/schema/ListEdgesProcessor.cpp @@ -12,7 +12,7 @@ void ListEdgesProcessor::process(const cpp2::ListEdgesReq &req) { GraphSpaceID spaceId = req.get_space_id(); CHECK_SPACE_ID_AND_RETURN(spaceId); - folly::SharedMutex::ReadHolder rHolder(LockUtils::edgeLock()); + folly::SharedMutex::ReadHolder rHolder(LockUtils::tagAndEdgeLock()); auto prefix = MetaKeyUtils::schemaEdgesPrefix(spaceId); auto ret = doPrefix(prefix); if (!nebula::ok(ret)) { diff --git a/src/meta/processors/schema/ListTagsProcessor.cpp b/src/meta/processors/schema/ListTagsProcessor.cpp index 26da7cd41d6..ee4d3eff9d7 100644 --- a/src/meta/processors/schema/ListTagsProcessor.cpp +++ b/src/meta/processors/schema/ListTagsProcessor.cpp @@ -12,7 +12,7 @@ void ListTagsProcessor::process(const cpp2::ListTagsReq &req) { GraphSpaceID spaceId = req.get_space_id(); CHECK_SPACE_ID_AND_RETURN(spaceId); - folly::SharedMutex::ReadHolder rHolder(LockUtils::tagLock()); + folly::SharedMutex::ReadHolder rHolder(LockUtils::tagAndEdgeLock()); auto prefix = MetaKeyUtils::schemaTagsPrefix(spaceId); auto ret = doPrefix(prefix); if (!nebula::ok(ret)) {