Skip to content

Commit

Permalink
Memory lock in raft (#3926)
Browse files Browse the repository at this point in the history
* init upload

* type

* address comments: remove some comments

* ??

Co-authored-by: Sophie <84560950+Sophie-Xie@users.noreply.github.com>
  • Loading branch information
liuyu85cn and Sophie-Xie committed Apr 21, 2022
1 parent f4a2299 commit cc24e82
Show file tree
Hide file tree
Showing 22 changed files with 794 additions and 737 deletions.
1 change: 0 additions & 1 deletion src/kvstore/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ nebula_add_library(
PartManager.cpp
NebulaStore.cpp
RocksEngineConfig.cpp
LogEncoder.cpp
NebulaSnapshotManager.cpp
RateLimiter.cpp
plugins/elasticsearch/ESListener.cpp
Expand Down
10 changes: 10 additions & 0 deletions src/kvstore/Common.h
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,16 @@ inline rocksdb::Slice toSlice(const folly::StringPiece& str) {
using KVMap = std::unordered_map<std::string, std::string>;
using KVArrayIterator = std::vector<KV>::const_iterator;

class MergeableAtomicOpResult {
public:
nebula::cpp2::ErrorCode code;
std::string batch; // batched result, like before.
std::list<std::string> readSet;
std::list<std::string> writeSet;
};

using MergeableAtomicOp = folly::Function<MergeableAtomicOpResult(void)>;

} // namespace kvstore
} // namespace nebula

Expand Down
2 changes: 1 addition & 1 deletion src/kvstore/KVStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ class KVStore {
*/
virtual void asyncAtomicOp(GraphSpaceID spaceId,
PartitionID partId,
raftex::AtomicOp op,
MergeableAtomicOp op,
KVCallback cb) = 0;

/**
Expand Down
2 changes: 1 addition & 1 deletion src/kvstore/NebulaStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -918,7 +918,7 @@ void NebulaStore::asyncRemoveRange(GraphSpaceID spaceId,

void NebulaStore::asyncAtomicOp(GraphSpaceID spaceId,
PartitionID partId,
raftex::AtomicOp op,
MergeableAtomicOp op,
KVCallback cb) {
auto ret = part(spaceId, partId);
if (!ok(ret)) {
Expand Down
2 changes: 1 addition & 1 deletion src/kvstore/NebulaStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -415,7 +415,7 @@ class NebulaStore : public KVStore, public Handler {
*/
void asyncAtomicOp(GraphSpaceID spaceId,
PartitionID partId,
raftex::AtomicOp op,
MergeableAtomicOp op,
KVCallback cb) override;

/**
Expand Down
2 changes: 1 addition & 1 deletion src/kvstore/Part.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ void Part::sync(KVCallback cb) {
[callback = std::move(cb)](nebula::cpp2::ErrorCode code) mutable { callback(code); });
}

void Part::asyncAtomicOp(raftex::AtomicOp op, KVCallback cb) {
void Part::asyncAtomicOp(MergeableAtomicOp op, KVCallback cb) {
atomicOpAsync(std::move(op))
.thenValue(
[callback = std::move(cb)](nebula::cpp2::ErrorCode code) mutable { callback(code); });
Expand Down
2 changes: 1 addition & 1 deletion src/kvstore/Part.h
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ class Part : public raftex::RaftPart {
* @param op Atomic operation
* @param cb Callback when has a result
*/
void asyncAtomicOp(raftex::AtomicOp op, KVCallback cb);
void asyncAtomicOp(MergeableAtomicOp op, KVCallback cb);

/**
* @brief Add a raft learner asynchronously by adding raft log
Expand Down
1 change: 1 addition & 0 deletions src/kvstore/raftex/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ nebula_add_library(
RaftexService.cpp
Host.cpp
SnapshotManager.cpp
../LogEncoder.cpp
)

nebula_add_subdirectory(test)
Loading

0 comments on commit cc24e82

Please sign in to comment.