From a62e8963a1d0c4ba4af6f8650095e78addb7ed5a Mon Sep 17 00:00:00 2001 From: bright-starry-sky <56461666+bright-starry-sky@users.noreply.github.com> Date: Wed, 7 Jul 2021 00:05:28 +0800 Subject: [PATCH] done --- src/storage/mutate/AddEdgesProcessor.cpp | 39 ++++++++-------- src/storage/mutate/AddVerticesProcessor.cpp | 27 ++++++----- src/storage/mutate/DeleteEdgesProcessor.cpp | 46 +++++++++++-------- .../mutate/DeleteVerticesProcessor.cpp | 24 +++++----- src/storage/test/MemoryLockTest.cpp | 15 ++++++ src/utils/MemoryLockWrapper.h | 19 +++++--- 6 files changed, 102 insertions(+), 68 deletions(-) diff --git a/src/storage/mutate/AddEdgesProcessor.cpp b/src/storage/mutate/AddEdgesProcessor.cpp index 29c45659c..be573915a 100644 --- a/src/storage/mutate/AddEdgesProcessor.cpp +++ b/src/storage/mutate/AddEdgesProcessor.cpp @@ -153,6 +153,25 @@ void AddEdgesProcessor::doProcessWithIndex(const cpp2::AddEdgesRequest& req) { visited.reserve(newEdges.size()); for (auto& newEdge : newEdges) { auto edgeKey = *newEdge.key_ref(); + auto l = std::make_tuple(spaceId_, + partId, + (*edgeKey.src_ref()).getStr(), + *edgeKey.edge_type_ref(), + *edgeKey.ranking_ref(), + (*edgeKey.dst_ref()).getStr()); + if (std::find(dummyLock.begin(), dummyLock.end(), l) == dummyLock.end()) { + if (!env_->edgesML_->try_lock(l)) { + env_->edgesML_->unlockBatch(dummyLock); + LOG(ERROR) << folly::format("edge locked : src {}, type {}, rank {}, dst {}", + (*edgeKey.src_ref()).getStr(), + *edgeKey.edge_type_ref(), + *edgeKey.ranking_ref(), + (*edgeKey.dst_ref()).getStr()); + code = nebula::cpp2::ErrorCode::E_DATA_CONFLICT_ERROR; + break; + } + dummyLock.emplace_back(std::move(l)); + } VLOG(3) << "PartitionID: " << partId << ", VertexID: " << *edgeKey.src_ref() << ", EdgeType: " << *edgeKey.edge_type_ref() << ", EdgeRanking: " << *edgeKey.ranking_ref() << ", VertexID: " @@ -274,12 +293,6 @@ void AddEdgesProcessor::doProcessWithIndex(const cpp2::AddEdgesRequest& req) { break; } 
batchHolder->put(std::move(key), std::move(retEnc.value())); - dummyLock.emplace_back(std::make_tuple(spaceId_, - partId, - (*edgeKey.src_ref()).getStr(), - *edgeKey.edge_type_ref(), - *edgeKey.ranking_ref(), - (*edgeKey.dst_ref()).getStr())); } if (code != nebula::cpp2::ErrorCode::SUCCEEDED) { handleAsync(spaceId_, partId, code); @@ -287,19 +300,7 @@ void AddEdgesProcessor::doProcessWithIndex(const cpp2::AddEdgesRequest& req) { } auto batch = encodeBatchValue(std::move(batchHolder)->getBatch()); DCHECK(!batch.empty()); - nebula::MemoryLockGuard lg(env_->edgesML_.get(), std::move(dummyLock), true); - if (!lg) { - auto conflict = lg.conflictKey(); - LOG(ERROR) << "edge conflict " - << std::get<0>(conflict) << ":" - << std::get<1>(conflict) << ":" - << std::get<2>(conflict) << ":" - << std::get<3>(conflict) << ":" - << std::get<4>(conflict) << ":" - << std::get<5>(conflict); - handleAsync(spaceId_, partId, nebula::cpp2::ErrorCode::E_DATA_CONFLICT_ERROR); - continue; - } + nebula::MemoryLockGuard lg(env_->edgesML_.get(), std::move(dummyLock), true, false); env_->kvstore_->asyncAppendBatch(spaceId_, partId, std::move(batch), [l = std::move(lg), icw = std::move(wrapper), partId, this] (nebula::cpp2::ErrorCode retCode) { diff --git a/src/storage/mutate/AddVerticesProcessor.cpp b/src/storage/mutate/AddVerticesProcessor.cpp index b822a4fc1..81d5f9be9 100644 --- a/src/storage/mutate/AddVerticesProcessor.cpp +++ b/src/storage/mutate/AddVerticesProcessor.cpp @@ -167,6 +167,17 @@ void AddVerticesProcessor::doProcessWithIndex(const cpp2::AddVerticesRequest& re for (auto& newTag : newTags) { auto tagId = newTag.get_tag_id(); + auto l = std::make_tuple(spaceId_, partId, tagId, vid); + if (std::find(dummyLock.begin(), dummyLock.end(), l) == dummyLock.end()) { + if (!env_->verticesML_->try_lock(l)) { + env_->verticesML_->unlockBatch(dummyLock); + LOG(ERROR) << folly::format("The vertex locked : tag {}, vid {}", + tagId, vid); + code = nebula::cpp2::ErrorCode::E_DATA_CONFLICT_ERROR; 
+ break; + } + dummyLock.emplace_back(std::move(l)); + } VLOG(3) << "PartitionID: " << partId << ", VertexID: " << vid << ", TagID: " << tagId; @@ -277,7 +288,6 @@ void AddVerticesProcessor::doProcessWithIndex(const cpp2::AddVerticesRequest& re * step 3 , Insert new vertex data */ batchHolder->put(std::move(key), std::move(retEnc.value())); - dummyLock.emplace_back(std::make_tuple(spaceId_, partId, tagId, vid)); if (FLAGS_enable_vertex_cache && vertexCache_ != nullptr) { vertexCache_->evict(std::make_pair(vid, tagId)); @@ -295,17 +305,10 @@ void AddVerticesProcessor::doProcessWithIndex(const cpp2::AddVerticesRequest& re } auto batch = encodeBatchValue(std::move(batchHolder)->getBatch()); DCHECK(!batch.empty()); - nebula::MemoryLockGuard lg(env_->verticesML_.get(), std::move(dummyLock), true); - if (!lg) { - auto conflict = lg.conflictKey(); - LOG(ERROR) << "vertex conflict " - << std::get<0>(conflict) << ":" - << std::get<1>(conflict) << ":" - << std::get<2>(conflict) << ":" - << std::get<3>(conflict); - handleAsync(spaceId_, partId, nebula::cpp2::ErrorCode::E_DATA_CONFLICT_ERROR); - continue; - } + nebula::MemoryLockGuard lg(env_->verticesML_.get(), + std::move(dummyLock), + true, + false); env_->kvstore_->asyncAppendBatch(spaceId_, partId, std::move(batch), [l = std::move(lg), icw = std::move(wrapper), partId, this] ( nebula::cpp2::ErrorCode retCode) { diff --git a/src/storage/mutate/DeleteEdgesProcessor.cpp b/src/storage/mutate/DeleteEdgesProcessor.cpp index 89d387bc5..cc077a399 100644 --- a/src/storage/mutate/DeleteEdgesProcessor.cpp +++ b/src/storage/mutate/DeleteEdgesProcessor.cpp @@ -86,13 +86,30 @@ void DeleteEdgesProcessor::process(const cpp2::DeleteEdgesRequest& req) { std::vector dummyLock; dummyLock.reserve(part.second.size()); + nebula::cpp2::ErrorCode err = nebula::cpp2::ErrorCode::SUCCEEDED; for (const auto& edgeKey : part.second) { - dummyLock.emplace_back(std::make_tuple(spaceId_, - partId, - (*edgeKey.src_ref()).getStr(), - 
*edgeKey.edge_type_ref(), - *edgeKey.ranking_ref(), - (*edgeKey.dst_ref()).getStr())); + auto l = std::make_tuple(spaceId_, + partId, + (*edgeKey.src_ref()).getStr(), + *edgeKey.edge_type_ref(), + *edgeKey.ranking_ref(), + (*edgeKey.dst_ref()).getStr()); + if (!env_->edgesML_->try_lock(l)) { + env_->edgesML_->unlockBatch(dummyLock); + LOG(ERROR) << folly::format("The edge locked : src {}, " + "type {}, rank {}, dst {}", + (*edgeKey.src_ref()).getStr(), + *edgeKey.edge_type_ref(), + *edgeKey.ranking_ref(), + (*edgeKey.dst_ref()).getStr()); + err = nebula::cpp2::ErrorCode::E_DATA_CONFLICT_ERROR; + break; + } + dummyLock.emplace_back(std::move(l)); + } + if (err != nebula::cpp2::ErrorCode::SUCCEEDED) { + handleAsync(spaceId_, partId, err); + continue; } auto batch = deleteEdges(partId, std::move(part.second)); if (!nebula::ok(batch)) { continue; } DCHECK(!nebula::value(batch).empty()); - nebula::MemoryLockGuard lg(env_->edgesML_.get(), std::move(dummyLock), true); - if (!lg) { - auto conflict = lg.conflictKey(); - LOG(ERROR) << "edge conflict " - << std::get<0>(conflict) << ":" - << std::get<1>(conflict) << ":" - << std::get<2>(conflict) << ":" - << std::get<3>(conflict) << ":" - << std::get<4>(conflict) << ":" - << std::get<5>(conflict); - handleAsync(spaceId_, partId, nebula::cpp2::ErrorCode::E_DATA_CONFLICT_ERROR); - continue; - } + nebula::MemoryLockGuard lg(env_->edgesML_.get(), + std::move(dummyLock), + false, + false); env_->kvstore_->asyncAppendBatch(spaceId_, partId, std::move(nebula::value(batch)), [l = std::move(lg), icw = std::move(wrapper), partId, this] ( nebula::cpp2::ErrorCode code) { diff --git a/src/storage/mutate/DeleteVerticesProcessor.cpp b/src/storage/mutate/DeleteVerticesProcessor.cpp index 9990987f3..ed499b3ea 100644 --- a/src/storage/mutate/DeleteVerticesProcessor.cpp +++ b/src/storage/mutate/DeleteVerticesProcessor.cpp @@ -100,17 +100,10 @@ void 
DeleteVerticesProcessor::process(const cpp2::DeleteVerticesRequest& req) { continue; } DCHECK(!nebula::value(batch).empty()); - nebula::MemoryLockGuard lg(env_->verticesML_.get(), std::move(dummyLock), true); - if (!lg) { - auto conflict = lg.conflictKey(); - LOG(ERROR) << "vertex conflict " - << std::get<0>(conflict) << ":" - << std::get<1>(conflict) << ":" - << std::get<2>(conflict) << ":" - << std::get<3>(conflict); - handleAsync(spaceId_, partId, nebula::cpp2::ErrorCode::E_DATA_CONFLICT_ERROR); - continue; - } + nebula::MemoryLockGuard lg(env_->verticesML_.get(), + std::move(dummyLock), + false, + false); env_->kvstore_->asyncAppendBatch(spaceId_, partId, std::move(nebula::value(batch)), [l = std::move(lg), icw = std::move(wrapper), partId, this] ( nebula::cpp2::ErrorCode code) { @@ -142,6 +135,14 @@ DeleteVerticesProcessor::deleteVertices(PartitionID partId, while (iter->valid()) { auto key = iter->key(); auto tagId = NebulaKeyUtils::getTagId(spaceVidLen_, key); + auto l = std::make_tuple(spaceId_, partId, tagId, vertex.getStr()); + if (!env_->verticesML_->try_lock(l)) { + env_->verticesML_->unlockBatch(target); + LOG(ERROR) << folly::format("The vertex locked : tag {}, vid {}", + tagId, vertex.getStr()); + return nebula::cpp2::ErrorCode::E_DATA_CONFLICT_ERROR; + } + target.emplace_back(std::move(l)); RowReaderWrapper reader; for (auto& index : indexes_) { if (index->get_schema_id().get_tag_id() == tagId) { @@ -186,7 +187,6 @@ DeleteVerticesProcessor::deleteVertices(PartitionID partId, VLOG(3) << "Evict vertex cache for vertex ID " << vertex << ", tagId " << tagId; vertexCache_->evict(std::make_pair(vertex.getStr(), tagId)); } - target.emplace_back(std::make_tuple(spaceId_, partId, tagId, vertex.getStr())); batchHolder->remove(key.str()); iter->next(); } diff --git a/src/storage/test/MemoryLockTest.cpp b/src/storage/test/MemoryLockTest.cpp index cb3251356..326df025e 100644 --- a/src/storage/test/MemoryLockTest.cpp +++ b/src/storage/test/MemoryLockTest.cpp @@ -68,6 +68,21 
@@ TEST_F(MemoryLockTest, MoveTest) { } } +TEST_F(MemoryLockTest, PrepTest) { + MemoryLockCore mlock; + { + EXPECT_TRUE(mlock.try_lock("1")); + EXPECT_TRUE(mlock.try_lock("2")); + EXPECT_FALSE(mlock.try_lock("1")); + EXPECT_FALSE(mlock.try_lock("2")); + std::vector keys{"1", "2"}; + auto* lk = new LockGuard(&mlock, keys, false, false); + EXPECT_TRUE(lk); + delete lk; + } + EXPECT_EQ(0, mlock.size()); +} + } // namespace storage } // namespace nebula diff --git a/src/utils/MemoryLockWrapper.h b/src/utils/MemoryLockWrapper.h index d18292c16..d95ee8381 100644 --- a/src/utils/MemoryLockWrapper.h +++ b/src/utils/MemoryLockWrapper.h @@ -19,14 +19,21 @@ class MemoryLockGuard { MemoryLockGuard(MemoryLockCore* lock, const Key& key) : MemoryLockGuard(lock, std::vector{key}) {} - MemoryLockGuard(MemoryLockCore* lock, const std::vector& keys, bool dedup = false) + MemoryLockGuard(MemoryLockCore* lock, + const std::vector& keys, + bool dedup = false, + bool prepCheck = true) : lock_(lock), keys_(keys) { - if (dedup) { - std::sort(keys_.begin(), keys_.end()); - auto last = std::unique(keys_.begin(), keys_.end()); - std::tie(iter_, locked_) = lock_->lockBatch(keys_.begin(), last); + if (prepCheck) { + if (dedup) { + std::sort(keys_.begin(), keys_.end()); + auto last = std::unique(keys_.begin(), keys_.end()); + std::tie(iter_, locked_) = lock_->lockBatch(keys_.begin(), last); + } else { + std::tie(iter_, locked_) = lock_->lockBatch(keys_); + } } else { - std::tie(iter_, locked_) = lock_->lockBatch(keys_); + locked_ = true; } }