Skip to content

Commit

Permalink
Async snapshot generation and evidence (#1510)
Browse files Browse the repository at this point in the history
  • Loading branch information
jumaffre authored Aug 19, 2020
1 parent ea6c11b commit ec9c3bd
Show file tree
Hide file tree
Showing 23 changed files with 206 additions and 90 deletions.
4 changes: 2 additions & 2 deletions src/consensus/pbft/libbyz/client_proxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ bool ClientProxy<T, C>::send_request(

if (threading::ThreadMessaging::thread_count > 1)
{
threading::ThreadMessaging::thread_messaging.add_task<ExecuteRequestMsg>(
threading::ThreadMessaging::thread_messaging.add_task(
threading::ThreadMessaging::main_thread, std::move(msg));
}
else
Expand Down Expand Up @@ -427,7 +427,7 @@ void ClientProxy<T, C>::recv_reply(Reply* reply)

if (threading::ThreadMessaging::thread_count > 1)
{
threading::ThreadMessaging::thread_messaging.add_task<ReplyCbMsg>(
threading::ThreadMessaging::thread_messaging.add_task(
ctx->reply_thread, std::move(msg));
}
else
Expand Down
4 changes: 2 additions & 2 deletions src/consensus/pbft/libbyz/replica.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ static void pre_verify_cb(std::unique_ptr<threading::Tmsg<PreVerifyCbMsg>> req)
resp->data.self = self;
resp->data.result = self->pre_verify(m);

threading::ThreadMessaging::thread_messaging.add_task<PreVerifyResultCbMsg>(
threading::ThreadMessaging::thread_messaging.add_task(
threading::ThreadMessaging::main_thread, std::move(resp));
}

Expand Down Expand Up @@ -353,7 +353,7 @@ void Replica::receive_message(const uint8_t* data, uint32_t size)
msg->data.m = m;
msg->data.self = this;

threading::ThreadMessaging::thread_messaging.add_task<PreVerifyCbMsg>(
threading::ThreadMessaging::thread_messaging.add_task(
target_thread, std::move(msg));
}
else
Expand Down
20 changes: 9 additions & 11 deletions src/consensus/pbft/pbft.h
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,8 @@ namespace pbft
if (threading::ThreadMessaging::thread_count > 1)
{
uint16_t tid = threading::ThreadMessaging::get_execution_thread(to);
threading::ThreadMessaging::thread_messaging
.add_task<SendAuthenticatedAEMsg>(tid, std::move(tmsg));
threading::ThreadMessaging::thread_messaging.add_task(
tid, std::move(tmsg));
}
else
{
Expand Down Expand Up @@ -297,8 +297,8 @@ namespace pbft
{
uint16_t tid = threading::ThreadMessaging::get_execution_thread(
++execution_thread_counter);
threading::ThreadMessaging::thread_messaging
.add_task<SendAuthenticatedMsg>(tid, std::move(tmsg));
threading::ThreadMessaging::thread_messaging.add_task(
tid, std::move(tmsg));
}
else
{
Expand Down Expand Up @@ -702,9 +702,8 @@ namespace pbft
msg, &recv_authenticated_msg_process_cb);
if (threading::ThreadMessaging::thread_count > 1)
{
threading::ThreadMessaging::thread_messaging
.add_task<RecvAuthenticatedMsg>(
threading::ThreadMessaging::main_thread, std::move(msg));
threading::ThreadMessaging::thread_messaging.add_task(
threading::ThreadMessaging::main_thread, std::move(msg));
}
else
{
Expand Down Expand Up @@ -749,10 +748,9 @@ namespace pbft

if (threading::ThreadMessaging::thread_count > 1)
{
threading::ThreadMessaging::thread_messaging
.add_task<RecvAuthenticatedMsg>(
recv_nonce.tid % threading::ThreadMessaging::thread_count,
std::move(tmsg));
threading::ThreadMessaging::thread_messaging.add_task(
recv_nonce.tid % threading::ThreadMessaging::thread_count,
std::move(tmsg));
}
else
{
Expand Down
4 changes: 2 additions & 2 deletions src/consensus/pbft/pbft_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ namespace pbft
{
threading::ThreadMessaging::thread_messaging
.ChangeTmsgCallback<ExecutionCtx>(c, &ExecuteCb);
threading::ThreadMessaging::thread_messaging.add_task<ExecutionCtx>(
threading::ThreadMessaging::thread_messaging.add_task(
threading::ThreadMessaging::main_thread, std::move(c));
}
else
Expand Down Expand Up @@ -282,7 +282,7 @@ namespace pbft
{
tid = (threading::ThreadMessaging::thread_count - 1);
}
threading::ThreadMessaging::thread_messaging.add_task<ExecutionCtx>(
threading::ThreadMessaging::thread_messaging.add_task(
tid, std::move(execution_ctx));
}
else
Expand Down
9 changes: 1 addition & 8 deletions src/consensus/raft/raft.h
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,6 @@ namespace raft
Index commit_idx;
TermHistory term_history;

Index last_snapshot_idx;

// Volatile
NodeId leader_id;
std::unordered_set<NodeId> votes_for_me;
Expand Down Expand Up @@ -173,7 +171,6 @@ namespace raft
voted_for(NoNode),
last_idx(0),
commit_idx(0),
last_snapshot_idx(0),

leader_id(NoNode),

Expand Down Expand Up @@ -1170,11 +1167,7 @@ namespace raft
LOG_DEBUG_FMT("Compacting...");
if (state == Leader)
{
auto snapshot_idx = snapshotter->snapshot(idx);
if (snapshot_idx.has_value())
{
last_snapshot_idx = snapshot_idx.value();
}
snapshotter->snapshot(idx);
}
store->compact(idx);
ledger->commit(idx);
Expand Down
4 changes: 2 additions & 2 deletions src/consensus/raft/test/logging_stub.h
Original file line number Diff line number Diff line change
Expand Up @@ -193,10 +193,10 @@ namespace raft
class StubSnapshotter
{
public:
std::optional<kv::Version> snapshot(kv::Version version)
void snapshot(Index)
{
// For now, do not test snapshots in unit tests
return std::nullopt;
return;
}
};
}
4 changes: 3 additions & 1 deletion src/ds/thread_ids.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,15 @@

namespace threading
{
static constexpr size_t MAIN_THREAD_ID = 0;

extern std::map<std::thread::id, uint16_t> thread_ids;

static inline uint16_t get_current_thread_id()
{
if (thread_ids.empty())
{
return 0;
return MAIN_THREAD_ID;
}

const auto tid = std::this_thread::get_id();
Expand Down
4 changes: 2 additions & 2 deletions src/ds/thread_messaging.h
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ namespace threading
public:
static ThreadMessaging thread_messaging;
static std::atomic<uint16_t> thread_count;
static const uint16_t main_thread = 0;
static const uint16_t main_thread = MAIN_THREAD_ID;

static const uint16_t max_num_threads = 24;

Expand Down Expand Up @@ -238,7 +238,7 @@ namespace threading

static uint16_t get_execution_thread(uint32_t i)
{
uint16_t tid = 0;
uint16_t tid = MAIN_THREAD_ID;
if (thread_count > 1)
{
tid = (i % (thread_count - 1));
Expand Down
2 changes: 1 addition & 1 deletion src/enclave/enclave.h
Original file line number Diff line number Diff line change
Expand Up @@ -397,7 +397,7 @@ namespace enclave
{
auto msg = std::make_unique<threading::Tmsg<Msg>>(&init_thread_cb);
msg->data.tid = threading::get_current_thread_id();
threading::ThreadMessaging::thread_messaging.add_task<Msg>(
threading::ThreadMessaging::thread_messaging.add_task(
msg->data.tid, std::move(msg));

threading::ThreadMessaging::thread_messaging.run();
Expand Down
2 changes: 1 addition & 1 deletion src/enclave/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ extern "C"

LOG_INFO_FMT("All threads are ready!");

if (tid == 0)
if (tid == threading::MAIN_THREAD_ID)
{
auto s = e.load()->run_main();
while (num_complete_threads !=
Expand Down
6 changes: 3 additions & 3 deletions src/enclave/tls_endpoint.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ namespace enclave
}
else
{
execution_thread = 0;
execution_thread = threading::MAIN_THREAD_ID;
}
ctx->set_bio(this, send_callback, recv_callback, dbg_callback);
}
Expand Down Expand Up @@ -235,7 +235,7 @@ namespace enclave
msg->data.self = this->shared_from_this();
msg->data.data = data;

threading::ThreadMessaging::thread_messaging.add_task<SendRecvMsg>(
threading::ThreadMessaging::thread_messaging.add_task(
execution_thread, std::move(msg));
}

Expand Down Expand Up @@ -323,7 +323,7 @@ namespace enclave
auto msg = std::make_unique<threading::Tmsg<EmptyMsg>>(&close_cb);
msg->data.self = this->shared_from_this();

threading::ThreadMessaging::thread_messaging.add_task<EmptyMsg>(
threading::ThreadMessaging::thread_messaging.add_task(
execution_thread, std::move(msg));
}

Expand Down
2 changes: 1 addition & 1 deletion src/http/http_endpoint.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ namespace http
msg->data.self = this->shared_from_this();
msg->data.data.assign(data, data + size);

threading::ThreadMessaging::thread_messaging.add_task<SendRecvMsg>(
threading::ThreadMessaging::thread_messaging.add_task(
execution_thread, std::move(msg));
}

Expand Down
10 changes: 6 additions & 4 deletions src/kv/kv_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -408,9 +408,9 @@ namespace kv
{
public:
virtual ~AbstractSnapshot() = default;
virtual void add_map_snapshot(
std::unique_ptr<kv::AbstractMap::Snapshot> snapshot) = 0;
virtual std::vector<uint8_t> serialise(KvStoreSerialiser& s) = 0;
virtual Version get_version() const = 0;
virtual std::vector<uint8_t> serialise(
std::shared_ptr<AbstractTxEncryptor> encryptor) = 0;
};

virtual ~AbstractStore() {}
Expand All @@ -436,7 +436,9 @@ namespace kv
virtual CommitSuccess commit(
const TxID& txid, PendingTx&& pending_tx, bool globally_committable) = 0;

virtual std::vector<uint8_t> serialise_snapshot(Version v) = 0;
virtual std::unique_ptr<AbstractSnapshot> snapshot(Version v) = 0;
virtual std::vector<uint8_t> serialise_snapshot(
std::unique_ptr<AbstractSnapshot> snapshot) = 0;
virtual DeserialiseSuccess deserialise_snapshot(
const std::vector<uint8_t>& data) = 0;

Expand Down
23 changes: 16 additions & 7 deletions src/kv/snapshot.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,15 @@ namespace kv
class StoreSnapshot : public AbstractStore::AbstractSnapshot
{
private:
Version version;

std::vector<std::unique_ptr<kv::AbstractMap::Snapshot>> snapshots;
std::optional<std::vector<uint8_t>> hash_at_snapshot = std::nullopt;

public:
StoreSnapshot() = default;
StoreSnapshot(Version version_) : version(version_) {}

void add_map_snapshot(
std::unique_ptr<kv::AbstractMap::Snapshot> snapshot) override
void add_map_snapshot(std::unique_ptr<kv::AbstractMap::Snapshot> snapshot)
{
snapshots.push_back(std::move(snapshot));
}
Expand All @@ -26,11 +27,19 @@ namespace kv
hash_at_snapshot = std::move(hash_at_snapshot_);
}

std::vector<uint8_t> serialise(KvStoreSerialiser& s) override
Version get_version() const
{
return version;
}

std::vector<uint8_t> serialise(
std::shared_ptr<AbstractTxEncryptor> encryptor)
{
KvStoreSerialiser serialiser(encryptor, version, true);

if (hash_at_snapshot.has_value())
{
s.serialise_raw(hash_at_snapshot.value());
serialiser.serialise_raw(hash_at_snapshot.value());
}

for (auto domain : {SecurityDomain::PUBLIC, SecurityDomain::PRIVATE})
Expand All @@ -39,12 +48,12 @@ namespace kv
{
if (it->get_security_domain() == domain)
{
it->serialise(s);
it->serialise(serialiser);
}
}
}

return s.get_raw_data();
return serialiser.get_raw_data();
}
};
}
18 changes: 11 additions & 7 deletions src/kv/store.h
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ namespace kv
return *result;
}

std::vector<uint8_t> serialise_snapshot(Version v) override
std::unique_ptr<AbstractSnapshot> snapshot(Version v) override
{
CCF_ASSERT_FMT(
v >= commit_version(),
Expand All @@ -236,7 +236,7 @@ namespace kv
v,
current_version());

StoreSnapshot snapshot;
auto snapshot = std::make_unique<StoreSnapshot>(v);

{
std::lock_guard<SpinLock> mguard(maps_lock);
Expand All @@ -248,13 +248,13 @@ namespace kv

for (auto& map : maps)
{
snapshot.add_map_snapshot(map.second->snapshot(v));
snapshot->add_map_snapshot(map.second->snapshot(v));
}

auto h = get_history();
if (h)
{
snapshot.add_hash_at_snapshot(h->get_raw_leaf(v));
snapshot->add_hash_at_snapshot(h->get_raw_leaf(v));
}

for (auto& map : maps)
Expand All @@ -263,10 +263,14 @@ namespace kv
}
}

auto e = get_encryptor();
KvStoreSerialiser serialiser(e, v, true);
return snapshot;
}

return snapshot.serialise(serialiser);
std::vector<uint8_t> serialise_snapshot(
std::unique_ptr<AbstractSnapshot> snapshot) override
{
auto e = get_encryptor();
return snapshot->serialise(e);
}

DeserialiseSuccess deserialise_snapshot(
Expand Down
8 changes: 5 additions & 3 deletions src/kv/test/kv_bench.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,8 @@ static void ser_snap(picobench::state& s)
throw std::logic_error("Transaction commit failed: " + std::to_string(rc));

s.start_timer();
kv_store.serialise_snapshot(tx.commit_version());
auto snap = kv_store.snapshot(tx.commit_version());
kv_store.serialise_snapshot(std::move(snap));
s.stop_timer();
}

Expand Down Expand Up @@ -234,10 +235,11 @@ static void des_snap(picobench::state& s)
if (rc != kv::CommitSuccess::OK)
throw std::logic_error("Transaction commit failed: " + std::to_string(rc));

auto snapshot = kv_store.serialise_snapshot(tx.commit_version());
auto snap = kv_store.snapshot(tx.commit_version());
auto serialised_snap = kv_store.serialise_snapshot(std::move(snap));

s.start_timer();
kv_store2.deserialise_snapshot(snapshot);
kv_store2.deserialise_snapshot(serialised_snap);
s.stop_timer();
}

Expand Down
Loading

0 comments on commit ec9c3bd

Please sign in to comment.