Implement snapshot logic (#795)
* Implement snapshot logic

* Address critical27's and wadeliuyi's comments

* Address darion's and wadeliuyi's comments, rebase on master and fix failing UTs

* Address wadeliuyi's comments

* Fix failing UT leader_transfer_test

* Re-enable the UTs disabled in #859

* Add some checks when receiving a snapshot and adjust some log levels
dangleptr authored Sep 3, 2019
1 parent b632360 commit 0121cb2
Showing 43 changed files with 1,037 additions and 167 deletions.
8 changes: 8 additions & 0 deletions src/common/base/NebulaKeyUtils.cpp
@@ -38,6 +38,14 @@ std::string NebulaKeyUtils::edgeKey(PartitionID partId,
return key;
}

// static
std::string NebulaKeyUtils::prefix(PartitionID partId) {
std::string key;
key.reserve(sizeof(PartitionID));
key.append(reinterpret_cast<const char*>(&partId), sizeof(PartitionID));
return key;
}

// static
std::string NebulaKeyUtils::prefix(PartitionID partId, VertexID srcId, EdgeType type) {
std::string key;
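
The new single-argument prefix() builds a key prefix that contains only the raw partition id, which is presumably what the snapshot path uses to scan every key belonging to a partition. Below is a minimal standalone sketch of that layout and of how a key can be matched against it; the PartitionID alias and helper names are local to the example, not project API.

#include <cstdint>
#include <cstring>
#include <iostream>
#include <string>

using PartitionID = int32_t;  // assumption: mirrors the project's typedef

// Same layout as NebulaKeyUtils::prefix(PartitionID): just the raw 4-byte part id.
std::string partPrefix(PartitionID partId) {
    std::string key;
    key.reserve(sizeof(PartitionID));
    key.append(reinterpret_cast<const char*>(&partId), sizeof(PartitionID));
    return key;
}

// A key belongs to a partition iff it starts with that partition's prefix bytes.
bool belongsTo(const std::string& rawKey, PartitionID partId) {
    const auto prefix = partPrefix(partId);
    return rawKey.size() >= prefix.size() &&
           std::memcmp(rawKey.data(), prefix.data(), prefix.size()) == 0;
}

int main() {
    const std::string key = partPrefix(7) + "vertex-or-edge-suffix";
    std::cout << std::boolalpha << belongsTo(key, 7) << " " << belongsTo(key, 8) << std::endl;
    return 0;
}
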
2 changes: 2 additions & 0 deletions src/common/base/NebulaKeyUtils.h
@@ -52,6 +52,8 @@ class NebulaKeyUtils final {
static std::string prefix(PartitionID partId, VertexID src, EdgeType type,
EdgeRanking ranking, VertexID dst);

static std::string prefix(PartitionID partId);

static bool isVertex(const folly::StringPiece& rawKey) {
return rawKey.size() == kVertexLen;
}
26 changes: 21 additions & 5 deletions src/interface/raftex.thrift
@@ -16,7 +16,7 @@ enum ErrorCode {
E_LOG_GAP = -1;
E_LOG_STALE = -2;
E_MISSING_COMMIT = -3;
E_PULLING_SNAPSHOT = -4; // The follower is pulling a snapshot
E_WAITING_SNAPSHOT = -4; // The follower is waiting for a snapshot

E_UNKNOWN_PART = -5;
E_TERM_OUT_OF_DATE = -6;
@@ -31,6 +31,7 @@ enum ErrorCode {
E_NOT_A_LEADER = -13;
E_HOST_DISCONNECTED = -14;
E_TOO_MANY_REQUESTS = -15;
E_PERSIST_SNAPSHOT_FAILED = -16;

E_EXCEPTION = -20; // An thrift internal exception was thrown
}
@@ -94,8 +95,6 @@ struct AppendLogRequest {
//
// Fields 10 to 11 are used for LogAppend.
//
// In the case of heartbeat, the log_str_list will be empty,
// and log_term == 0
//
// In the case of LogAppend, the id of the first log is the
// last_log_id_sent + 1
@@ -106,7 +105,7 @@
10: TermID log_term;
11: list<LogEntry> log_str_list;

12: optional binary snapshot_uri; // Snapshot URL
12: bool sending_snapshot;
}


@@ -118,13 +117,30 @@
5: LogID committed_log_id;
6: LogID last_log_id;
7: TermID last_log_term;
8: bool pulling_snapshot;
}

struct SendSnapshotRequest {
1: GraphSpaceID space;
2: PartitionID part;
3: TermID term;
4: LogID committed_log_id;
5: TermID committed_log_term;
6: IPv4 leader_ip;
7: Port leader_port;
8: list<binary> rows;
9: i64 total_size;
10: i64 total_count;
11: bool done;
}

struct SendSnapshotResponse {
1: ErrorCode error_code;
}

service RaftexService {
AskForVoteResponse askForVote(1: AskForVoteRequest req);
AppendLogResponse appendLog(1: AppendLogRequest req);
SendSnapshotResponse sendSnapshot(1: SendSnapshotRequest req);
}
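
The SendSnapshotRequest fields imply a simple streaming contract: each call carries one batch of encoded rows, total_size and total_count accumulate across calls, and done is set only on the last batch. The sketch below models that sender-side loop with a plain struct that mirrors the thrift message and a stub in place of the real RPC; the names and the batch size are illustrative assumptions, not the actual SnapshotManager implementation.

#include <algorithm>
#include <cstdint>
#include <iostream>
#include <string>
#include <vector>

// Hypothetical plain-C++ mirror of SendSnapshotRequest (not the generated thrift code).
struct SnapshotChunk {
    int32_t space = 0;
    int32_t part = 0;
    int64_t term = 0;
    int64_t committedLogId = 0;
    int64_t committedLogTerm = 0;
    std::vector<std::string> rows;  // one batch of encodeKV()-style rows
    int64_t totalSize = 0;          // bytes sent so far, including this batch
    int64_t totalCount = 0;         // rows sent so far, including this batch
    bool done = false;              // true only on the final batch
};

// Stub standing in for the raftex sendSnapshot RPC.
void sendBatch(const SnapshotChunk& chunk) {
    std::cout << "batch rows=" << chunk.rows.size()
              << " totalCount=" << chunk.totalCount
              << " done=" << chunk.done << std::endl;
}

// Stream all rows of a partition in fixed-size batches.
// Note: a real implementation would also handle the empty-partition case.
void sendSnapshot(int32_t space, int32_t part, int64_t term,
                  int64_t committedLogId, int64_t committedLogTerm,
                  const std::vector<std::string>& allRows,
                  size_t batchRows = 1024) {
    int64_t totalSize = 0;
    int64_t totalCount = 0;
    for (size_t i = 0; i < allRows.size(); i += batchRows) {
        SnapshotChunk chunk;
        chunk.space = space;
        chunk.part = part;
        chunk.term = term;
        chunk.committedLogId = committedLogId;
        chunk.committedLogTerm = committedLogTerm;
        const size_t end = std::min(allRows.size(), i + batchRows);
        for (size_t j = i; j < end; ++j) {
            totalSize += static_cast<int64_t>(allRows[j].size());
            ++totalCount;
            chunk.rows.push_back(allRows[j]);
        }
        chunk.totalSize = totalSize;
        chunk.totalCount = totalCount;
        chunk.done = (end == allRows.size());
        sendBatch(chunk);
    }
}

int main() {
    std::vector<std::string> rows(2500, "encoded-kv-row");
    sendSnapshot(1, 7, 3, 1024, 3, rows);
    return 0;
}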


1 change: 1 addition & 0 deletions src/kvstore/CMakeLists.txt
@@ -6,6 +6,7 @@ nebula_add_library(
NebulaStore.cpp
RocksEngineConfig.cpp
LogEncoder.cpp
SnapshotManagerImpl.cpp
)

add_subdirectory(raftex)
20 changes: 20 additions & 0 deletions src/kvstore/LogEncoder.cpp
@@ -13,6 +13,26 @@ namespace kvstore {

constexpr auto kHeadLen = sizeof(int64_t) + 1 + sizeof(uint32_t);

std::string encodeKV(const folly::StringPiece& key,
const folly::StringPiece& val) {
uint32_t ksize = key.size();
uint32_t vsize = val.size();
std::string str;
str.reserve(sizeof(uint32_t) * 2 + ksize + vsize);
str.append(reinterpret_cast<const char*>(&ksize), sizeof(ksize));
str.append(reinterpret_cast<const char*>(&vsize), sizeof(vsize));
str.append(key.data(), ksize);
str.append(val.data(), vsize);
return str;
}

std::pair<folly::StringPiece, folly::StringPiece> decodeKV(const std::string& data) {
auto ksize = *reinterpret_cast<const uint32_t*>(data.data());
auto vsize = *reinterpret_cast<const uint32_t*>(data.data() + sizeof(ksize));
auto key = folly::StringPiece(data.data() + sizeof(ksize) + sizeof(vsize), ksize);
auto val = folly::StringPiece(data.data() + sizeof(ksize) + sizeof(vsize) + ksize, vsize);
return std::make_pair(key, val);
}

std::string encodeSingleValue(LogType type, folly::StringPiece val) {
std::string encoded;
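
encodeKV/decodeKV frame a key/value pair as two native-endian uint32 lengths followed by the raw key and value bytes; this is presumably the row format carried in SendSnapshotRequest.rows. A standalone round-trip sketch of the same layout, using std::string_view in place of folly::StringPiece:

#include <cassert>
#include <cstdint>
#include <cstring>
#include <string>
#include <string_view>
#include <utility>

// Same wire layout as kvstore::encodeKV: [ksize][vsize][key bytes][value bytes].
std::string encodeKV(std::string_view key, std::string_view val) {
    uint32_t ksize = static_cast<uint32_t>(key.size());
    uint32_t vsize = static_cast<uint32_t>(val.size());
    std::string str;
    str.reserve(sizeof(uint32_t) * 2 + ksize + vsize);
    str.append(reinterpret_cast<const char*>(&ksize), sizeof(ksize));
    str.append(reinterpret_cast<const char*>(&vsize), sizeof(vsize));
    str.append(key.data(), ksize);
    str.append(val.data(), vsize);
    return str;
}

// Returns views into `data`; the caller keeps `data` alive, as with the folly version.
std::pair<std::string_view, std::string_view> decodeKV(const std::string& data) {
    uint32_t ksize = 0;
    uint32_t vsize = 0;
    std::memcpy(&ksize, data.data(), sizeof(ksize));
    std::memcpy(&vsize, data.data() + sizeof(ksize), sizeof(vsize));
    std::string_view key(data.data() + sizeof(ksize) + sizeof(vsize), ksize);
    std::string_view val(data.data() + sizeof(ksize) + sizeof(vsize) + ksize, vsize);
    return {key, val};
}

int main() {
    std::string row = encodeKV("part7_vertex42", "serialized-props");
    auto kv = decodeKV(row);
    assert(kv.first == "part7_vertex42" && kv.second == "serialized-props");
    return 0;
}
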
4 changes: 4 additions & 0 deletions src/kvstore/LogEncoder.h
@@ -23,6 +23,10 @@ enum LogType : char {
OP_TRANS_LEADER = 0x08,
};

std::string encodeKV(const folly::StringPiece& key,
const folly::StringPiece& val);

std::pair<folly::StringPiece, folly::StringPiece> decodeKV(const std::string& data);

std::string encodeSingleValue(LogType type, folly::StringPiece val);
folly::StringPiece decodeSingleValue(folly::StringPiece encoded);
7 changes: 5 additions & 2 deletions src/kvstore/NebulaStore.cpp
@@ -12,6 +12,7 @@
#include "network/NetworkUtils.h"
#include "fs/FileUtils.h"
#include "kvstore/RocksEngine.h"
#include "kvstore/SnapshotManagerImpl.h"

DEFINE_string(engine_type, "rocksdb", "rocksdb, memory...");
DEFINE_int32(custom_filter_interval_secs, 24 * 3600, "interval to trigger custom compaction");
@@ -36,7 +37,8 @@ NebulaStore::~NebulaStore() {
bool NebulaStore::init() {
LOG(INFO) << "Start the raft service...";
bgWorkers_ = std::make_shared<thread::GenericThreadPool>();
bgWorkers_->start(FLAGS_num_workers);
bgWorkers_->start(FLAGS_num_workers, "nebula-bgworkers");
snapshot_.reset(new SnapshotManagerImpl(this));
raftService_ = raftex::RaftexService::createService(ioPool_,
workers_,
raftAddr_.second);
@@ -206,7 +208,8 @@ std::shared_ptr<Part> NebulaStore::newPart(GraphSpaceID spaceId,
engine,
ioPool_,
bgWorkers_,
workers_);
workers_,
snapshot_);
auto partMeta = options_.partMan_->partMeta(spaceId, partId);
std::vector<HostAddr> peers;
for (auto& h : partMeta.peers_) {
2 changes: 2 additions & 0 deletions src/kvstore/NebulaStore.h
@@ -15,6 +15,7 @@
#include "kvstore/PartManager.h"
#include "kvstore/Part.h"
#include "kvstore/KVEngine.h"
#include "kvstore/raftex/SnapshotManager.h"

namespace nebula {
namespace kvstore {
@@ -200,6 +201,7 @@ class NebulaStore : public KVStore, public Handler {
KVOptions options_;

std::shared_ptr<raftex::RaftexService> raftService_;
std::shared_ptr<raftex::SnapshotManager> snapshot_;
};

} // namespace kvstore
68 changes: 53 additions & 15 deletions src/kvstore/Part.cpp
@@ -39,15 +39,17 @@ Part::Part(GraphSpaceID spaceId,
KVEngine* engine,
std::shared_ptr<folly::IOThreadPoolExecutor> ioPool,
std::shared_ptr<thread::GenericThreadPool> workers,
std::shared_ptr<folly::Executor> handlers)
std::shared_ptr<folly::Executor> handlers,
std::shared_ptr<raftex::SnapshotManager> snapshotMan)
: RaftPart(FLAGS_cluster_id,
spaceId,
partId,
localAddr,
walPath,
ioPool,
workers,
handlers)
handlers,
snapshotMan)
, spaceId_(spaceId)
, partId_(partId)
, walPath_(walPath)
@@ -192,7 +194,7 @@ bool Part::commitLogs(std::unique_ptr<LogIterator> iter) {
auto pieces = decodeMultiValues(log);
DCHECK_EQ(2, pieces.size());
if (batch->put(pieces[0], pieces[1]) != ResultCode::SUCCEEDED) {
LOG(ERROR) << "Failed to call WriteBatch::put()";
LOG(ERROR) << idStr_ << "Failed to call WriteBatch::put()";
return false;
}
break;
@@ -203,7 +205,7 @@
DCHECK_EQ((kvs.size() + 1) / 2, kvs.size() / 2);
for (size_t i = 0; i < kvs.size(); i += 2) {
if (batch->put(kvs[i], kvs[i + 1]) != ResultCode::SUCCEEDED) {
LOG(ERROR) << "Failed to call WriteBatch::put()";
LOG(ERROR) << idStr_ << "Failed to call WriteBatch::put()";
return false;
}
}
@@ -212,7 +214,7 @@
case OP_REMOVE: {
auto key = decodeSingleValue(log);
if (batch->remove(key) != ResultCode::SUCCEEDED) {
LOG(ERROR) << "Failed to call WriteBatch::remove()";
LOG(ERROR) << idStr_ << "Failed to call WriteBatch::remove()";
return false;
}
break;
@@ -221,7 +223,7 @@
auto keys = decodeMultiValues(log);
for (auto k : keys) {
if (batch->remove(k) != ResultCode::SUCCEEDED) {
LOG(ERROR) << "Failed to call WriteBatch::remove()";
LOG(ERROR) << idStr_ << "Failed to call WriteBatch::remove()";
return false;
}
}
@@ -230,7 +232,7 @@
case OP_REMOVE_PREFIX: {
auto prefix = decodeSingleValue(log);
if (batch->removePrefix(prefix) != ResultCode::SUCCEEDED) {
LOG(ERROR) << "Failed to call WriteBatch::removePrefix()";
LOG(ERROR) << idStr_ << "Failed to call WriteBatch::removePrefix()";
return false;
}
break;
@@ -239,7 +241,7 @@
auto range = decodeMultiValues(log);
DCHECK_EQ(2, range.size());
if (batch->removeRange(range[0], range[1]) != ResultCode::SUCCEEDED) {
LOG(ERROR) << "Failed to call WriteBatch::removeRange()";
LOG(ERROR) << idStr_ << "Failed to call WriteBatch::removeRange()";
return false;
}
break;
@@ -254,24 +256,60 @@ bool Part::commitLogs(std::unique_ptr<LogIterator> iter) {
break;
}
default: {
LOG(FATAL) << "Unknown operation: " << static_cast<uint8_t>(log[0]);
LOG(FATAL) << idStr_ << "Unknown operation: " << static_cast<uint8_t>(log[0]);
}
}

++(*iter);
}

if (lastId >= 0) {
std::string commitMsg;
commitMsg.reserve(sizeof(LogID) + sizeof(TermID));
commitMsg.append(reinterpret_cast<char*>(&lastId), sizeof(LogID));
commitMsg.append(reinterpret_cast<char*>(&lastTerm), sizeof(TermID));
batch->put(folly::stringPrintf("%s%d", kCommitKeyPrefix, partId_), commitMsg);
if (putCommitMsg(batch.get(), lastId, lastTerm) != ResultCode::SUCCEEDED) {
LOG(ERROR) << idStr_ << "Commit msg failed";
return false;
}
}

return engine_->commitBatchWrite(std::move(batch)) == ResultCode::SUCCEEDED;
}

std::pair<int64_t, int64_t> Part::commitSnapshot(const std::vector<std::string>& rows,
LogID committedLogId,
TermID committedLogTerm,
bool finished) {
auto batch = engine_->startBatchWrite();
int64_t count = 0;
int64_t size = 0;
for (auto& row : rows) {
count++;
size += row.size();
auto kv = decodeKV(row);
if (ResultCode::SUCCEEDED != batch->put(kv.first, kv.second)) {
LOG(ERROR) << idStr_ << "Put failed in commit";
return std::make_pair(0, 0);
}
}
if (finished) {
if (ResultCode::SUCCEEDED != putCommitMsg(batch.get(), committedLogId, committedLogTerm)) {
LOG(ERROR) << idStr_ << "Put failed in commit";
return std::make_pair(0, 0);
}
}
if (ResultCode::SUCCEEDED != engine_->commitBatchWrite(std::move(batch))) {
LOG(ERROR) << idStr_ << "Put failed in commit";
return std::make_pair(0, 0);
}
return std::make_pair(count, size);
}

ResultCode Part::putCommitMsg(WriteBatch* batch, LogID committedLogId, TermID committedLogTerm) {
std::string commitMsg;
commitMsg.reserve(sizeof(LogID) + sizeof(TermID));
commitMsg.append(reinterpret_cast<char*>(&committedLogId), sizeof(LogID));
commitMsg.append(reinterpret_cast<char*>(&committedLogTerm), sizeof(TermID));
return batch->put(folly::stringPrintf("%s%d", kCommitKeyPrefix, partId_),
commitMsg);
}

bool Part::preProcessLog(LogID logId,
TermID termId,
ClusterID clusterId,
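
putCommitMsg stores the pair (committedLogId, committedLogTerm) as two raw integers under a per-partition system key, and commitSnapshot writes that same marker once the final snapshot batch has been applied. A small standalone sketch of that marker's encode/decode round trip, assuming LogID and TermID are 64-bit integers as in the thrift typedefs:

#include <cassert>
#include <cstdint>
#include <cstring>
#include <string>
#include <utility>

using LogID = int64_t;   // assumption: matches the thrift typedefs
using TermID = int64_t;

// Same value layout as Part::putCommitMsg: raw LogID followed by raw TermID.
std::string encodeCommitMsg(LogID committedLogId, TermID committedLogTerm) {
    std::string commitMsg;
    commitMsg.reserve(sizeof(LogID) + sizeof(TermID));
    commitMsg.append(reinterpret_cast<const char*>(&committedLogId), sizeof(LogID));
    commitMsg.append(reinterpret_cast<const char*>(&committedLogTerm), sizeof(TermID));
    return commitMsg;
}

// Inverse: what a part would read back on restart to resume from the snapshot point.
std::pair<LogID, TermID> decodeCommitMsg(const std::string& commitMsg) {
    LogID logId = 0;
    TermID termId = 0;
    std::memcpy(&logId, commitMsg.data(), sizeof(LogID));
    std::memcpy(&termId, commitMsg.data() + sizeof(LogID), sizeof(TermID));
    return {logId, termId};
}

int main() {
    auto msg = encodeCommitMsg(1024, 7);
    auto decoded = decodeCommitMsg(msg);
    assert(decoded.first == 1024 && decoded.second == 7);
    return 0;
}
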
16 changes: 15 additions & 1 deletion src/kvstore/Part.h
@@ -11,12 +11,14 @@
#include "raftex/RaftPart.h"
#include "kvstore/Common.h"
#include "kvstore/KVEngine.h"
#include "kvstore/raftex/SnapshotManager.h"

namespace nebula {
namespace kvstore {


class Part : public raftex::RaftPart {
friend class SnapshotManager;
public:
Part(GraphSpaceID spaceId,
PartitionID partId,
@@ -25,7 +27,8 @@ class Part : public raftex::RaftPart {
KVEngine* engine,
std::shared_ptr<folly::IOThreadPoolExecutor> pool,
std::shared_ptr<thread::GenericThreadPool> workers,
std::shared_ptr<folly::Executor> handlers);
std::shared_ptr<folly::Executor> handlers,
std::shared_ptr<raftex::SnapshotManager> snapshotMan);

virtual ~Part() {
LOG(INFO) << idStr_ << "~Part()";
@@ -78,6 +81,17 @@ class Part : public raftex::RaftPart {
ClusterID clusterId,
const std::string& log) override;

std::pair<int64_t, int64_t> commitSnapshot(const std::vector<std::string>& data,
LogID committedLogId,
TermID committedLogTerm,
bool finished) override;

ResultCode putCommitMsg(WriteBatch* batch, LogID committedLogId, TermID committedLogTerm);

void cleanup() override {
LOG(INFO) << idStr_ << "Clean up all data, not implement!";
}

protected:
GraphSpaceID spaceId_;
PartitionID partId_;
2 changes: 1 addition & 1 deletion src/kvstore/PartManager.h
@@ -72,7 +72,7 @@ class MemPartManager final : public PartManager {
FRIEND_TEST(NebulaStoreTest, SimpleTest);
FRIEND_TEST(NebulaStoreTest, PartsTest);
FRIEND_TEST(NebulaStoreTest, ThreeCopiesTest);
FRIEND_TEST(NebulaStoreTest, DISABLED_TransLeaderTest);
FRIEND_TEST(NebulaStoreTest, TransLeaderTest);

public:
MemPartManager() = default;
4 changes: 2 additions & 2 deletions src/kvstore/RocksEngine.cpp
@@ -32,7 +32,7 @@ class RocksWriteBatch : public WriteBatch {
rocksdb::DB* db_{nullptr};

public:
explicit RocksWriteBatch(rocksdb::DB* db) : db_(db) {}
explicit RocksWriteBatch(rocksdb::DB* db) : batch_(FLAGS_rocksdb_batch_size), db_(db) {}

virtual ~RocksWriteBatch() = default;

@@ -116,7 +116,7 @@ RocksEngine::RocksEngine(GraphSpaceID spaceId,
options.compaction_filter_factory = cfFactory;
}
status = rocksdb::DB::Open(options, path, &db);
CHECK(status.ok());
CHECK(status.ok()) << status.ToString();
db_.reset(db);
partsNum_ = allParts().size();
}
Expand Down