Skip to content

Commit

Permalink
GH-3 Avoid vote_message copies by using shared_ptr
Browse files Browse the repository at this point in the history
  • Loading branch information
heifner committed Apr 12, 2024
1 parent d1fe636 commit f3ff3e8
Show file tree
Hide file tree
Showing 9 changed files with 96 additions and 78 deletions.
6 changes: 3 additions & 3 deletions libraries/chain/controller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3567,7 +3567,7 @@ struct controller_impl {


// called from net threads and controller's thread pool
void process_vote_message( uint32_t connection_id, const vote_message& vote ) {
void process_vote_message( uint32_t connection_id, const vote_message_ptr& vote ) {
if (conf.vote_thread_pool_size > 0) {
vote_processor.process_vote_message(connection_id, vote);
}
Expand Down Expand Up @@ -3597,7 +3597,7 @@ struct controller_impl {

// Each finalizer configured on the node which is present in the active finalizer policy may create and sign a vote.
my_finalizers.maybe_vote(
*bsp->active_finalizer_policy, bsp, bsp->strong_digest, [&](const vote_message& vote) {
*bsp->active_finalizer_policy, bsp, bsp->strong_digest, [&](const vote_message_ptr& vote) {
// net plugin subscribed to this signal. it will broadcast the vote message on receiving the signal
emit(voted_block, std::tuple{uint32_t{0}, vote_status::success, std::cref(vote)});

Expand Down Expand Up @@ -5259,7 +5259,7 @@ void controller::set_proposed_finalizers( finalizer_policy&& fin_pol ) {
}

// called from net threads
void controller::process_vote_message( uint32_t connection_id, const vote_message& vote ) {
void controller::process_vote_message( uint32_t connection_id, const vote_message_ptr& vote ) {
my->process_vote_message( connection_id, vote );
};

Expand Down
8 changes: 4 additions & 4 deletions libraries/chain/hotstuff/finalizer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,9 @@ finalizer::vote_result finalizer::decide_vote(const block_state_ptr& bsp) {
}

// ----------------------------------------------------------------------------------------
std::optional<vote_message> finalizer::maybe_vote(const bls_public_key& pub_key,
const block_state_ptr& bsp,
const digest_type& digest) {
vote_message_ptr finalizer::maybe_vote(const bls_public_key& pub_key,
const block_state_ptr& bsp,
const digest_type& digest) {
finalizer::vote_decision decision = decide_vote(bsp).decision;
if (decision == vote_decision::strong_vote || decision == vote_decision::weak_vote) {
bls_signature sig;
Expand All @@ -99,7 +99,7 @@ std::optional<vote_message> finalizer::maybe_vote(const bls_public_key& pub_key,
} else {
sig = priv_key.sign({(uint8_t*)digest.data(), (uint8_t*)digest.data() + digest.data_size()});
}
return std::optional{vote_message{ bsp->id(), decision == vote_decision::strong_vote, pub_key, sig }};
return std::make_shared<vote_message>(bsp->id(), decision == vote_decision::strong_vote, pub_key, sig);
}
return {};
}
Expand Down
4 changes: 2 additions & 2 deletions libraries/chain/include/eosio/chain/controller.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ namespace eosio::chain {
using trx_meta_cache_lookup = std::function<transaction_metadata_ptr( const transaction_id_type&)>;

using block_signal_params = std::tuple<const signed_block_ptr&, const block_id_type&>;
using vote_signal_params = std::tuple<uint32_t, vote_status, const vote_message&>;
using vote_signal_params = std::tuple<uint32_t, vote_status, const vote_message_ptr&>;

enum class db_read_mode {
HEAD,
Expand Down Expand Up @@ -328,7 +328,7 @@ namespace eosio::chain {
// called by host function set_finalizers
void set_proposed_finalizers( finalizer_policy&& fin_pol );
// called from net threads
void process_vote_message( uint32_t connection_id, const vote_message& msg );
void process_vote_message( uint32_t connection_id, const vote_message_ptr& msg );
// thread safe, for testing
bool node_has_voted_if_finalizer(const block_id_type& id) const;

Expand Down
9 changes: 4 additions & 5 deletions libraries/chain/include/eosio/chain/hotstuff/finalizer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,7 @@ namespace eosio::chain {
finalizer_safety_information fsi;

vote_result decide_vote(const block_state_ptr& bsp);
std::optional<vote_message> maybe_vote(const bls_public_key& pub_key, const block_state_ptr& bsp,
const digest_type& digest);
vote_message_ptr maybe_vote(const bls_public_key& pub_key, const block_state_ptr& bsp, const digest_type& digest);
};

// ----------------------------------------------------------------------------------------
Expand Down Expand Up @@ -95,7 +94,7 @@ namespace eosio::chain {
if (finalizers.empty())
return;

std::vector<vote_message> votes;
std::vector<vote_message_ptr> votes;
votes.reserve(finalizers.size());

// Possible improvement in the future, look at locking only individual finalizers and releasing the lock for writing the file.
Expand All @@ -105,9 +104,9 @@ namespace eosio::chain {
// first accumulate all the votes
for (const auto& f : fin_pol.finalizers) {
if (auto it = finalizers.find(f.public_key); it != finalizers.end()) {
std::optional<vote_message> vote_msg = it->second.maybe_vote(it->first, bsp, digest);
vote_message_ptr vote_msg = it->second.maybe_vote(it->first, bsp, digest);
if (vote_msg)
votes.push_back(std::move(*vote_msg));
votes.push_back(std::move(vote_msg));
}
}
// then save the safety info and, if successful, gossip the votes
Expand Down
2 changes: 2 additions & 0 deletions libraries/chain/include/eosio/chain/hotstuff/hotstuff.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ namespace eosio::chain {
bool operator==(const vote_message&) const = default;
};

using vote_message_ptr = std::shared_ptr<vote_message>;

enum class vote_status {
success,
duplicate, // duplicate vote, expected as votes arrive on multiple connections
Expand Down
49 changes: 23 additions & 26 deletions libraries/chain/include/eosio/chain/vote_processor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,26 +23,24 @@ class vote_processor_t {
struct by_vote;

struct vote {
uint32_t connection_id;
vote_message msg;
uint32_t connection_id;
vote_message_ptr msg;

const block_id_type& id() const { return msg.block_id; }
block_num_type block_num() const { return block_header::num_from_id(msg.block_id); }
const block_id_type& id() const { return msg->block_id; }
block_num_type block_num() const { return block_header::num_from_id(msg->block_id); }
};

using vote_ptr = std::shared_ptr<vote>;
using vote_signal_type = decltype(controller({},chain_id_type::empty_chain_id()).voted_block());

using vote_index_type = boost::multi_index_container< vote_ptr,
using vote_index_type = boost::multi_index_container< vote,
indexed_by<
ordered_non_unique<tag<by_block_num>,
composite_key<vote,
const_mem_fun<vote, block_num_type, &vote::block_num>,
const_mem_fun<vote, const block_id_type&, &vote::id>
>, composite_key_compare< std::greater<>, sha256_less > // greater for block_num
>,
ordered_non_unique< tag<by_connection>, member<vote, uint32_t, &vote::connection_id> >,
ordered_unique< tag<by_vote>, member<vote, vote_message, &vote::msg> >
ordered_non_unique< tag<by_connection>, member<vote, uint32_t, &vote::connection_id> >
>
>;

Expand Down Expand Up @@ -84,7 +82,7 @@ class vote_processor_t {
}
}

void emit(uint32_t connection_id, vote_status status, const vote_message& msg) {
void emit(uint32_t connection_id, vote_status status, const vote_message_ptr& msg) {
if (connection_id != 0) { // this nodes vote was already signaled
emit( vote_signal, std::tuple{connection_id, status, std::cref(msg)} );
}
Expand All @@ -102,8 +100,8 @@ class vote_processor_t {
}

bool remove_all_for_block(auto& idx, auto& it, const block_id_type& id) {
while (it != idx.end() && (*it)->id() == id) {
if (auto& num = num_messages[(*it)->connection_id]; num != 0)
while (it != idx.end() && it->id() == id) {
if (auto& num = num_messages[it->connection_id]; num != 0)
--num;

it = idx.erase(it);
Expand All @@ -112,7 +110,7 @@ class vote_processor_t {
}

bool skip_all_for_block(auto& idx, auto& it, const block_id_type& id) {
while (it != idx.end() && (*it)->id() == id) {
while (it != idx.end() && it->id() == id) {
++it;
}
return it == idx.end();
Expand Down Expand Up @@ -157,35 +155,35 @@ class vote_processor_t {
continue;
}
auto& idx = index.get<by_block_num>();
if (auto i = idx.begin(); i != idx.end() && not_in_forkdb_id == (*i)->id()) { // same block as last while loop
if (auto i = idx.begin(); i != idx.end() && not_in_forkdb_id == i->id()) { // same block as last while loop
g.unlock();
std::this_thread::sleep_for(block_wait_time);
g.lock();
}
for (auto i = idx.begin(); i != idx.end();) {
auto& vt = *i;
block_state_ptr bsp = fetch_block_func(vt->id());
block_state_ptr bsp = fetch_block_func(vt.id());
if (bsp) {
if (!bsp->is_proper_svnn_block()) {
if (remove_all_for_block(idx, i, bsp->id()))
break;
continue;
}
auto iter_of_bsp = i;
std::vector<vote_ptr> to_process;
std::vector<vote> to_process;
to_process.reserve(std::min<size_t>(21u, idx.size())); // increase if we increase # of finalizers from 21
for(; i != idx.end() && bsp->id() == (*i)->id(); ++i) {
for(; i != idx.end() && bsp->id() == i->id(); ++i) {
// although it is the highest contention on block state pending mutex posting all of the same bsp,
// the highest priority is processing votes for this block state.
to_process.push_back(*i);
}
bool should_break = remove_all_for_block(idx, iter_of_bsp, bsp->id());
g.unlock(); // do not hold lock when posting
for (auto& vptr : to_process) {
boost::asio::post(thread_pool.get_executor(), [this, bsp, vptr=std::move(vptr)]() {
vote_status s = bsp->aggregate_vote(vptr->msg);
for (auto& v : to_process) {
boost::asio::post(thread_pool.get_executor(), [this, bsp, v=std::move(v)]() {
vote_status s = bsp->aggregate_vote(*v.msg);
if (s != vote_status::duplicate) { // don't bother emitting duplicates
emit(vptr->connection_id, s, vptr->msg);
emit(v.connection_id, s, v.msg);
}
});
}
Expand All @@ -194,8 +192,8 @@ class vote_processor_t {
g.lock();
i = idx.begin();
} else {
not_in_forkdb_id = vt->id();
if (skip_all_for_block(idx, i, (*i)->id()))
not_in_forkdb_id = vt.id();
if (skip_all_for_block(idx, i, i->id()))
break;
}
}
Expand All @@ -208,8 +206,7 @@ class vote_processor_t {
lib = block_num;
}

void process_vote_message(uint32_t connection_id, const vote_message& msg) {
vote_ptr vptr = std::make_shared<vote>(vote{.connection_id = connection_id, .msg = msg});
void process_vote_message(uint32_t connection_id, const vote_message_ptr& msg) {
boost::asio::post(thread_pool.get_executor(), [this, connection_id, msg] {
std::unique_lock g(mtx);
if (++num_messages[connection_id] > max_votes_per_connection) {
Expand All @@ -220,10 +217,10 @@ class vote_processor_t {

elog("Exceeded max votes per connection for ${c}", ("c", connection_id));
emit(connection_id, vote_status::max_exceeded, msg);
} else if (block_header::num_from_id(msg.block_id) < lib.load(std::memory_order_relaxed)) {
} else if (block_header::num_from_id(msg->block_id) < lib.load(std::memory_order_relaxed)) {
// ignore
} else {
index.insert(std::make_shared<vote>(vote{.connection_id = connection_id, .msg = msg}));
index.insert(vote{.connection_id = connection_id, .msg = msg});
cv.notify_one();
}
});
Expand Down
Loading

0 comments on commit f3ff3e8

Please sign in to comment.