Skip to content

Commit

Permalink
Fix race of create tag and edge (#3735)
Browse files Browse the repository at this point in the history
* fix rece

* polish

* fix review
  • Loading branch information
jackwener authored and Sophie-Xie committed Jan 26, 2022
1 parent 87e8680 commit 31d11d6
Show file tree
Hide file tree
Showing 15 changed files with 56 additions and 64 deletions.
2 changes: 1 addition & 1 deletion src/codec/RowReaderV2.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ class RowReaderV2 : public RowReader {
FRIEND_TEST(ScanEdgePropBench, ProcessEdgeProps);

public:
virtual ~RowReaderV2() = default;
~RowReaderV2() override = default;

Value getValueByName(const std::string& prop) const noexcept override;
Value getValueByIndex(const int64_t index) const noexcept override;
Expand Down
2 changes: 1 addition & 1 deletion src/graph/planner/match/MatchPlanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ namespace graph {
class MatchPlanner final : public Planner {
public:
static std::unique_ptr<MatchPlanner> make() {
return std::unique_ptr<MatchPlanner>(new MatchPlanner());
return std::make_unique<MatchPlanner>();
}

static bool match(AstContext* astCtx);
Expand Down
3 changes: 1 addition & 2 deletions src/meta/processors/Common.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,7 @@ class LockUtils {
GENERATE_LOCK(id);
GENERATE_LOCK(workerId);
GENERATE_LOCK(localId);
GENERATE_LOCK(tag);
GENERATE_LOCK(edge);
GENERATE_LOCK(tagAndEdge);
GENERATE_LOCK(tagIndex);
GENERATE_LOCK(edgeIndex);
GENERATE_LOCK(service);
Expand Down
2 changes: 1 addition & 1 deletion src/meta/processors/index/FTIndexProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ void CreateFTIndexProcessor::process(const cpp2::CreateFTIndexReq& req) {
const auto& index = req.get_index();
const std::string& name = req.get_fulltext_index_name();
CHECK_SPACE_ID_AND_RETURN(index.get_space_id());
folly::SharedMutex::ReadHolder rHolder(LockUtils::tagAndEdgeLock());
auto isEdge = index.get_depend_schema().getType() == nebula::cpp2::SchemaID::Type::edge_type;
folly::SharedMutex::ReadHolder rHolder(isEdge ? LockUtils::edgeLock() : LockUtils::tagLock());
auto schemaPrefix = isEdge ? MetaKeyUtils::schemaEdgePrefix(
index.get_space_id(), index.get_depend_schema().get_edge_type())
: MetaKeyUtils::schemaTagPrefix(
Expand Down
6 changes: 3 additions & 3 deletions src/meta/processors/parts/CreateSpaceAsProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ ErrorOr<nebula::cpp2::ErrorCode, std::vector<kvstore::KV>> CreateSpaceAsProcesso

ErrorOr<nebula::cpp2::ErrorCode, std::vector<kvstore::KV>> CreateSpaceAsProcessor::makeNewTags(
GraphSpaceID oldSpaceId, GraphSpaceID newSpaceId) {
folly::SharedMutex::ReadHolder rHolder(LockUtils::tagLock());
folly::SharedMutex::ReadHolder rHolder(LockUtils::tagAndEdgeLock());
auto prefix = MetaKeyUtils::schemaTagsPrefix(oldSpaceId);
auto tagPrefix = doPrefix(prefix);
if (!nebula::ok(tagPrefix)) {
Expand Down Expand Up @@ -164,7 +164,7 @@ ErrorOr<nebula::cpp2::ErrorCode, std::vector<kvstore::KV>> CreateSpaceAsProcesso

ErrorOr<nebula::cpp2::ErrorCode, std::vector<kvstore::KV>> CreateSpaceAsProcessor::makeNewEdges(
GraphSpaceID oldSpaceId, GraphSpaceID newSpaceId) {
folly::SharedMutex::ReadHolder rHolder(LockUtils::edgeLock());
folly::SharedMutex::ReadHolder rHolder(LockUtils::tagAndEdgeLock());
auto prefix = MetaKeyUtils::schemaEdgesPrefix(oldSpaceId);
auto edgePrefix = doPrefix(prefix);
if (!nebula::ok(edgePrefix)) {
Expand Down Expand Up @@ -196,7 +196,7 @@ ErrorOr<nebula::cpp2::ErrorCode, std::vector<kvstore::KV>> CreateSpaceAsProcesso

ErrorOr<nebula::cpp2::ErrorCode, std::vector<kvstore::KV>> CreateSpaceAsProcessor::makeNewIndexes(
GraphSpaceID oldSpaceId, GraphSpaceID newSpaceId) {
folly::SharedMutex::ReadHolder rHolder(LockUtils::edgeLock());
folly::SharedMutex::ReadHolder rHolder(LockUtils::tagAndEdgeLock());
auto prefix = MetaKeyUtils::indexPrefix(oldSpaceId);
auto indexPrefix = doPrefix(prefix);
if (!nebula::ok(indexPrefix)) {
Expand Down
4 changes: 2 additions & 2 deletions src/meta/processors/schema/AlterEdgeProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@ namespace meta {
void AlterEdgeProcessor::process(const cpp2::AlterEdgeReq& req) {
GraphSpaceID spaceId = req.get_space_id();
CHECK_SPACE_ID_AND_RETURN(spaceId);
auto edgeName = req.get_edge_name();
const auto& edgeName = req.get_edge_name();

folly::SharedMutex::ReadHolder rHolder(LockUtils::snapshotLock());
folly::SharedMutex::WriteHolder wHolder(LockUtils::edgeLock());
folly::SharedMutex::WriteHolder wHolder(LockUtils::tagAndEdgeLock());
auto ret = getEdgeType(spaceId, edgeName);
if (!nebula::ok(ret)) {
auto retCode = nebula::error(ret);
Expand Down
4 changes: 2 additions & 2 deletions src/meta/processors/schema/AlterTagProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@ namespace meta {
void AlterTagProcessor::process(const cpp2::AlterTagReq& req) {
GraphSpaceID spaceId = req.get_space_id();
CHECK_SPACE_ID_AND_RETURN(spaceId);
auto tagName = req.get_tag_name();
const auto& tagName = req.get_tag_name();

folly::SharedMutex::ReadHolder rHolder(LockUtils::snapshotLock());
folly::SharedMutex::WriteHolder wHolder(LockUtils::tagLock());
folly::SharedMutex::WriteHolder wHolder(LockUtils::tagAndEdgeLock());
auto ret = getTagId(spaceId, tagName);
if (!nebula::ok(ret)) {
auto retCode = nebula::error(ret);
Expand Down
38 changes: 17 additions & 21 deletions src/meta/processors/schema/CreateEdgeProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,28 +13,25 @@ namespace meta {
void CreateEdgeProcessor::process(const cpp2::CreateEdgeReq& req) {
GraphSpaceID spaceId = req.get_space_id();
CHECK_SPACE_ID_AND_RETURN(spaceId);
auto edgeName = req.get_edge_name();
{
// if there is an tag of the same name
// TODO: there exists race condition, we should address it in the future
folly::SharedMutex::ReadHolder rHolder(LockUtils::tagLock());
auto conflictRet = getTagId(spaceId, edgeName);
if (nebula::ok(conflictRet)) {
LOG(ERROR) << "Failed to create edge `" << edgeName
<< "': some tag with the same name already exists.";
resp_.id_ref() = to(nebula::value(conflictRet), EntryType::EDGE);
handleErrorCode(nebula::cpp2::ErrorCode::E_CONFLICT);
const auto& edgeName = req.get_edge_name();
folly::SharedMutex::WriteHolder holder(LockUtils::tagAndEdgeLock());
// Check if the tag with same name exists
auto conflictRet = getTagId(spaceId, edgeName);
if (nebula::ok(conflictRet)) {
LOG(ERROR) << "Failed to create edge `" << edgeName
<< "': some tag with the same name already exists.";
resp_.id_ref() = to(nebula::value(conflictRet), EntryType::EDGE);
handleErrorCode(nebula::cpp2::ErrorCode::E_CONFLICT);
onFinished();
return;
} else {
auto retCode = nebula::error(conflictRet);
if (retCode != nebula::cpp2::ErrorCode::E_TAG_NOT_FOUND) {
LOG(ERROR) << "Failed to create edge " << edgeName << " error "
<< apache::thrift::util::enumNameSafe(retCode);
handleErrorCode(retCode);
onFinished();
return;
} else {
auto retCode = nebula::error(conflictRet);
if (retCode != nebula::cpp2::ErrorCode::E_TAG_NOT_FOUND) {
LOG(ERROR) << "Failed to create edge " << edgeName << " error "
<< apache::thrift::util::enumNameSafe(retCode);
handleErrorCode(retCode);
onFinished();
return;
}
}
}

Expand All @@ -49,7 +46,6 @@ void CreateEdgeProcessor::process(const cpp2::CreateEdgeReq& req) {
schema.columns_ref() = std::move(columns);
schema.schema_prop_ref() = req.get_schema().get_schema_prop();

folly::SharedMutex::WriteHolder wHolder(LockUtils::edgeLock());
auto ret = getEdgeType(spaceId, edgeName);
if (nebula::ok(ret)) {
if (req.get_if_not_exists()) {
Expand Down
39 changes: 18 additions & 21 deletions src/meta/processors/schema/CreateTagProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,28 +13,26 @@ namespace meta {
void CreateTagProcessor::process(const cpp2::CreateTagReq& req) {
GraphSpaceID spaceId = req.get_space_id();
CHECK_SPACE_ID_AND_RETURN(spaceId);
auto tagName = req.get_tag_name();
{
// if there is an edge of the same name
// TODO: there exists race condition, we should address it in the future
folly::SharedMutex::ReadHolder rHolder(LockUtils::edgeLock());
auto conflictRet = getEdgeType(spaceId, tagName);
if (nebula::ok(conflictRet)) {
LOG(ERROR) << "Failed to create tag `" << tagName
<< "': some edge with the same name already exists.";
resp_.id_ref() = to(nebula::value(conflictRet), EntryType::TAG);
handleErrorCode(nebula::cpp2::ErrorCode::E_CONFLICT);
const auto& tagName = req.get_tag_name();
folly::SharedMutex::WriteHolder holder(LockUtils::tagAndEdgeLock());

// Check if the edge with same name exists
auto conflictRet = getEdgeType(spaceId, tagName);
if (nebula::ok(conflictRet)) {
LOG(ERROR) << "Failed to create tag `" << tagName
<< "': some edge with the same name already exists.";
resp_.id_ref() = to(nebula::value(conflictRet), EntryType::TAG);
handleErrorCode(nebula::cpp2::ErrorCode::E_CONFLICT);
onFinished();
return;
} else {
auto retCode = nebula::error(conflictRet);
if (retCode != nebula::cpp2::ErrorCode::E_EDGE_NOT_FOUND) {
LOG(ERROR) << "Failed to create tag " << tagName << " error "
<< apache::thrift::util::enumNameSafe(retCode);
handleErrorCode(retCode);
onFinished();
return;
} else {
auto retCode = nebula::error(conflictRet);
if (retCode != nebula::cpp2::ErrorCode::E_EDGE_NOT_FOUND) {
LOG(ERROR) << "Failed to create tag " << tagName << " error "
<< apache::thrift::util::enumNameSafe(retCode);
handleErrorCode(retCode);
onFinished();
return;
}
}
}

Expand All @@ -49,7 +47,6 @@ void CreateTagProcessor::process(const cpp2::CreateTagReq& req) {
schema.columns_ref() = std::move(columns);
schema.schema_prop_ref() = req.get_schema().get_schema_prop();

folly::SharedMutex::WriteHolder wHolder(LockUtils::tagLock());
auto ret = getTagId(spaceId, tagName);
if (nebula::ok(ret)) {
if (req.get_if_not_exists()) {
Expand Down
4 changes: 2 additions & 2 deletions src/meta/processors/schema/DropEdgeProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ void DropEdgeProcessor::process(const cpp2::DropEdgeReq& req) {
CHECK_SPACE_ID_AND_RETURN(spaceId);

folly::SharedMutex::ReadHolder rHolder(LockUtils::snapshotLock());
folly::SharedMutex::WriteHolder wHolder(LockUtils::edgeLock());
auto edgeName = req.get_edge_name();
folly::SharedMutex::WriteHolder wHolder(LockUtils::tagAndEdgeLock());
const auto& edgeName = req.get_edge_name();

EdgeType edgeType;
auto indexKey = MetaKeyUtils::indexEdgeKey(spaceId, edgeName);
Expand Down
4 changes: 2 additions & 2 deletions src/meta/processors/schema/DropTagProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ void DropTagProcessor::process(const cpp2::DropTagReq& req) {
CHECK_SPACE_ID_AND_RETURN(spaceId);

folly::SharedMutex::ReadHolder rHolder(LockUtils::snapshotLock());
folly::SharedMutex::WriteHolder wHolder(LockUtils::tagLock());
auto tagName = req.get_tag_name();
folly::SharedMutex::WriteHolder wHolder(LockUtils::tagAndEdgeLock());
const auto& tagName = req.get_tag_name();

TagID tagId;
auto indexKey = MetaKeyUtils::indexTagKey(spaceId, tagName);
Expand Down
4 changes: 2 additions & 2 deletions src/meta/processors/schema/GetEdgeProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@ namespace meta {
void GetEdgeProcessor::process(const cpp2::GetEdgeReq& req) {
GraphSpaceID spaceId = req.get_space_id();
CHECK_SPACE_ID_AND_RETURN(spaceId);
auto edgeName = req.get_edge_name();
const auto& edgeName = req.get_edge_name();
auto ver = req.get_version();

folly::SharedMutex::ReadHolder rHolder(LockUtils::edgeLock());
folly::SharedMutex::ReadHolder rHolder(LockUtils::tagAndEdgeLock());
auto edgeTypeRet = getEdgeType(spaceId, edgeName);
if (!nebula::ok(edgeTypeRet)) {
LOG(ERROR) << "Get edge " << edgeName << " failed.";
Expand Down
4 changes: 2 additions & 2 deletions src/meta/processors/schema/GetTagProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@ namespace meta {
void GetTagProcessor::process(const cpp2::GetTagReq& req) {
GraphSpaceID spaceId = req.get_space_id();
CHECK_SPACE_ID_AND_RETURN(spaceId);
auto tagName = req.get_tag_name();
const auto& tagName = req.get_tag_name();
auto ver = req.get_version();

folly::SharedMutex::ReadHolder rHolder(LockUtils::tagLock());
folly::SharedMutex::ReadHolder rHolder(LockUtils::tagAndEdgeLock());
auto tagIdRet = getTagId(spaceId, tagName);
if (!nebula::ok(tagIdRet)) {
LOG(ERROR) << "Get tag " << tagName << " failed.";
Expand Down
2 changes: 1 addition & 1 deletion src/meta/processors/schema/ListEdgesProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ void ListEdgesProcessor::process(const cpp2::ListEdgesReq &req) {
GraphSpaceID spaceId = req.get_space_id();
CHECK_SPACE_ID_AND_RETURN(spaceId);

folly::SharedMutex::ReadHolder rHolder(LockUtils::edgeLock());
folly::SharedMutex::ReadHolder rHolder(LockUtils::tagAndEdgeLock());
auto prefix = MetaKeyUtils::schemaEdgesPrefix(spaceId);
auto ret = doPrefix(prefix);
if (!nebula::ok(ret)) {
Expand Down
2 changes: 1 addition & 1 deletion src/meta/processors/schema/ListTagsProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ void ListTagsProcessor::process(const cpp2::ListTagsReq &req) {
GraphSpaceID spaceId = req.get_space_id();
CHECK_SPACE_ID_AND_RETURN(spaceId);

folly::SharedMutex::ReadHolder rHolder(LockUtils::tagLock());
folly::SharedMutex::ReadHolder rHolder(LockUtils::tagAndEdgeLock());
auto prefix = MetaKeyUtils::schemaTagsPrefix(spaceId);
auto ret = doPrefix(prefix);
if (!nebula::ok(ret)) {
Expand Down

0 comments on commit 31d11d6

Please sign in to comment.