Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Generate and store snapshots on primary node #1500

Merged
merged 19 commits into from
Aug 18, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -579,6 +579,13 @@ if(BUILD_TESTS)
CONSENSUS raft
)

add_e2e_test(
NAME reconfiguration_snapshot_test
PYTHON_SCRIPT ${CMAKE_SOURCE_DIR}/tests/reconfiguration.py
CONSENSUS raft
ADDITIONAL_ARGS --snapshot-max-tx 10
)

add_e2e_test(
NAME code_update_test
PYTHON_SCRIPT ${CMAKE_SOURCE_DIR}/tests/code_update.py
Expand Down
5 changes: 5 additions & 0 deletions src/consensus/ledger_enclave_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ namespace consensus
DEFINE_RINGBUFFER_MSG_TYPE(ledger_append),
DEFINE_RINGBUFFER_MSG_TYPE(ledger_truncate),
DEFINE_RINGBUFFER_MSG_TYPE(ledger_commit),

/// Create a new snapshot. Enclave -> Host
DEFINE_RINGBUFFER_MSG_TYPE(ledger_snapshot),
};
}

Expand All @@ -47,3 +50,5 @@ DECLARE_RINGBUFFER_MESSAGE_PAYLOAD(
DECLARE_RINGBUFFER_MESSAGE_PAYLOAD(
consensus::ledger_truncate, consensus::Index);
DECLARE_RINGBUFFER_MESSAGE_PAYLOAD(consensus::ledger_commit, consensus::Index);
DECLARE_RINGBUFFER_MESSAGE_PAYLOAD(
consensus::ledger_snapshot, consensus::Index, std::vector<uint8_t>);
2 changes: 1 addition & 1 deletion src/consensus/pbft/pbft.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
#include "enclave/rpc_map.h"
#include "enclave/rpc_sessions.h"
#include "kv/kv_types.h"
#include "node/nodetypes.h"
#include "node/node_types.h"

#include <list>
#include <memory>
Expand Down
51 changes: 36 additions & 15 deletions src/consensus/raft/raft.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
#include "ds/serialized.h"
#include "ds/spin_lock.h"
#include "kv/kv_types.h"
#include "node/nodetypes.h"
#include "node/node_types.h"
#include "raft_types.h"

#include <algorithm>
Expand Down Expand Up @@ -71,7 +71,7 @@ namespace raft
}
};

template <class LedgerProxy, class ChannelProxy>
template <class LedgerProxy, class ChannelProxy, class SnapshotterProxy>
class Raft
{
private:
Expand All @@ -84,22 +84,23 @@ namespace raft

struct NodeState
{
// the highest matching index with the node that was confirmed
Index match_idx;
Configuration::NodeInfo node_info;

// the highest index sent to the node
Index sent_idx;

Configuration::NodeInfo node_info;
// the highest matching index with the node that was confirmed
Index match_idx;

NodeState() = default;

NodeState(
Index match_idx_,
const Configuration::NodeInfo& node_info_,
Index sent_idx_,
const Configuration::NodeInfo& node_info_) :
match_idx(match_idx_),
Index match_idx_ = 0) :
node_info(node_info_),
sent_idx(sent_idx_),
node_info(node_info_)
match_idx(match_idx_)
{}
};

Expand All @@ -115,6 +116,8 @@ namespace raft
Index commit_idx;
TermHistory term_history;

Index last_snapshot_idx;

// Volatile
NodeId leader_id;
std::unordered_set<NodeId> votes_for_me;
Expand Down Expand Up @@ -151,12 +154,14 @@ namespace raft
static constexpr size_t append_entries_size_limit = 20000;
std::unique_ptr<LedgerProxy> ledger;
std::shared_ptr<ChannelProxy> channels;
std::shared_ptr<SnapshotterProxy> snapshotter;

public:
Raft(
std::unique_ptr<Store<kv::DeserialiseSuccess>> store,
std::unique_ptr<LedgerProxy> ledger_,
std::shared_ptr<ChannelProxy> channels_,
std::shared_ptr<SnapshotterProxy> snapshotter_,
NodeId id,
std::chrono::milliseconds request_timeout_,
std::chrono::milliseconds election_timeout_,
Expand All @@ -168,6 +173,7 @@ namespace raft
voted_for(NoNode),
last_idx(0),
commit_idx(0),
last_snapshot_idx(0),

leader_id(NoNode),

Expand All @@ -182,7 +188,8 @@ namespace raft
rand((int)(uintptr_t)this),

ledger(std::move(ledger_)),
channels(channels_)
channels(channels_),
snapshotter(snapshotter_)

{}

Expand Down Expand Up @@ -507,12 +514,16 @@ namespace raft

auto& node = nodes.at(to);

// Record the most recent index we have sent to this node.
node.sent_idx = end_idx;

// The host will append log entries to this message when it is
// sent to the destination node.
channels->send_authenticated(ccf::NodeMsgType::consensus_msg, to, ae);
if (!channels->send_authenticated(
jumaffre marked this conversation as resolved.
Show resolved Hide resolved
ccf::NodeMsgType::consensus_msg, to, ae))
{
return;
}

// Record the most recent index we have sent to this node.
node.sent_idx = end_idx;
}

void recv_append_entries(const uint8_t* data, size_t size)
Expand Down Expand Up @@ -1157,8 +1168,17 @@ namespace raft
commit_idx = idx;

LOG_DEBUG_FMT("Compacting...");
if (state == Leader)
{
auto snapshot_idx = snapshotter->snapshot(idx);
if (snapshot_idx.has_value())
{
last_snapshot_idx = snapshot_idx.value();
}
}
store->compact(idx);
ledger->commit(idx);

LOG_DEBUG_FMT("Commit on {}: {}", local_id, idx);

// Examine all configurations that are followed by a globally committed
Expand Down Expand Up @@ -1267,14 +1287,15 @@ namespace raft
// A new node is sent only future entries initially. If it does not
// have prior data, it will communicate that back to the leader.
auto index = last_idx + 1;
nodes.try_emplace(node_info.first, 0, index, node_info.second);
nodes.try_emplace(node_info.first, node_info.second, index, 0);

if (state == Leader)
{
channels->create_channel(
node_info.first,
node_info.second.hostname,
node_info.second.port);

send_append_entries(node_info.first, index);
}

Expand Down
6 changes: 3 additions & 3 deletions src/consensus/raft/raft_consensus.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,14 @@ namespace raft
// the Raft API, allowing for a mapping between the generic consensus
// terminology and the terminology that is specific to Raft

template <class LedgerProxy, class ChannelProxy>
template <class... T>
class RaftConsensus : public kv::Consensus
{
private:
std::unique_ptr<Raft<LedgerProxy, ChannelProxy>> raft;
std::unique_ptr<Raft<T...>> raft;

public:
RaftConsensus(std::unique_ptr<Raft<LedgerProxy, ChannelProxy>> raft_) :
RaftConsensus(std::unique_ptr<Raft<T...>> raft_) :
Consensus(raft_->id()),
raft(std::move(raft_))
{}
Expand Down
14 changes: 10 additions & 4 deletions src/consensus/raft/raft_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,17 @@ namespace raft
S deserialise(
const std::vector<uint8_t>& data,
bool public_only = false,
Term* term = nullptr)
Term* term = nullptr) override
{
auto p = x.lock();
if (p)
{
return p->deserialise(data, public_only, term);
}
return S::FAILED;
}

void compact(Index v)
void compact(Index v) override
{
auto p = x.lock();
if (p)
Expand All @@ -61,18 +63,22 @@ namespace raft
}
}

void rollback(Index v, std::optional<Term> t = std::nullopt)
void rollback(Index v, std::optional<Term> t = std::nullopt) override
{
auto p = x.lock();
if (p)
{
p->rollback(v, t);
}
}

void set_term(Term t)
void set_term(Term t) override
{
auto p = x.lock();
if (p)
{
p->set_term(t);
}
}
};

Expand Down
4 changes: 3 additions & 1 deletion src/consensus/raft/test/driver.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@
#include "logging_stub.h"

using ms = std::chrono::milliseconds;
using TRaft = raft::Raft<raft::LedgerStubProxy, raft::ChannelStubProxy>;
using TRaft = raft::
Raft<raft::LedgerStubProxy, raft::ChannelStubProxy, raft::StubSnapshotter>;
using Store = raft::LoggingStubStore;
using Adaptor = raft::Adaptor<Store, kv::DeserialiseSuccess>;

Expand Down Expand Up @@ -46,6 +47,7 @@ class RaftDriver
std::make_unique<Adaptor>(kv),
std::make_unique<raft::LedgerStubProxy>(node_id),
std::make_shared<raft::ChannelStubProxy>(),
std::make_shared<raft::StubSnapshotter>(),
node_id,
ms(10),
ms(i * 100));
Expand Down
23 changes: 19 additions & 4 deletions src/consensus/raft/test/logging_stub.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include "consensus/raft/raft_types.h"

#include <map>
#include <optional>
#include <vector>

namespace raft
Expand Down Expand Up @@ -87,32 +88,36 @@ namespace raft

void close_all_outgoing() {}

void send_authenticated(
bool send_authenticated(
const ccf::NodeMsgType& msg_type, NodeId to, const RequestVote& data)
{
sent_request_vote.push_back(std::make_pair(to, data));
return true;
}

void send_authenticated(
bool send_authenticated(
const ccf::NodeMsgType& msg_type, NodeId to, const AppendEntries& data)
{
sent_append_entries.push_back(std::make_pair(to, data));
return true;
}

void send_authenticated(
bool send_authenticated(
const ccf::NodeMsgType& msg_type,
NodeId to,
const RequestVoteResponse& data)
{
sent_request_vote_response.push_back(std::make_pair(to, data));
return true;
}

void send_authenticated(
bool send_authenticated(
const ccf::NodeMsgType& msg_type,
NodeId to,
const AppendEntriesResponse& data)
{
sent_append_entries_response.push_back(std::make_pair(to, data));
return true;
}

size_t sent_msg_count() const
Expand Down Expand Up @@ -184,4 +189,14 @@ namespace raft
return kv::DeserialiseSuccess::PASS_SIGNATURE;
}
};

class StubSnapshotter
{
public:
std::optional<kv::Version> snapshot(kv::Version version)
{
// For now, do not test snapshots in unit tests
return std::nullopt;
}
};
}
Loading