Skip to content

Commit

Permalink
Fix the read performance issue after delete range with rocksdb-5.15.10 (
Browse files Browse the repository at this point in the history
#1973)

* Fix the read performace issue after delete range with rocksdb-5.15.10

* Fix the move issue

* fix the move issue

* Rebase and address bright-starry-sky's comments

Co-authored-by: heng <heng.chen@vesoft.com>
  • Loading branch information
dangleptr and heng authored Mar 26, 2020
1 parent 82e4b48 commit 1c1aae5
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 24 deletions.
3 changes: 0 additions & 3 deletions src/storage/BaseProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,6 @@ class BaseProcessor {

void doRemove(GraphSpaceID spaceId, PartitionID partId, std::vector<std::string> keys);

void doRemoveRange(GraphSpaceID spaceId, PartitionID partId, std::string start,
std::string end);

kvstore::ResultCode doRange(GraphSpaceID spaceId, PartitionID partId, std::string start,
std::string end, std::unique_ptr<kvstore::KVIterator>* iter);

Expand Down
11 changes: 0 additions & 11 deletions src/storage/BaseProcessor.inl
Original file line number Diff line number Diff line change
Expand Up @@ -93,17 +93,6 @@ void BaseProcessor<RESP>::doRemove(GraphSpaceID spaceId,
});
}

template <typename RESP>
void BaseProcessor<RESP>::doRemoveRange(GraphSpaceID spaceId,
PartitionID partId,
std::string start,
std::string end) {
this->kvstore_->asyncRemoveRange(
spaceId, partId, start, end, [spaceId, partId, this](kvstore::ResultCode code) {
handleAsync(spaceId, partId, code);
});
}

template<typename RESP>
kvstore::ResultCode BaseProcessor<RESP>::doRange(GraphSpaceID spaceId,
PartitionID partId,
Expand Down
28 changes: 22 additions & 6 deletions src/storage/mutate/DeleteEdgesProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,11 @@ void DeleteEdgesProcessor::process(const cpp2::DeleteEdgesRequest& req) {
std::for_each(req.parts.begin(), req.parts.end(), [this](auto &partEdges) {
this->callingNum_ += partEdges.second.size();
});
std::for_each(req.parts.begin(), req.parts.end(), [spaceId, this](auto &partEdges) {
std::vector<std::string> keys;
keys.reserve(16);
for (auto& partEdges : req.parts) {
auto partId = partEdges.first;
std::for_each(partEdges.second.begin(), partEdges.second.end(),
[spaceId, partId, this](auto &edgeKey) {
for (auto& edgeKey : partEdges.second) {
auto start = NebulaKeyUtils::edgeKey(partId,
edgeKey.src,
edgeKey.edge_type,
Expand All @@ -39,9 +40,24 @@ void DeleteEdgesProcessor::process(const cpp2::DeleteEdgesRequest& req) {
edgeKey.ranking,
edgeKey.dst,
std::numeric_limits<int64_t>::max());
this->doRemoveRange(spaceId, partId, start, end);
});
});
std::unique_ptr<kvstore::KVIterator> iter;
auto ret = this->kvstore_->range(spaceId, partId, start, end, &iter);
if (ret != kvstore::ResultCode::SUCCEEDED) {
VLOG(3) << "Error! ret = " << static_cast<int32_t>(ret)
<< ", spaceID " << spaceId;
this->handleErrorCode(ret, spaceId, partId);
this->onFinished();
return;
}
keys.clear();
while (iter && iter->valid()) {
auto key = iter->key();
keys.emplace_back(key.data(), key.size());
iter->next();
}
doRemove(spaceId, partId, keys);
}
}
} else {
callingNum_ = req.parts.size();
std::for_each(req.parts.begin(), req.parts.end(), [spaceId, this](auto &partEdges) {
Expand Down
9 changes: 5 additions & 4 deletions src/storage/mutate/DeleteVerticesProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ void DeleteVerticesProcessor::process(const cpp2::DeleteVerticesRequest& req) {
this->callingNum_ += pv.second.size();
});

std::vector<std::string> keys;
keys.reserve(32);
for (auto pv = partVertices.begin(); pv != partVertices.end(); pv++) {
auto part = pv->first;
const auto& vertices = pv->second;
Expand All @@ -35,12 +37,11 @@ void DeleteVerticesProcessor::process(const cpp2::DeleteVerticesRequest& req) {
if (ret != kvstore::ResultCode::SUCCEEDED) {
VLOG(3) << "Error! ret = " << static_cast<int32_t>(ret)
<< ", spaceID " << spaceId;
this->handleErrorCode(ret, spaceId, part);
this->onFinished();
return;
}

std::vector<std::string> keys;
keys.reserve(32);
keys.clear();
while (iter->valid()) {
auto key = iter->key();
if (NebulaKeyUtils::isVertex(key)) {
Expand All @@ -54,7 +55,7 @@ void DeleteVerticesProcessor::process(const cpp2::DeleteVerticesRequest& req) {
}
iter->next();
}
doRemove(spaceId, part, std::move(keys));
doRemove(spaceId, part, keys);
}
}
} else {
Expand Down

0 comments on commit 1c1aae5

Please sign in to comment.