Skip to content
This repository has been archived by the owner on Dec 1, 2022. It is now read-only.

Commit

Permalink
done
Browse files Browse the repository at this point in the history
  • Loading branch information
bright-starry-sky committed Jul 6, 2021
1 parent 977a9cb commit a62e896
Show file tree
Hide file tree
Showing 6 changed files with 102 additions and 68 deletions.
39 changes: 20 additions & 19 deletions src/storage/mutate/AddEdgesProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,25 @@ void AddEdgesProcessor::doProcessWithIndex(const cpp2::AddEdgesRequest& req) {
visited.reserve(newEdges.size());
for (auto& newEdge : newEdges) {
auto edgeKey = *newEdge.key_ref();
auto l = std::make_tuple(spaceId_,
partId,
(*edgeKey.src_ref()).getStr(),
*edgeKey.edge_type_ref(),
*edgeKey.ranking_ref(),
(*edgeKey.dst_ref()).getStr());
if (std::find(dummyLock.begin(), dummyLock.end(), l) == dummyLock.end()) {
if (!env_->edgesML_->try_lock(l)) {
env_->edgesML_->unlockBatch(dummyLock);
LOG(ERROR) << folly::format("edge locked : src {}, type {}, rank {}, dst {}",
(*edgeKey.src_ref()).getStr(),
*edgeKey.edge_type_ref(),
*edgeKey.ranking_ref(),
(*edgeKey.dst_ref()).getStr());
code = nebula::cpp2::ErrorCode::E_DATA_CONFLICT_ERROR;
break;
}
}
dummyLock.emplace_back(std::move(l));
VLOG(3) << "PartitionID: " << partId << ", VertexID: " << *edgeKey.src_ref()
<< ", EdgeType: " << *edgeKey.edge_type_ref() << ", EdgeRanking: "
<< *edgeKey.ranking_ref() << ", VertexID: "
Expand Down Expand Up @@ -274,32 +293,14 @@ void AddEdgesProcessor::doProcessWithIndex(const cpp2::AddEdgesRequest& req) {
break;
}
batchHolder->put(std::move(key), std::move(retEnc.value()));
dummyLock.emplace_back(std::make_tuple(spaceId_,
partId,
(*edgeKey.src_ref()).getStr(),
*edgeKey.edge_type_ref(),
*edgeKey.ranking_ref(),
(*edgeKey.dst_ref()).getStr()));
}
if (code != nebula::cpp2::ErrorCode::SUCCEEDED) {
handleAsync(spaceId_, partId, code);
continue;
}
auto batch = encodeBatchValue(std::move(batchHolder)->getBatch());
DCHECK(!batch.empty());
nebula::MemoryLockGuard<EMLI> lg(env_->edgesML_.get(), std::move(dummyLock), true);
if (!lg) {
auto conflict = lg.conflictKey();
LOG(ERROR) << "edge conflict "
<< std::get<0>(conflict) << ":"
<< std::get<1>(conflict) << ":"
<< std::get<2>(conflict) << ":"
<< std::get<3>(conflict) << ":"
<< std::get<4>(conflict) << ":"
<< std::get<5>(conflict);
handleAsync(spaceId_, partId, nebula::cpp2::ErrorCode::E_DATA_CONFLICT_ERROR);
continue;
}
nebula::MemoryLockGuard<EMLI> lg(env_->edgesML_.get(), std::move(dummyLock), true, false);
env_->kvstore_->asyncAppendBatch(spaceId_, partId, std::move(batch),
[l = std::move(lg), icw = std::move(wrapper), partId, this]
(nebula::cpp2::ErrorCode retCode) {
Expand Down
27 changes: 15 additions & 12 deletions src/storage/mutate/AddVerticesProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,17 @@ void AddVerticesProcessor::doProcessWithIndex(const cpp2::AddVerticesRequest& re

for (auto& newTag : newTags) {
auto tagId = newTag.get_tag_id();
auto l = std::make_tuple(spaceId_, partId, tagId, vid);
if (std::find(dummyLock.begin(), dummyLock.end(), l) == dummyLock.end()) {
if (!env_->verticesML_->try_lock(l)) {
env_->verticesML_->unlockBatch(dummyLock);
LOG(ERROR) << folly::format("The vertex locked : tag {}, vid {}",
tagId, vid);
code = nebula::cpp2::ErrorCode::E_DATA_CONFLICT_ERROR;
break;
}
}
dummyLock.emplace_back(std::move(l));
VLOG(3) << "PartitionID: " << partId << ", VertexID: " << vid
<< ", TagID: " << tagId;

Expand Down Expand Up @@ -277,7 +288,6 @@ void AddVerticesProcessor::doProcessWithIndex(const cpp2::AddVerticesRequest& re
* step 3 , Insert new vertex data
*/
batchHolder->put(std::move(key), std::move(retEnc.value()));
dummyLock.emplace_back(std::make_tuple(spaceId_, partId, tagId, vid));

if (FLAGS_enable_vertex_cache && vertexCache_ != nullptr) {
vertexCache_->evict(std::make_pair(vid, tagId));
Expand All @@ -295,17 +305,10 @@ void AddVerticesProcessor::doProcessWithIndex(const cpp2::AddVerticesRequest& re
}
auto batch = encodeBatchValue(std::move(batchHolder)->getBatch());
DCHECK(!batch.empty());
nebula::MemoryLockGuard<VMLI> lg(env_->verticesML_.get(), std::move(dummyLock), true);
if (!lg) {
auto conflict = lg.conflictKey();
LOG(ERROR) << "vertex conflict "
<< std::get<0>(conflict) << ":"
<< std::get<1>(conflict) << ":"
<< std::get<2>(conflict) << ":"
<< std::get<3>(conflict);
handleAsync(spaceId_, partId, nebula::cpp2::ErrorCode::E_DATA_CONFLICT_ERROR);
continue;
}
nebula::MemoryLockGuard<VMLI> lg(env_->verticesML_.get(),
std::move(dummyLock),
true,
false);
env_->kvstore_->asyncAppendBatch(spaceId_, partId, std::move(batch),
[l = std::move(lg), icw = std::move(wrapper), partId, this] (
nebula::cpp2::ErrorCode retCode) {
Expand Down
46 changes: 27 additions & 19 deletions src/storage/mutate/DeleteEdgesProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,33 +86,41 @@ void DeleteEdgesProcessor::process(const cpp2::DeleteEdgesRequest& req) {
std::vector<EMLI> dummyLock;
dummyLock.reserve(part.second.size());

nebula::cpp2::ErrorCode err = nebula::cpp2::ErrorCode::SUCCEEDED;
for (const auto& edgeKey : part.second) {
dummyLock.emplace_back(std::make_tuple(spaceId_,
partId,
(*edgeKey.src_ref()).getStr(),
*edgeKey.edge_type_ref(),
*edgeKey.ranking_ref(),
(*edgeKey.dst_ref()).getStr()));
auto l = std::make_tuple(spaceId_,
partId,
(*edgeKey.src_ref()).getStr(),
*edgeKey.edge_type_ref(),
*edgeKey.ranking_ref(),
(*edgeKey.dst_ref()).getStr());
if (!env_->edgesML_->try_lock(l)) {
env_->edgesML_->unlockBatch(dummyLock);
LOG(ERROR) << folly::format("The edge locked : src {}, "
"type {}, tank {}, dst {}",
(*edgeKey.src_ref()).getStr(),
*edgeKey.edge_type_ref(),
*edgeKey.ranking_ref(),
(*edgeKey.dst_ref()).getStr());
err = nebula::cpp2::ErrorCode::E_DATA_CONFLICT_ERROR;
break;
}
dummyLock.emplace_back(std::move(l));
}
if (err != nebula::cpp2::ErrorCode::SUCCEEDED) {
handleAsync(spaceId_, partId, err);
continue;
}
auto batch = deleteEdges(partId, std::move(part.second));
if (!nebula::ok(batch)) {
handleAsync(spaceId_, partId, nebula::error(batch));
continue;
}
DCHECK(!nebula::value(batch).empty());
nebula::MemoryLockGuard<EMLI> lg(env_->edgesML_.get(), std::move(dummyLock), true);
if (!lg) {
auto conflict = lg.conflictKey();
LOG(ERROR) << "edge conflict "
<< std::get<0>(conflict) << ":"
<< std::get<1>(conflict) << ":"
<< std::get<2>(conflict) << ":"
<< std::get<3>(conflict) << ":"
<< std::get<4>(conflict) << ":"
<< std::get<5>(conflict);
handleAsync(spaceId_, partId, nebula::cpp2::ErrorCode::E_DATA_CONFLICT_ERROR);
continue;
}
nebula::MemoryLockGuard<EMLI> lg(env_->edgesML_.get(),
std::move(dummyLock),
false,
false);
env_->kvstore_->asyncAppendBatch(spaceId_, partId, std::move(nebula::value(batch)),
[l = std::move(lg), icw = std::move(wrapper), partId, this] (
nebula::cpp2::ErrorCode code) {
Expand Down
24 changes: 12 additions & 12 deletions src/storage/mutate/DeleteVerticesProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,17 +100,10 @@ void DeleteVerticesProcessor::process(const cpp2::DeleteVerticesRequest& req) {
continue;
}
DCHECK(!nebula::value(batch).empty());
nebula::MemoryLockGuard<VMLI> lg(env_->verticesML_.get(), std::move(dummyLock), true);
if (!lg) {
auto conflict = lg.conflictKey();
LOG(ERROR) << "vertex conflict "
<< std::get<0>(conflict) << ":"
<< std::get<1>(conflict) << ":"
<< std::get<2>(conflict) << ":"
<< std::get<3>(conflict);
handleAsync(spaceId_, partId, nebula::cpp2::ErrorCode::E_DATA_CONFLICT_ERROR);
continue;
}
nebula::MemoryLockGuard<VMLI> lg(env_->verticesML_.get(),
std::move(dummyLock),
false,
false);
env_->kvstore_->asyncAppendBatch(spaceId_, partId, std::move(nebula::value(batch)),
[l = std::move(lg), icw = std::move(wrapper), partId, this] (
nebula::cpp2::ErrorCode code) {
Expand Down Expand Up @@ -142,6 +135,14 @@ DeleteVerticesProcessor::deleteVertices(PartitionID partId,
while (iter->valid()) {
auto key = iter->key();
auto tagId = NebulaKeyUtils::getTagId(spaceVidLen_, key);
auto l = std::make_tuple(spaceId_, partId, tagId, vertex.getStr());
if (!env_->verticesML_->try_lock(l)) {
env_->verticesML_->unlockBatch(target);
LOG(ERROR) << folly::format("The vertex locked : tag {}, vid {}",
tagId, vertex.getStr());
return nebula::cpp2::ErrorCode::E_DATA_CONFLICT_ERROR;
}
target.emplace_back();
RowReaderWrapper reader;
for (auto& index : indexes_) {
if (index->get_schema_id().get_tag_id() == tagId) {
Expand Down Expand Up @@ -186,7 +187,6 @@ DeleteVerticesProcessor::deleteVertices(PartitionID partId,
VLOG(3) << "Evict vertex cache for vertex ID " << vertex << ", tagId " << tagId;
vertexCache_->evict(std::make_pair(vertex.getStr(), tagId));
}
target.emplace_back(std::make_tuple(spaceId_, partId, tagId, vertex.getStr()));
batchHolder->remove(key.str());
iter->next();
}
Expand Down
15 changes: 15 additions & 0 deletions src/storage/test/MemoryLockTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,21 @@ TEST_F(MemoryLockTest, MoveTest) {
}
}

TEST_F(MemoryLockTest, PrepTest) {
MemoryLockCore<std::string> mlock;
{
EXPECT_TRUE(mlock.try_lock("1"));
EXPECT_TRUE(mlock.try_lock("2"));
EXPECT_FALSE(mlock.try_lock("1"));
EXPECT_FALSE(mlock.try_lock("2"));
std::vector<std::string> keys{"1", "2"};
auto* lk = new LockGuard(&mlock, keys, false, false);
EXPECT_TRUE(lk);
delete lk;
}
EXPECT_EQ(0, mlock.size());
}

} // namespace storage
} // namespace nebula

Expand Down
19 changes: 13 additions & 6 deletions src/utils/MemoryLockWrapper.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,21 @@ class MemoryLockGuard {
MemoryLockGuard(MemoryLockCore<Key>* lock, const Key& key)
: MemoryLockGuard(lock, std::vector<Key>{key}) {}

MemoryLockGuard(MemoryLockCore<Key>* lock, const std::vector<Key>& keys, bool dedup = false)
MemoryLockGuard(MemoryLockCore<Key>* lock,
const std::vector<Key>& keys,
bool dedup = false,
bool prepCheck = true)
: lock_(lock), keys_(keys) {
if (dedup) {
std::sort(keys_.begin(), keys_.end());
auto last = std::unique(keys_.begin(), keys_.end());
std::tie(iter_, locked_) = lock_->lockBatch(keys_.begin(), last);
if (prepCheck) {
if (dedup) {
std::sort(keys_.begin(), keys_.end());
auto last = std::unique(keys_.begin(), keys_.end());
std::tie(iter_, locked_) = lock_->lockBatch(keys_.begin(), last);
} else {
std::tie(iter_, locked_) = lock_->lockBatch(keys_);
}
} else {
std::tie(iter_, locked_) = lock_->lockBatch(keys_);
locked_ = true;
}
}

Expand Down

0 comments on commit a62e896

Please sign in to comment.