Update snapshot commit rules (#1972)
jumaffre authored Dec 3, 2020
1 parent 6b32335 commit 489e511
Showing 15 changed files with 390 additions and 142 deletions.
8 changes: 4 additions & 4 deletions doc/operators/ledger_snapshot.rst
@@ -58,9 +58,9 @@ Snapshots are generated at regular intervals by the current primary node and sto

To guarantee that the identity of the primary node that generated the snapshot can be verified offline, the SHA-256 digest of the snapshot (i.e. evidence) is recorded in the ``public:ccf.gov.snapshot_evidence`` table. The snapshot evidence will be signed by the primary node on the next signature transaction (see :ref:`operators/start_network:Signature Interval`).

Committed snapshot files are named ``snapshot_<seqno>.committed_<evidence_seqno>``, with ``<seqno>`` the sequence number of the state of the key-value at which they were generated and ``<evidence_seqno>`` the sequence number at which the snapshot evidence was recorded.
Committed snapshot files are named ``snapshot_<seqno>_<evidence_seqno>.committed_<evidence_commit_seqno>``, with ``<seqno>`` the sequence number of the state of the key-value store at which they were generated, ``<evidence_seqno>`` the sequence number at which the snapshot evidence was recorded and ``<evidence_commit_seqno>`` the sequence number at which the snapshot evidence was committed.

Uncommitted snapshot files, i.e. those whose evidence has not yet been committed, are named ``snapshot_<seqno>``. These files will be ignored by CCF when joining or recovering a service as no evidence can attest to their validity.
Uncommitted snapshot files, i.e. those whose evidence has not yet been committed, are named ``snapshot_<seqno>_<evidence_seqno>``. These files will be ignored by CCF when joining or recovering a service as no evidence can attest to their validity.
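
The naming scheme described above can be illustrated with a small standalone parser. This is only a sketch, not the code from ``src/host/snapshot.h``: the ``SnapshotName`` type and the ``parse_number`` and ``parse_snapshot_name`` helpers are hypothetical names introduced here, and the sketch assumes the ``committed`` spelling of the suffix used by the ``snapshot_committed_suffix`` constant in the host code.

```cpp
#include <cassert>
#include <charconv>
#include <cstddef>
#include <optional>
#include <string_view>
#include <system_error>

// Hypothetical helper type (not part of CCF): the sequence numbers
// encoded in a snapshot file name.
struct SnapshotName
{
  size_t seqno = 0;
  size_t evidence_seqno = 0;
  // Empty for uncommitted snapshots
  std::optional<size_t> evidence_commit_seqno;
};

// Parses an unsigned decimal number that must span the whole of `s`.
inline std::optional<size_t> parse_number(std::string_view s)
{
  if (s.empty())
  {
    return std::nullopt;
  }
  size_t value = 0;
  const char* end = s.data() + s.size();
  const auto res = std::from_chars(s.data(), end, value);
  if (res.ec != std::errc() || res.ptr != end)
  {
    return std::nullopt;
  }
  return value;
}

// Accepts "snapshot_<seqno>_<evidence_seqno>" (uncommitted) and
// "snapshot_<seqno>_<evidence_seqno>.committed_<evidence_commit_seqno>".
inline std::optional<SnapshotName> parse_snapshot_name(std::string_view name)
{
  constexpr std::string_view prefix = "snapshot_";
  constexpr std::string_view committed = ".committed_";

  if (name.substr(0, prefix.size()) != prefix)
  {
    return std::nullopt;
  }
  name.remove_prefix(prefix.size());

  SnapshotName out;

  // Split off the optional ".committed_<evidence_commit_seqno>" suffix
  const auto commit_pos = name.find(committed);
  if (commit_pos != std::string_view::npos)
  {
    const auto commit =
      parse_number(name.substr(commit_pos + committed.size()));
    if (!commit.has_value())
    {
      return std::nullopt;
    }
    out.evidence_commit_seqno = commit;
    name = name.substr(0, commit_pos);
  }

  // What remains must be "<seqno>_<evidence_seqno>"
  const auto sep = name.find('_');
  if (sep == std::string_view::npos)
  {
    return std::nullopt;
  }
  const auto seqno = parse_number(name.substr(0, sep));
  const auto evidence = parse_number(name.substr(sep + 1));
  if (!seqno.has_value() || !evidence.has_value())
  {
    return std::nullopt;
  }
  out.seqno = *seqno;
  out.evidence_seqno = *evidence;
  return out;
}
```

Under these assumptions, a name with no ``.committed_`` suffix parses with an empty ``evidence_commit_seqno``, which is the case CCF ignores when joining or recovering.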

Join/Recover From Snapshot
~~~~~~~~~~~~~~~~~~~~~~~~~~
@@ -69,9 +69,9 @@ Once a snapshot has been generated by the primary, operators can copy or mount t

To validate the snapshot a node is added from, the node first replays the transactions in the ledger following the snapshot until the proof that the snapshot was committed by the service to join is found. This process requires operators to copy the ledger suffix to the node's ledger directory. The validation procedure is generally quick and the node will automatically join the service once the snapshot has been validated. On recovery, the snapshot is automatically verified as part of the usual ledger recovery procedure.

For example, if a node is added using the ``snapshot_1000.committed_1250`` snapshot file, operators should copy the ledger files containing the sequence numbers ``1000`` to ``1250`` to the directories specified by ``--ledger-dir`` (or ``--read-only-ledger-dir``). This would involve copying the ledger files following the snapshot sequence number ``1000`` until the evidence sequence number ``1250``, e.g. ``ledger_1001-1200.committed`` and ``ledger_1201-1500.committed``, to the joining node's ledger directory.
For example, if a node is added using the ``snapshot_1000_1250.committed_1300`` snapshot file, operators should copy the ledger files containing all the sequence numbers from ``1000`` to ``1300`` to the directories specified by ``--ledger-dir`` (or ``--read-only-ledger-dir``). This would involve copying the ledger files following the snapshot sequence number ``1000`` until the evidence commit sequence number ``1300``, e.g. ``ledger_1001-1200.committed`` and ``ledger_1201-1500.committed``, to the joining node's ledger directory.

.. note:: If the snapshot to join/recover from is recent, it is likely that the evidence for that snapshot is included in the latest `uncommitted` ledger file. In this case, the corresponding ledger file should be copied to the node's main ledger directory (as specified by ``--ledger-dir``) before start-up.
.. note:: If the snapshot to join/recover from is recent, it is likely that the evidence for that snapshot is included in the latest `uncommitted` ledger file. In this case, the corresponding ledger file(s) should be copied to the node's main ledger directory (as specified by ``--ledger-dir``) before start-up.
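
The copy rule in the example above amounts to a range-overlap check: a ledger file is needed if it holds any sequence number after the snapshot sequence number and up to the evidence commit sequence number. The following is a minimal sketch of that check. The ``LedgerFile`` type and the ``ledger_files_to_copy`` function are hypothetical names used for illustration only, and the sketch assumes the first and last sequence numbers of each ledger file are already known (for instance from a name such as ``ledger_1001-1200.committed``).

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical description of a ledger file and the inclusive range of
// sequence numbers it contains.
struct LedgerFile
{
  std::string name;
  size_t first_seqno;
  size_t last_seqno;
};

// Returns the names of the files holding at least one sequence number in
// the half-open range (snapshot_seqno, evidence_commit_seqno].
inline std::vector<std::string> ledger_files_to_copy(
  const std::vector<LedgerFile>& files,
  size_t snapshot_seqno,
  size_t evidence_commit_seqno)
{
  std::vector<std::string> to_copy;
  for (const auto& f : files)
  {
    const bool overlaps = f.last_seqno > snapshot_seqno &&
      f.first_seqno <= evidence_commit_seqno;
    if (overlaps)
    {
      to_copy.push_back(f.name);
    }
  }
  return to_copy;
}
```

For ``snapshot_1000_1250.committed_1300``, calling ``ledger_files_to_copy(files, 1000, 1300)`` on a ledger split into ``1-1000``, ``1001-1200``, ``1201-1500`` and ``1501-1800`` selects only the two middle files, matching the example in the text.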

Historical Transactions
~~~~~~~~~~~~~~~~~~~~~~~
2 changes: 1 addition & 1 deletion src/consensus/aft/raft.h
@@ -2026,7 +2026,7 @@ namespace aft
state->commit_idx = idx;

LOG_DEBUG_FMT("Compacting...");
snapshotter->compact(idx);
snapshotter->commit(idx);
if (replica_state == Leader)
{
snapshotter->snapshot(idx);
2 changes: 1 addition & 1 deletion src/consensus/aft/test/logging_stub.h
@@ -260,7 +260,7 @@ namespace aft
return false;
}

void compact(Index)
void commit(Index)
{
// For now, do not test snapshots in unit tests
return;
9 changes: 6 additions & 3 deletions src/consensus/ledger_enclave_types.h
@@ -57,8 +57,11 @@ DECLARE_RINGBUFFER_MESSAGE_PAYLOAD(
consensus::ledger_truncate, consensus::Index);
DECLARE_RINGBUFFER_MESSAGE_PAYLOAD(consensus::ledger_commit, consensus::Index);
DECLARE_RINGBUFFER_MESSAGE_PAYLOAD(
consensus::snapshot, consensus::Index, std::vector<uint8_t>);
consensus::snapshot,
consensus::Index /* snapshot idx */,
consensus::Index /* evidence idx */,
std::vector<uint8_t>);
DECLARE_RINGBUFFER_MESSAGE_PAYLOAD(
consensus::snapshot_commit,
consensus::Index /* snapshot idx*/,
consensus::Index /* evidence idx */);
consensus::Index /* snapshot idx */,
consensus::Index /* evidence commit idx */);
9 changes: 7 additions & 2 deletions src/host/ledger.h
@@ -734,11 +734,16 @@ namespace asynchost

Ledger(const Ledger& that) = delete;

void init_idx(size_t idx)
void set_last_idx(size_t idx)
{
last_idx = idx;
}

size_t get_last_idx() const
{
return last_idx;
}

std::optional<std::vector<uint8_t>> read_entry(size_t idx)
{
auto f = get_file_from_idx(idx);
@@ -891,7 +896,7 @@ namespace asynchost
DISPATCHER_SET_MESSAGE_HANDLER(
disp, consensus::ledger_init, [this](const uint8_t* data, size_t size) {
auto idx = serialized::read<consensus::Index>(data, size);
init_idx(idx);
set_last_idx(idx);
});

DISPATCHER_SET_MESSAGE_HANDLER(
2 changes: 1 addition & 1 deletion src/host/main.cpp
@@ -596,7 +596,7 @@ int main(int argc, char** argv)
read_only_ledger_dirs);
ledger.register_message_handlers(bp.get_dispatcher());

asynchost::SnapshotManager snapshots(snapshot_dir);
asynchost::SnapshotManager snapshots(snapshot_dir, ledger);
snapshots.register_message_handlers(bp.get_dispatcher());

// Begin listening for node-to-node and RPC messages.
177 changes: 131 additions & 46 deletions src/host/snapshot.h
@@ -3,7 +3,9 @@
#pragma once

#include "consensus/ledger_enclave_types.h"
#include "host/ledger.h"

#include <charconv>
#include <filesystem>
#include <fstream>
#include <iostream>
@@ -17,25 +19,15 @@ namespace asynchost
{
private:
const std::string snapshot_dir;
const Ledger& ledger;

static constexpr auto snapshot_file_prefix = "snapshot";
static constexpr auto snapshot_idx_delimiter = "_";
static constexpr auto snapshot_committed_suffix = "committed";

bool is_committed_snapshot_file(const std::string& file_name)
{
// Snapshot file should start with known prefix and end with committed
// suffix
auto pos = file_name.find(snapshot_file_prefix);
if (pos == std::string::npos || pos != 0)
{
return false;
}
return (
file_name.find(snapshot_committed_suffix, pos) != std::string::npos);
}

size_t get_snapshot_idx_from_file_name(const std::string& file_name)
{
// Assumes snapshot file is not committed
auto pos = file_name.find(snapshot_idx_delimiter);
if (pos == std::string::npos)
{
@@ -46,11 +38,97 @@ namespace asynchost
return std::stol(file_name.substr(pos + 1));
}

std::optional<std::pair<size_t, size_t>>
get_snapshot_evidence_idx_from_file_name(const std::string& file_name)
{
// Returns snapshot evidence and evidence commit proof indices
auto commit_pos =
file_name.find(fmt::format(".{}", snapshot_committed_suffix));
if (commit_pos == std::string::npos)
{
// Snapshot is not yet committed
return std::nullopt;
}

auto idx_pos = file_name.find_first_of(snapshot_idx_delimiter);
if (idx_pos == std::string::npos)
{
// Snapshot has no idx
return std::nullopt;
}

auto evidence_pos =
file_name.find_first_of(snapshot_idx_delimiter, idx_pos + 1);
if (evidence_pos == std::string::npos)
{
// Snapshot has no evidence idx
return std::nullopt;
}

auto evidence_proof_pos = file_name.find_last_of(snapshot_idx_delimiter);
if (evidence_proof_pos == std::string::npos)
{
// Snapshot has no evidence proof idx
return std::nullopt;
}

size_t evidence_idx;
// substr() takes a character count, not an end position
auto str_evidence_idx =
file_name.substr(evidence_pos + 1, commit_pos - evidence_pos - 1);
if (
std::from_chars(
str_evidence_idx.data(),
str_evidence_idx.data() + str_evidence_idx.size(),
evidence_idx)
.ec != std::errc())
{
return std::nullopt;
}

size_t evidence_commit_idx;
// Own the substring: a std::string_view of the temporary returned by
// substr() would dangle
std::string str_evidence_commit_idx =
file_name.substr(evidence_proof_pos + 1);
if (
std::from_chars(
str_evidence_commit_idx.data(),
str_evidence_commit_idx.data() + str_evidence_commit_idx.size(),
evidence_commit_idx)
.ec != std::errc())
{
return std::nullopt;
}
return std::make_pair(evidence_idx, evidence_commit_idx);
}

public:
SnapshotManager(const std::string& snapshot_dir_, const Ledger& ledger_) :
snapshot_dir(snapshot_dir_),
ledger(ledger_)
{
if (fs::is_directory(snapshot_dir))
{
LOG_INFO_FMT(
"Snapshots will be stored in existing directory: {}", snapshot_dir);
}
else if (!fs::create_directory(snapshot_dir))
{
throw std::logic_error(fmt::format(
"Error: Could not create snapshot directory: {}", snapshot_dir));
}
}

void write_snapshot(
consensus::Index idx, const uint8_t* snapshot_data, size_t snapshot_size)
consensus::Index idx,
consensus::Index evidence_idx,
const uint8_t* snapshot_data,
size_t snapshot_size)
{
auto snapshot_file_name = fmt::format(
"{}{}{}", snapshot_file_prefix, snapshot_idx_delimiter, idx);
"{}{}{}{}{}",
snapshot_file_prefix,
snapshot_idx_delimiter,
idx,
snapshot_idx_delimiter,
evidence_idx);
auto full_snapshot_path =
fs::path(snapshot_dir) / fs::path(snapshot_file_name);

@@ -72,28 +150,29 @@ namespace asynchost
}

void commit_snapshot(
consensus::Index snapshot_idx, consensus::Index evidence_idx)
consensus::Index snapshot_idx, consensus::Index evidence_commit_idx)
{
// Find previously-generated snapshot for snapshot_idx and rename file,
// including evidence_idx in name too
// including evidence_commit_idx in name too
for (auto const& f : fs::directory_iterator(snapshot_dir))
{
auto file_name = f.path().filename().string();
if (
!is_committed_snapshot_file(file_name) &&
!get_snapshot_evidence_idx_from_file_name(file_name).has_value() &&
get_snapshot_idx_from_file_name(file_name) == snapshot_idx)
{
LOG_INFO_FMT(
"Committing snapshot file \"{}\" with evidence at {}",
"Committing snapshot file \"{}\" with evidence proof committed at "
"{}",
file_name,
evidence_idx);
evidence_commit_idx);

const auto committed_file_name = fmt::format(
"{}.{}{}{}",
file_name,
snapshot_committed_suffix,
snapshot_idx_delimiter,
evidence_idx);
evidence_commit_idx);

fs::rename(
fs::path(snapshot_dir) / fs::path(file_name),
@@ -106,41 +185,45 @@ namespace asynchost
LOG_FAIL_FMT("Could not find snapshot to commit at {}", snapshot_idx);
}

public:
SnapshotManager(const std::string& snapshot_dir_) :
snapshot_dir(snapshot_dir_)
{
if (fs::is_directory(snapshot_dir))
{
LOG_INFO_FMT(
"Snapshots will be stored in existing directory: {}", snapshot_dir);
}
else if (!fs::create_directory(snapshot_dir))
{
throw std::logic_error(fmt::format(
"Error: Could not create snapshot directory: {}", snapshot_dir));
}
}

std::optional<std::string> find_latest_committed_snapshot()
{
std::optional<std::string> snapshot_file = std::nullopt;
size_t latest_idx = 0;

size_t ledger_last_idx = ledger.get_last_idx();

for (auto& f : fs::directory_iterator(snapshot_dir))
{
auto file_name = f.path().filename().string();
auto pos = file_name.find(
fmt::format("{}{}", snapshot_file_prefix, snapshot_idx_delimiter));
if (pos == std::string::npos || !is_committed_snapshot_file(file_name))
if (
file_name.find(fmt::format(
"{}{}", snapshot_file_prefix, snapshot_idx_delimiter)) ==
std::string::npos)
{
LOG_INFO_FMT("Ignoring non-snapshot file \"{}\"", file_name);
continue;
}

auto evidence_indices =
get_snapshot_evidence_idx_from_file_name(file_name);
if (!evidence_indices.has_value())
{
LOG_INFO_FMT("Ignoring uncommitted snapshot file \"{}\"", file_name);
continue;
}

if (evidence_indices->second > ledger_last_idx)
{
LOG_INFO_FMT(
"Ignoring \"{}\" because it is not a committed snapshot file",
file_name);
"Ignoring \"{}\" because ledger does not contain evidence commit "
"seqno: evidence commit seqno {} > last ledger seqno {}",
file_name,
evidence_indices->second,
ledger_last_idx);
continue;
}

pos = file_name.find(snapshot_idx_delimiter);
auto pos = file_name.find(snapshot_idx_delimiter);
size_t snapshot_idx = std::stol(file_name.substr(pos + 1));
if (snapshot_idx > latest_idx)
{
@@ -158,16 +241,18 @@ namespace asynchost
DISPATCHER_SET_MESSAGE_HANDLER(
disp, consensus::snapshot, [this](const uint8_t* data, size_t size) {
auto idx = serialized::read<consensus::Index>(data, size);
write_snapshot(idx, data, size);
auto evidence_idx = serialized::read<consensus::Index>(data, size);
write_snapshot(idx, evidence_idx, data, size);
});

DISPATCHER_SET_MESSAGE_HANDLER(
disp,
consensus::snapshot_commit,
[this](const uint8_t* data, size_t size) {
auto snapshot_idx = serialized::read<consensus::Index>(data, size);
auto evidence_idx = serialized::read<consensus::Index>(data, size);
commit_snapshot(snapshot_idx, evidence_idx);
auto evidence_commit_idx =
serialized::read<consensus::Index>(data, size);
commit_snapshot(snapshot_idx, evidence_commit_idx);
});
}
};
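
The selection rule that ``find_latest_committed_snapshot`` applies after this change can be summarised in isolation: skip uncommitted snapshots, skip snapshots whose evidence commit index lies beyond the last index in the ledger, and keep the highest remaining snapshot index. The sketch below restates that rule over already-parsed entries. ``SnapshotEntry`` and ``latest_usable_snapshot`` are hypothetical names, and the sketch deliberately leaves out directory iteration and file-name parsing.

```cpp
#include <cassert>
#include <cstddef>
#include <optional>
#include <string>
#include <vector>

// Hypothetical, already-parsed view of one file in the snapshot directory.
struct SnapshotEntry
{
  std::string file_name;
  size_t snapshot_idx;
  // Empty if the snapshot evidence has not been committed yet
  std::optional<size_t> evidence_commit_idx;
};

// Returns the file name of the most recent snapshot whose evidence commit
// index is covered by the ledger, if there is one.
inline std::optional<std::string> latest_usable_snapshot(
  const std::vector<SnapshotEntry>& entries, size_t ledger_last_idx)
{
  std::optional<std::string> latest;
  size_t latest_idx = 0;

  for (const auto& e : entries)
  {
    if (!e.evidence_commit_idx.has_value())
    {
      // Uncommitted snapshot: no evidence can attest to its validity
      continue;
    }
    if (*e.evidence_commit_idx > ledger_last_idx)
    {
      // The ledger does not yet contain the evidence commit
      continue;
    }
    if (e.snapshot_idx > latest_idx)
    {
      latest = e.file_name;
      latest_idx = e.snapshot_idx;
    }
  }
  return latest;
}
```

With a ledger ending at index ``1500``, a directory holding ``snapshot_1000_1250.committed_1300`` and ``snapshot_2000_2250.committed_2300`` would yield the first file, since the ledger cannot prove that the second snapshot's evidence was committed.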