Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Async snapshot generation and evidence #1510

Merged
merged 33 commits into from
Aug 19, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
7d8a619
Champ correct size
Jul 16, 2020
6dd9e0b
Snapshot from raft
Jul 17, 2020
2b283c2
Generate snapshots and store to disk
Jul 17, 2020
a3705e2
Snapshot protocol WIP
Jul 20, 2020
af0a2e2
Merge remote-tracking branch 'upstream/master' into generate_snapshot
Aug 13, 2020
0b020c9
ledger max chunk -> ledger min chunk
Aug 14, 2020
2158750
Snapshots are written to disk
Aug 14, 2020
12e0158
Snapshotter returns snapshot version to Raft
Aug 14, 2020
09428ee
Fix unit tests
Aug 14, 2020
0b965de
Merge remote-tracking branch 'upstream/master' into generate_snapshot
Aug 14, 2020
bcc8b31
Format
Aug 14, 2020
9a8aa43
black
Aug 14, 2020
f4683e0
SnaPshotter
Aug 18, 2020
432675c
Unsigned idx
Aug 18, 2020
e2d519c
Merge branch 'master' into generate_snapshot
jumaffre Aug 18, 2020
33b4cb1
snapshot_min_tx -> snapshot_max_tx
Aug 18, 2020
40d0712
Merge branch 'generate_snapshot' of github.com:jumaffre/CCF into gene…
Aug 18, 2020
8920fe2
Remove type of Tmsg when adding task
Aug 18, 2020
07e3b76
And the other half...
Aug 18, 2020
a6acae5
Merge branch 'generate_snapshot' into async_snapshot_generation
Aug 18, 2020
7e24cc2
Snapshot generation is async
Aug 18, 2020
143f3a8
Commit snapshot evidence
Aug 19, 2020
ec87487
Merge remote-tracking branch 'upstream/master' into async_snapshot_ge…
Aug 19, 2020
a07e6b0
Add snapshot idx to evidence table
Aug 19, 2020
e33694c
Split snapshot generation and serialisation
Aug 19, 2020
426b580
Format
Aug 19, 2020
e83b546
Merge branch 'master' into async_snapshot_generation
jumaffre Aug 19, 2020
e2332e3
Actually remove last_snapshot_idx from Raft
Aug 19, 2020
ea7dcef
snapsot evidence singular
Aug 19, 2020
bc9b328
Merge branch 'async_snapshot_generation' of github.com:jumaffre/CCF i…
Aug 19, 2020
b6ab777
Merge branch 'master' into async_snapshot_generation
jumaffre Aug 19, 2020
21908b2
Format
Aug 19, 2020
8d743a1
Merge branch 'async_snapshot_generation' of github.com:jumaffre/CCF i…
Aug 19, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/consensus/pbft/libbyz/client_proxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ bool ClientProxy<T, C>::send_request(

if (threading::ThreadMessaging::thread_count > 1)
{
threading::ThreadMessaging::thread_messaging.add_task<ExecuteRequestMsg>(
threading::ThreadMessaging::thread_messaging.add_task(
jumaffre marked this conversation as resolved.
Show resolved Hide resolved
threading::ThreadMessaging::main_thread, std::move(msg));
}
else
Expand Down Expand Up @@ -427,7 +427,7 @@ void ClientProxy<T, C>::recv_reply(Reply* reply)

if (threading::ThreadMessaging::thread_count > 1)
{
threading::ThreadMessaging::thread_messaging.add_task<ReplyCbMsg>(
threading::ThreadMessaging::thread_messaging.add_task(
ctx->reply_thread, std::move(msg));
}
else
Expand Down
4 changes: 2 additions & 2 deletions src/consensus/pbft/libbyz/replica.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ static void pre_verify_cb(std::unique_ptr<threading::Tmsg<PreVerifyCbMsg>> req)
resp->data.self = self;
resp->data.result = self->pre_verify(m);

threading::ThreadMessaging::thread_messaging.add_task<PreVerifyResultCbMsg>(
threading::ThreadMessaging::thread_messaging.add_task(
threading::ThreadMessaging::main_thread, std::move(resp));
}

Expand Down Expand Up @@ -353,7 +353,7 @@ void Replica::receive_message(const uint8_t* data, uint32_t size)
msg->data.m = m;
msg->data.self = this;

threading::ThreadMessaging::thread_messaging.add_task<PreVerifyCbMsg>(
threading::ThreadMessaging::thread_messaging.add_task(
target_thread, std::move(msg));
}
else
Expand Down
20 changes: 9 additions & 11 deletions src/consensus/pbft/pbft.h
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,8 @@ namespace pbft
if (threading::ThreadMessaging::thread_count > 1)
{
uint16_t tid = threading::ThreadMessaging::get_execution_thread(to);
threading::ThreadMessaging::thread_messaging
.add_task<SendAuthenticatedAEMsg>(tid, std::move(tmsg));
threading::ThreadMessaging::thread_messaging.add_task(
tid, std::move(tmsg));
}
else
{
Expand Down Expand Up @@ -297,8 +297,8 @@ namespace pbft
{
uint16_t tid = threading::ThreadMessaging::get_execution_thread(
++execution_thread_counter);
threading::ThreadMessaging::thread_messaging
.add_task<SendAuthenticatedMsg>(tid, std::move(tmsg));
threading::ThreadMessaging::thread_messaging.add_task(
tid, std::move(tmsg));
}
else
{
Expand Down Expand Up @@ -702,9 +702,8 @@ namespace pbft
msg, &recv_authenticated_msg_process_cb);
if (threading::ThreadMessaging::thread_count > 1)
{
threading::ThreadMessaging::thread_messaging
.add_task<RecvAuthenticatedMsg>(
threading::ThreadMessaging::main_thread, std::move(msg));
threading::ThreadMessaging::thread_messaging.add_task(
threading::ThreadMessaging::main_thread, std::move(msg));
}
else
{
Expand Down Expand Up @@ -749,10 +748,9 @@ namespace pbft

if (threading::ThreadMessaging::thread_count > 1)
{
threading::ThreadMessaging::thread_messaging
.add_task<RecvAuthenticatedMsg>(
recv_nonce.tid % threading::ThreadMessaging::thread_count,
std::move(tmsg));
threading::ThreadMessaging::thread_messaging.add_task(
recv_nonce.tid % threading::ThreadMessaging::thread_count,
std::move(tmsg));
}
else
{
Expand Down
4 changes: 2 additions & 2 deletions src/consensus/pbft/pbft_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ namespace pbft
{
threading::ThreadMessaging::thread_messaging
.ChangeTmsgCallback<ExecutionCtx>(c, &ExecuteCb);
threading::ThreadMessaging::thread_messaging.add_task<ExecutionCtx>(
threading::ThreadMessaging::thread_messaging.add_task(
threading::ThreadMessaging::main_thread, std::move(c));
}
else
Expand Down Expand Up @@ -282,7 +282,7 @@ namespace pbft
{
tid = (threading::ThreadMessaging::thread_count - 1);
}
threading::ThreadMessaging::thread_messaging.add_task<ExecutionCtx>(
threading::ThreadMessaging::thread_messaging.add_task(
tid, std::move(execution_ctx));
}
else
Expand Down
9 changes: 1 addition & 8 deletions src/consensus/raft/raft.h
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,6 @@ namespace raft
Index commit_idx;
TermHistory term_history;

Index last_snapshot_idx;

// Volatile
NodeId leader_id;
std::unordered_set<NodeId> votes_for_me;
Expand Down Expand Up @@ -173,7 +171,6 @@ namespace raft
voted_for(NoNode),
last_idx(0),
commit_idx(0),
last_snapshot_idx(0),

leader_id(NoNode),

Expand Down Expand Up @@ -1170,11 +1167,7 @@ namespace raft
LOG_DEBUG_FMT("Compacting...");
if (state == Leader)
{
auto snapshot_idx = snapshotter->snapshot(idx);
if (snapshot_idx.has_value())
{
last_snapshot_idx = snapshot_idx.value();
}
snapshotter->snapshot(idx);
jumaffre marked this conversation as resolved.
Show resolved Hide resolved
}
store->compact(idx);
ledger->commit(idx);
Expand Down
4 changes: 2 additions & 2 deletions src/consensus/raft/test/logging_stub.h
Original file line number Diff line number Diff line change
Expand Up @@ -193,10 +193,10 @@ namespace raft
class StubSnapshotter
{
public:
std::optional<kv::Version> snapshot(kv::Version version)
void snapshot(Index)
{
// For now, do not test snapshots in unit tests
return std::nullopt;
return;
}
};
}
4 changes: 3 additions & 1 deletion src/ds/thread_ids.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,15 @@

namespace threading
{
static constexpr size_t MAIN_THREAD_ID = 0;

extern std::map<std::thread::id, uint16_t> thread_ids;

static inline uint16_t get_current_thread_id()
{
if (thread_ids.empty())
{
return 0;
return MAIN_THREAD_ID;
}

const auto tid = std::this_thread::get_id();
Expand Down
4 changes: 2 additions & 2 deletions src/ds/thread_messaging.h
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ namespace threading
public:
static ThreadMessaging thread_messaging;
static std::atomic<uint16_t> thread_count;
static const uint16_t main_thread = 0;
static const uint16_t main_thread = MAIN_THREAD_ID;

static const uint16_t max_num_threads = 24;

Expand Down Expand Up @@ -238,7 +238,7 @@ namespace threading

static uint16_t get_execution_thread(uint32_t i)
{
uint16_t tid = 0;
uint16_t tid = MAIN_THREAD_ID;
if (thread_count > 1)
{
tid = (i % (thread_count - 1));
Expand Down
2 changes: 1 addition & 1 deletion src/enclave/enclave.h
Original file line number Diff line number Diff line change
Expand Up @@ -397,7 +397,7 @@ namespace enclave
{
auto msg = std::make_unique<threading::Tmsg<Msg>>(&init_thread_cb);
msg->data.tid = threading::get_current_thread_id();
threading::ThreadMessaging::thread_messaging.add_task<Msg>(
threading::ThreadMessaging::thread_messaging.add_task(
msg->data.tid, std::move(msg));

threading::ThreadMessaging::thread_messaging.run();
Expand Down
2 changes: 1 addition & 1 deletion src/enclave/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ extern "C"

LOG_INFO_FMT("All threads are ready!");

if (tid == 0)
if (tid == threading::MAIN_THREAD_ID)
{
auto s = e.load()->run_main();
while (num_complete_threads !=
Expand Down
6 changes: 3 additions & 3 deletions src/enclave/tls_endpoint.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ namespace enclave
}
else
{
execution_thread = 0;
execution_thread = threading::MAIN_THREAD_ID;
}
ctx->set_bio(this, send_callback, recv_callback, dbg_callback);
}
Expand Down Expand Up @@ -235,7 +235,7 @@ namespace enclave
msg->data.self = this->shared_from_this();
msg->data.data = data;

threading::ThreadMessaging::thread_messaging.add_task<SendRecvMsg>(
threading::ThreadMessaging::thread_messaging.add_task(
execution_thread, std::move(msg));
}

Expand Down Expand Up @@ -323,7 +323,7 @@ namespace enclave
auto msg = std::make_unique<threading::Tmsg<EmptyMsg>>(&close_cb);
msg->data.self = this->shared_from_this();

threading::ThreadMessaging::thread_messaging.add_task<EmptyMsg>(
threading::ThreadMessaging::thread_messaging.add_task(
execution_thread, std::move(msg));
}

Expand Down
2 changes: 1 addition & 1 deletion src/http/http_endpoint.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ namespace http
msg->data.self = this->shared_from_this();
msg->data.data.assign(data, data + size);

threading::ThreadMessaging::thread_messaging.add_task<SendRecvMsg>(
threading::ThreadMessaging::thread_messaging.add_task(
execution_thread, std::move(msg));
}

Expand Down
10 changes: 6 additions & 4 deletions src/kv/kv_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -408,9 +408,9 @@ namespace kv
{
public:
virtual ~AbstractSnapshot() = default;
virtual void add_map_snapshot(
std::unique_ptr<kv::AbstractMap::Snapshot> snapshot) = 0;
virtual std::vector<uint8_t> serialise(KvStoreSerialiser& s) = 0;
virtual Version get_version() const = 0;
virtual std::vector<uint8_t> serialise(
std::shared_ptr<AbstractTxEncryptor> encryptor) = 0;
};

virtual ~AbstractStore() {}
Expand All @@ -436,7 +436,9 @@ namespace kv
virtual CommitSuccess commit(
const TxID& txid, PendingTx&& pending_tx, bool globally_committable) = 0;

virtual std::vector<uint8_t> serialise_snapshot(Version v) = 0;
virtual std::unique_ptr<AbstractSnapshot> snapshot(Version v) = 0;
virtual std::vector<uint8_t> serialise_snapshot(
std::unique_ptr<AbstractSnapshot> snapshot) = 0;
virtual DeserialiseSuccess deserialise_snapshot(
const std::vector<uint8_t>& data) = 0;

Expand Down
23 changes: 16 additions & 7 deletions src/kv/snapshot.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,15 @@ namespace kv
class StoreSnapshot : public AbstractStore::AbstractSnapshot
{
private:
Version version;

std::vector<std::unique_ptr<kv::AbstractMap::Snapshot>> snapshots;
std::optional<std::vector<uint8_t>> hash_at_snapshot = std::nullopt;

public:
StoreSnapshot() = default;
StoreSnapshot(Version version_) : version(version_) {}

void add_map_snapshot(
std::unique_ptr<kv::AbstractMap::Snapshot> snapshot) override
void add_map_snapshot(std::unique_ptr<kv::AbstractMap::Snapshot> snapshot)
{
snapshots.push_back(std::move(snapshot));
}
Expand All @@ -26,11 +27,19 @@ namespace kv
hash_at_snapshot = std::move(hash_at_snapshot_);
}

std::vector<uint8_t> serialise(KvStoreSerialiser& s) override
Version get_version() const
{
return version;
}

std::vector<uint8_t> serialise(
std::shared_ptr<AbstractTxEncryptor> encryptor)
{
KvStoreSerialiser serialiser(encryptor, version, true);

if (hash_at_snapshot.has_value())
{
s.serialise_raw(hash_at_snapshot.value());
serialiser.serialise_raw(hash_at_snapshot.value());
}

for (auto domain : {SecurityDomain::PUBLIC, SecurityDomain::PRIVATE})
Expand All @@ -39,12 +48,12 @@ namespace kv
{
if (it->get_security_domain() == domain)
{
it->serialise(s);
it->serialise(serialiser);
}
}
}

return s.get_raw_data();
return serialiser.get_raw_data();
}
};
}
18 changes: 11 additions & 7 deletions src/kv/store.h
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ namespace kv
return *result;
}

std::vector<uint8_t> serialise_snapshot(Version v) override
std::unique_ptr<AbstractSnapshot> snapshot(Version v) override
{
CCF_ASSERT_FMT(
v >= commit_version(),
Expand All @@ -236,7 +236,7 @@ namespace kv
v,
current_version());

StoreSnapshot snapshot;
auto snapshot = std::make_unique<StoreSnapshot>(v);

{
std::lock_guard<SpinLock> mguard(maps_lock);
Expand All @@ -248,13 +248,13 @@ namespace kv

for (auto& map : maps)
{
snapshot.add_map_snapshot(map.second->snapshot(v));
snapshot->add_map_snapshot(map.second->snapshot(v));
}

auto h = get_history();
if (h)
{
snapshot.add_hash_at_snapshot(h->get_raw_leaf(v));
snapshot->add_hash_at_snapshot(h->get_raw_leaf(v));
}

for (auto& map : maps)
Expand All @@ -263,10 +263,14 @@ namespace kv
}
}

auto e = get_encryptor();
KvStoreSerialiser serialiser(e, v, true);
return snapshot;
}

return snapshot.serialise(serialiser);
std::vector<uint8_t> serialise_snapshot(
std::unique_ptr<AbstractSnapshot> snapshot) override
{
auto e = get_encryptor();
achamayou marked this conversation as resolved.
Show resolved Hide resolved
return snapshot->serialise(e);
}

DeserialiseSuccess deserialise_snapshot(
Expand Down
8 changes: 5 additions & 3 deletions src/kv/test/kv_bench.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,8 @@ static void ser_snap(picobench::state& s)
throw std::logic_error("Transaction commit failed: " + std::to_string(rc));

s.start_timer();
kv_store.serialise_snapshot(tx.commit_version());
auto snap = kv_store.snapshot(tx.commit_version());
kv_store.serialise_snapshot(std::move(snap));
s.stop_timer();
}

Expand Down Expand Up @@ -234,10 +235,11 @@ static void des_snap(picobench::state& s)
if (rc != kv::CommitSuccess::OK)
throw std::logic_error("Transaction commit failed: " + std::to_string(rc));

auto snapshot = kv_store.serialise_snapshot(tx.commit_version());
auto snap = kv_store.snapshot(tx.commit_version());
auto serialised_snap = kv_store.serialise_snapshot(std::move(snap));

s.start_timer();
kv_store2.deserialise_snapshot(snapshot);
kv_store2.deserialise_snapshot(serialised_snap);
s.stop_timer();
}

Expand Down
Loading