Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement snapshot logic #795

Merged
merged 9 commits into from
Sep 3, 2019
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions src/common/base/NebulaKeyUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,14 @@ std::string NebulaKeyUtils::edgeKey(PartitionID partId,
return key;
}

// static
// Returns a key prefix made up solely of the raw in-memory bytes of partId
// (sizeof(PartitionID) bytes, host byte order).
std::string NebulaKeyUtils::prefix(PartitionID partId) {
    const auto* rawPart = reinterpret_cast<const char*>(&partId);
    return std::string(rawPart, sizeof(PartitionID));
}

// static
std::string NebulaKeyUtils::prefix(PartitionID partId, VertexID srcId, EdgeType type) {
std::string key;
Expand Down
2 changes: 2 additions & 0 deletions src/common/base/NebulaKeyUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ class NebulaKeyUtils final {
static std::string prefix(PartitionID partId, VertexID src, EdgeType type,
EdgeRanking ranking, VertexID dst);

static std::string prefix(PartitionID partId);

// Reports whether rawKey has exactly the length of a vertex key (kVertexLen).
// Only the size is inspected; the key's bytes are not examined.
static bool isVertex(const folly::StringPiece& rawKey) {
    const auto keyLen = rawKey.size();
    return keyLen == kVertexLen;
}
Expand Down
26 changes: 21 additions & 5 deletions src/interface/raftex.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ enum ErrorCode {
E_LOG_GAP = -1;
E_LOG_STALE = -2;
E_MISSING_COMMIT = -3;
E_PULLING_SNAPSHOT = -4; // The follower is pulling a snapshot
E_WAITING_SNAPSHOT = -4; // The follower is waiting for a snapshot

E_UNKNOWN_PART = -5;
E_TERM_OUT_OF_DATE = -6;
Expand All @@ -31,6 +31,7 @@ enum ErrorCode {
E_NOT_A_LEADER = -13;
E_HOST_DISCONNECTED = -14;
E_TOO_MANY_REQUESTS = -15;
E_PERSIST_SNAPSHOT_FAILED = -16;

E_EXCEPTION = -20; // A Thrift internal exception was thrown
}
Expand Down Expand Up @@ -94,8 +95,6 @@ struct AppendLogRequest {
//
// Fields 10 to 11 are used for LogAppend.
//
// In the case of heartbeat, the log_str_list will be empty,
// and log_term == 0
//
// In the case of LogAppend, the id of the first log is the
// last_log_id_sent + 1
Expand All @@ -106,7 +105,7 @@ struct AppendLogRequest {
10: TermID log_term;
11: list<LogEntry> log_str_list;

12: optional binary snapshot_uri; // Snapshot URL
12: bool sending_snapshot;
}


Expand All @@ -118,13 +117,30 @@ struct AppendLogResponse {
5: LogID committed_log_id;
6: LogID last_log_id;
7: TermID last_log_term;
8: bool pulling_snapshot;
}

// Argument of RaftexService.sendSnapshot: one batch of snapshot rows plus the
// identity of the part and of the sender.
// NOTE(review): the per-field notes below are inferred from field names and
// from Part::commitSnapshot; confirm against the sender/receiver code.
struct SendSnapshotRequest {
1: GraphSpaceID space;
2: PartitionID part;
3: TermID term;
// Presumably the log id/term recorded as the commit point once the snapshot
// has been fully applied (see Part::putCommitMsg) -- TODO confirm.
4: LogID committed_log_id;
5: TermID committed_log_term;
6: IPv4 leader_ip;
7: Port leader_port;
// Presumably each row is one key/value pair encoded with encodeKV() -- TODO confirm.
8: list<binary> rows;
// NOTE(review): unclear from this file whether these totals are cumulative
// or per-batch; verify against the sender.
9: i64 total_size;
10: i64 total_count;
// Presumably true on the last batch of the snapshot -- TODO confirm.
11: bool done;
}

// Result of RaftexService.sendSnapshot; carries only an ErrorCode.
struct SendSnapshotResponse {
1: ErrorCode error_code;
}

// RPC surface of the raft implementation: one call per request/response
// struct pair (vote request, log append, snapshot transfer).
service RaftexService {
AskForVoteResponse askForVote(1: AskForVoteRequest req);
AppendLogResponse appendLog(1: AppendLogRequest req);
SendSnapshotResponse sendSnapshot(1: SendSnapshotRequest req);
}


1 change: 1 addition & 0 deletions src/kvstore/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ nebula_add_library(
NebulaStore.cpp
RocksEngineConfig.cpp
LogEncoder.cpp
SnapshotManagerImpl.cpp
)

add_subdirectory(raftex)
Expand Down
20 changes: 20 additions & 0 deletions src/kvstore/LogEncoder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,26 @@ namespace kvstore {

constexpr auto kHeadLen = sizeof(int64_t) + 1 + sizeof(uint32_t);

// Serializes one key/value pair into a single string laid out as
//   [uint32 key length][uint32 value length][key bytes][value bytes]
// Both lengths are written as raw uint32_t values in host byte order.
// decodeKV() is the inverse.
std::string encodeKV(const folly::StringPiece& key,
                     const folly::StringPiece& val) {
    // Header: the two lengths back to back (two uint32_t have no padding
    // between them inside an array).
    const uint32_t lens[2] = {static_cast<uint32_t>(key.size()),
                              static_cast<uint32_t>(val.size())};
    std::string encoded;
    encoded.reserve(sizeof(lens) + lens[0] + lens[1]);
    encoded.append(reinterpret_cast<const char*>(lens), sizeof(lens));
    encoded.append(key.data(), lens[0]);
    encoded.append(val.data(), lens[1]);
    return encoded;
}

// Splits a buffer produced by encodeKV() back into its key and value.
//
// Expected layout: [uint32 key length][uint32 value length][key][value],
// with both lengths in host byte order.
//
// The returned pieces are views into `data`; they are only valid while
// `data` is alive and unmodified.
//
// If `data` is shorter than the 8-byte header, or the lengths stored in the
// header exceed the bytes actually present, a pair of empty pieces is
// returned instead of reading past the end of the buffer. Trailing bytes
// after the value are tolerated, as before.
std::pair<folly::StringPiece, folly::StringPiece> decodeKV(const std::string& data) {
    constexpr size_t kLenBytes = sizeof(uint32_t);
    constexpr size_t kKVHeaderBytes = 2 * kLenBytes;
    if (data.size() < kKVHeaderBytes) {
        return std::make_pair(folly::StringPiece(), folly::StringPiece());
    }

    auto ksize = *reinterpret_cast<const uint32_t*>(data.data());
    auto vsize = *reinterpret_cast<const uint32_t*>(data.data() + kLenBytes);

    // Compare by subtraction so that ksize + vsize can never overflow.
    const size_t payloadBytes = data.size() - kKVHeaderBytes;
    if (ksize > payloadBytes || vsize > payloadBytes - ksize) {
        return std::make_pair(folly::StringPiece(), folly::StringPiece());
    }

    auto key = folly::StringPiece(data.data() + kKVHeaderBytes, ksize);
    auto val = folly::StringPiece(data.data() + kKVHeaderBytes + ksize, vsize);
    return std::make_pair(key, val);
}

std::string encodeSingleValue(LogType type, folly::StringPiece val) {
std::string encoded;
Expand Down
4 changes: 4 additions & 0 deletions src/kvstore/LogEncoder.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ enum LogType : char {
OP_TRANS_LEADER = 0x08,
};

std::string encodeKV(const folly::StringPiece& key,
const folly::StringPiece& val);

std::pair<folly::StringPiece, folly::StringPiece> decodeKV(const std::string& data);

std::string encodeSingleValue(LogType type, folly::StringPiece val);
folly::StringPiece decodeSingleValue(folly::StringPiece encoded);
Expand Down
7 changes: 5 additions & 2 deletions src/kvstore/NebulaStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include "network/NetworkUtils.h"
#include "fs/FileUtils.h"
#include "kvstore/RocksEngine.h"
#include "kvstore/SnapshotManagerImpl.h"

DEFINE_string(engine_type, "rocksdb", "rocksdb, memory...");
DEFINE_int32(custom_filter_interval_secs, 24 * 3600, "interval to trigger custom compaction");
Expand All @@ -36,7 +37,8 @@ NebulaStore::~NebulaStore() {
bool NebulaStore::init() {
LOG(INFO) << "Start the raft service...";
bgWorkers_ = std::make_shared<thread::GenericThreadPool>();
bgWorkers_->start(FLAGS_num_workers);
bgWorkers_->start(FLAGS_num_workers, "nebula-bgworkers");
snapshot_.reset(new SnapshotManagerImpl(this));
raftService_ = raftex::RaftexService::createService(ioPool_,
workers_,
raftAddr_.second);
Expand Down Expand Up @@ -206,7 +208,8 @@ std::shared_ptr<Part> NebulaStore::newPart(GraphSpaceID spaceId,
engine,
ioPool_,
bgWorkers_,
workers_);
workers_,
snapshot_);
auto partMeta = options_.partMan_->partMeta(spaceId, partId);
std::vector<HostAddr> peers;
for (auto& h : partMeta.peers_) {
Expand Down
2 changes: 2 additions & 0 deletions src/kvstore/NebulaStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "kvstore/PartManager.h"
#include "kvstore/Part.h"
#include "kvstore/KVEngine.h"
#include "kvstore/raftex/SnapshotManager.h"

namespace nebula {
namespace kvstore {
Expand Down Expand Up @@ -200,6 +201,7 @@ class NebulaStore : public KVStore, public Handler {
KVOptions options_;

std::shared_ptr<raftex::RaftexService> raftService_;
std::shared_ptr<raftex::SnapshotManager> snapshot_;
};

} // namespace kvstore
Expand Down
68 changes: 53 additions & 15 deletions src/kvstore/Part.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,17 @@ Part::Part(GraphSpaceID spaceId,
KVEngine* engine,
std::shared_ptr<folly::IOThreadPoolExecutor> ioPool,
std::shared_ptr<thread::GenericThreadPool> workers,
std::shared_ptr<folly::Executor> handlers)
std::shared_ptr<folly::Executor> handlers,
std::shared_ptr<raftex::SnapshotManager> snapshotMan)
dangleptr marked this conversation as resolved.
Show resolved Hide resolved
: RaftPart(FLAGS_cluster_id,
spaceId,
partId,
localAddr,
walPath,
ioPool,
workers,
handlers)
handlers,
snapshotMan)
, spaceId_(spaceId)
, partId_(partId)
, walPath_(walPath)
Expand Down Expand Up @@ -192,7 +194,7 @@ bool Part::commitLogs(std::unique_ptr<LogIterator> iter) {
auto pieces = decodeMultiValues(log);
DCHECK_EQ(2, pieces.size());
if (batch->put(pieces[0], pieces[1]) != ResultCode::SUCCEEDED) {
LOG(ERROR) << "Failed to call WriteBatch::put()";
LOG(ERROR) << idStr_ << "Failed to call WriteBatch::put()";
return false;
}
break;
Expand All @@ -203,7 +205,7 @@ bool Part::commitLogs(std::unique_ptr<LogIterator> iter) {
DCHECK_EQ((kvs.size() + 1) / 2, kvs.size() / 2);
for (size_t i = 0; i < kvs.size(); i += 2) {
if (batch->put(kvs[i], kvs[i + 1]) != ResultCode::SUCCEEDED) {
LOG(ERROR) << "Failed to call WriteBatch::put()";
LOG(ERROR) << idStr_ << "Failed to call WriteBatch::put()";
return false;
}
}
Expand All @@ -212,7 +214,7 @@ bool Part::commitLogs(std::unique_ptr<LogIterator> iter) {
case OP_REMOVE: {
auto key = decodeSingleValue(log);
if (batch->remove(key) != ResultCode::SUCCEEDED) {
LOG(ERROR) << "Failed to call WriteBatch::remove()";
LOG(ERROR) << idStr_ << "Failed to call WriteBatch::remove()";
return false;
}
break;
Expand All @@ -221,7 +223,7 @@ bool Part::commitLogs(std::unique_ptr<LogIterator> iter) {
auto keys = decodeMultiValues(log);
for (auto k : keys) {
if (batch->remove(k) != ResultCode::SUCCEEDED) {
LOG(ERROR) << "Failed to call WriteBatch::remove()";
LOG(ERROR) << idStr_ << "Failed to call WriteBatch::remove()";
return false;
}
}
Expand All @@ -230,7 +232,7 @@ bool Part::commitLogs(std::unique_ptr<LogIterator> iter) {
case OP_REMOVE_PREFIX: {
auto prefix = decodeSingleValue(log);
if (batch->removePrefix(prefix) != ResultCode::SUCCEEDED) {
LOG(ERROR) << "Failed to call WriteBatch::removePrefix()";
LOG(ERROR) << idStr_ << "Failed to call WriteBatch::removePrefix()";
return false;
}
break;
Expand All @@ -239,7 +241,7 @@ bool Part::commitLogs(std::unique_ptr<LogIterator> iter) {
auto range = decodeMultiValues(log);
DCHECK_EQ(2, range.size());
if (batch->removeRange(range[0], range[1]) != ResultCode::SUCCEEDED) {
LOG(ERROR) << "Failed to call WriteBatch::removeRange()";
LOG(ERROR) << idStr_ << "Failed to call WriteBatch::removeRange()";
return false;
}
break;
Expand All @@ -254,24 +256,60 @@ bool Part::commitLogs(std::unique_ptr<LogIterator> iter) {
break;
}
default: {
LOG(FATAL) << "Unknown operation: " << static_cast<uint8_t>(log[0]);
LOG(FATAL) << idStr_ << "Unknown operation: " << static_cast<uint8_t>(log[0]);
}
}

++(*iter);
}

if (lastId >= 0) {
std::string commitMsg;
commitMsg.reserve(sizeof(LogID) + sizeof(TermID));
commitMsg.append(reinterpret_cast<char*>(&lastId), sizeof(LogID));
commitMsg.append(reinterpret_cast<char*>(&lastTerm), sizeof(TermID));
batch->put(folly::stringPrintf("%s%d", kCommitKeyPrefix, partId_), commitMsg);
if (putCommitMsg(batch.get(), lastId, lastTerm) != ResultCode::SUCCEEDED) {
LOG(ERROR) << idStr_ << "Commit msg failed";
return false;
}
}

return engine_->commitBatchWrite(std::move(batch)) == ResultCode::SUCCEEDED;
}

// Applies one batch of snapshot rows to the engine in a single batch write.
//
// rows             - encoded key/value rows; each one is decoded with decodeKV().
// committedLogId   - log id stored in the commit message when `finished` is true.
// committedLogTerm - log term stored in the commit message when `finished` is true.
// finished         - when true, the commit message is added to the same batch.
//
// Returns (number of rows, total bytes of the encoded rows) on success.
// Returns (0, 0) if any put, the commit message, or the batch commit fails;
// note that this is also the success value for an empty `rows`.
std::pair<int64_t, int64_t> Part::commitSnapshot(const std::vector<std::string>& rows,
LogID committedLogId,
TermID committedLogTerm,
bool finished) {
auto batch = engine_->startBatchWrite();
dangleptr marked this conversation as resolved.
Show resolved Hide resolved
int64_t count = 0;
int64_t size = 0;
for (auto& row : rows) {
// Counters are bumped before the put, but they are discarded on failure
// because every error path returns (0, 0).
count++;
size += row.size();
auto kv = decodeKV(row);
if (ResultCode::SUCCEEDED != batch->put(kv.first, kv.second)) {
LOG(ERROR) << idStr_ << "Put failed in commit";
return std::make_pair(0, 0);
}
}
if (finished) {
// The commit message goes into the same batch as the rows, so both are
// handed to the engine by the single commitBatchWrite() call below.
if (ResultCode::SUCCEEDED != putCommitMsg(batch.get(), committedLogId, committedLogTerm)) {
LOG(ERROR) << idStr_ << "Put failed in commit";
return std::make_pair(0, 0);
}
}
// Nothing staged above is handed to the engine until this call.
if (ResultCode::SUCCEEDED != engine_->commitBatchWrite(std::move(batch))) {
LOG(ERROR) << idStr_ << "Put failed in commit";
return std::make_pair(0, 0);
}
dangleptr marked this conversation as resolved.
Show resolved Hide resolved
return std::make_pair(count, size);
}

// Stages this part's commit marker on `batch`.
//
// Key:   kCommitKeyPrefix followed by partId_ formatted with "%d".
// Value: the raw bytes of committedLogId immediately followed by the raw
//        bytes of committedLogTerm.
//
// Returns whatever WriteBatch::put() returns. This function only adds the
// put to `batch`; it does not commit the batch itself.
ResultCode Part::putCommitMsg(WriteBatch* batch, LogID committedLogId, TermID committedLogTerm) {
    const auto commitKey = folly::stringPrintf("%s%d", kCommitKeyPrefix, partId_);

    std::string marker(reinterpret_cast<char*>(&committedLogId), sizeof(LogID));
    marker.append(reinterpret_cast<char*>(&committedLogTerm), sizeof(TermID));

    return batch->put(commitKey, marker);
}

bool Part::preProcessLog(LogID logId,
TermID termId,
ClusterID clusterId,
Expand Down
16 changes: 15 additions & 1 deletion src/kvstore/Part.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,14 @@
#include "raftex/RaftPart.h"
#include "kvstore/Common.h"
#include "kvstore/KVEngine.h"
#include "kvstore/raftex/SnapshotManager.h"

namespace nebula {
namespace kvstore {


class Part : public raftex::RaftPart {
friend class SnapshotManager;
public:
Part(GraphSpaceID spaceId,
PartitionID partId,
Expand All @@ -25,7 +27,8 @@ class Part : public raftex::RaftPart {
KVEngine* engine,
std::shared_ptr<folly::IOThreadPoolExecutor> pool,
std::shared_ptr<thread::GenericThreadPool> workers,
std::shared_ptr<folly::Executor> handlers);
std::shared_ptr<folly::Executor> handlers,
std::shared_ptr<raftex::SnapshotManager> snapshotMan);

virtual ~Part() {
LOG(INFO) << idStr_ << "~Part()";
Expand Down Expand Up @@ -78,6 +81,17 @@ class Part : public raftex::RaftPart {
ClusterID clusterId,
const std::string& log) override;

std::pair<int64_t, int64_t> commitSnapshot(const std::vector<std::string>& data,
LogID committedLogId,
TermID committedLogTerm,
bool finished) override;

ResultCode putCommitMsg(WriteBatch* batch, LogID committedLogId, TermID committedLogTerm);

// Override of the cleanup hook. No data is removed here: the body only
// writes an INFO log line saying that cleanup is not implemented.
void cleanup() override {
    LOG(INFO) << idStr_
              << "Clean up all data, not implement!";
}

protected:
GraphSpaceID spaceId_;
PartitionID partId_;
Expand Down
2 changes: 1 addition & 1 deletion src/kvstore/PartManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ class MemPartManager final : public PartManager {
FRIEND_TEST(NebulaStoreTest, SimpleTest);
FRIEND_TEST(NebulaStoreTest, PartsTest);
FRIEND_TEST(NebulaStoreTest, ThreeCopiesTest);
FRIEND_TEST(NebulaStoreTest, DISABLED_TransLeaderTest);
FRIEND_TEST(NebulaStoreTest, TransLeaderTest);

public:
MemPartManager() = default;
Expand Down
4 changes: 2 additions & 2 deletions src/kvstore/RocksEngine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ class RocksWriteBatch : public WriteBatch {
rocksdb::DB* db_{nullptr};

public:
explicit RocksWriteBatch(rocksdb::DB* db) : db_(db) {}
explicit RocksWriteBatch(rocksdb::DB* db) : batch_(FLAGS_rocksdb_batch_size), db_(db) {}

virtual ~RocksWriteBatch() = default;

Expand Down Expand Up @@ -116,7 +116,7 @@ RocksEngine::RocksEngine(GraphSpaceID spaceId,
options.compaction_filter_factory = cfFactory;
}
status = rocksdb::DB::Open(options, path, &db);
CHECK(status.ok());
CHECK(status.ok()) << status.ToString();
db_.reset(db);
partsNum_ = allParts().size();
}
Expand Down
Loading