Skip to content

Commit

Permalink
GH-529 Do not post unlinkable blocks to the main thread
Browse files Browse the repository at this point in the history
  • Loading branch information
heifner committed Aug 13, 2024
1 parent 3bec2b1 commit 439a7a8
Show file tree
Hide file tree
Showing 6 changed files with 34 additions and 42 deletions.
2 changes: 1 addition & 1 deletion libraries/chain/controller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4012,7 +4012,7 @@ struct controller_impl {
EOS_ASSERT( b, block_validate_exception, "null block" );

auto f = [&](auto& forkdb) -> std::optional<block_handle> {
// previous not found could mean that previous block not applied yet
// previous not found, means it is unlinkable
auto prev = forkdb.get_block( b->previous, include_root_t::yes );
if( !prev ) return {};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ namespace eosio::chain::plugin_interface {
namespace incoming {
namespace methods {
// synchronously push a block/trx to a single provider, block_state_legacy_ptr may be null
using block_sync = method_decl<chain_plugin_interface, bool(const signed_block_ptr&, const block_id_type&, const std::optional<block_handle>&), first_provider_policy>;
using block_sync = method_decl<chain_plugin_interface, bool(const signed_block_ptr&, const block_id_type&, const block_handle&), first_provider_policy>;
using transaction_async = method_decl<chain_plugin_interface, void(const packed_transaction_ptr&, bool, transaction_metadata::trx_type, bool, next_function<transaction_trace_ptr>), first_provider_policy>;
}
}
Expand Down
13 changes: 8 additions & 5 deletions plugins/chain_plugin/chain_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1207,8 +1207,8 @@ chain_apis::read_only chain_plugin::get_read_only_api(const fc::microseconds& ht
}


bool chain_plugin::accept_block(const signed_block_ptr& block, const block_id_type& id, const std::optional<block_handle>& obt ) {
return my->incoming_block_sync_method(block, id, obt);
bool chain_plugin::accept_block(const signed_block_ptr& block, const block_id_type& id, const block_handle& bh ) {
return my->incoming_block_sync_method(block, id, bh);
}

void chain_plugin::accept_transaction(const chain::packed_transaction_ptr& trx, next_function<chain::transaction_trace_ptr> next) {
Expand Down Expand Up @@ -2115,13 +2115,16 @@ fc::variant read_only::get_block_header_state(const get_block_header_state_param
void read_write::push_block(read_write::push_block_params&& params, next_function<read_write::push_block_results> next) {
try {
auto b = std::make_shared<signed_block>( std::move(params) );
app().get_method<incoming::methods::block_sync>()(b, b->calculate_id(), std::optional<block_handle>{});
block_id_type id = b->calculate_id();
auto bhf = db.create_block_handle_future( id, b );
block_handle bh = bhf.get();
app().get_method<incoming::methods::block_sync>()(b, id, bh);
next(read_write::push_block_results{});
} catch ( boost::interprocess::bad_alloc& ) {
handle_db_exhaustion();
} catch ( const std::bad_alloc& ) {
handle_bad_alloc();
} FC_LOG_AND_DROP()
next(read_write::push_block_results{});
} CATCH_AND_CALL(next);
}

void read_write::push_transaction(const read_write::push_transaction_params& params, next_function<read_write::push_transaction_results> next) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -998,7 +998,7 @@ class chain_plugin : public plugin<chain_plugin> {
chain_apis::read_write get_read_write_api(const fc::microseconds& http_max_response_time);
chain_apis::read_only get_read_only_api(const fc::microseconds& http_max_response_time) const;

bool accept_block( const chain::signed_block_ptr& block, const chain::block_id_type& id, const std::optional<chain::block_handle>& obt );
bool accept_block( const chain::signed_block_ptr& block, const chain::block_id_type& id, const chain::block_handle& bh );
void accept_transaction(const chain::packed_transaction_ptr& trx, chain::plugin_interface::next_function<chain::transaction_trace_ptr> next);

// Only call this after plugin_initialize()!
Expand Down
44 changes: 20 additions & 24 deletions plugins/net_plugin/net_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1114,7 +1114,7 @@ namespace eosio {
// returns calculated number of blocks combined latency
uint32_t calc_block_latency();

void process_signed_block( const block_id_type& id, signed_block_ptr block, const std::optional<block_handle>& obt );
void process_signed_block( const block_id_type& id, signed_block_ptr block, const block_handle& bh );

fc::variant_object get_logger_variant() const {
fc::mutable_variant_object mvo;
Expand Down Expand Up @@ -3741,13 +3741,13 @@ namespace eosio {
return;
}

std::optional<block_handle> obt;
std::optional<block_handle> obh;
bool exception = false;
sync_manager::closing_mode close_mode = sync_manager::closing_mode::handshake;
try {
// this may return null if block is not immediately ready to be processed
obt = cc.create_block_handle( id, ptr );
} catch( const invalid_qc_claim &ex) {
// this may return null if block is not linkable
obh = cc.create_block_handle( id, ptr );
} catch( const invalid_qc_claim& ex) {
exception = true;
close_mode = sync_manager::closing_mode::immediately;
fc_wlog( logger, "invalid QC claim exception, connection - ${cid}: #${n} ${id}...: ${m}",
Expand All @@ -3761,7 +3761,7 @@ namespace eosio {
fc_wlog( logger, "bad block connection - ${cid}: #${n} ${id}...: unknown exception",
("cid", cid)("n", ptr->block_num())("id", id.str().substr(8,16)));
}
if( exception ) {
if( exception || !obh) {
c->strand.post( [c, id, blk_num=ptr->block_num(), close_mode]() {
my_impl->sync_master->rejected_block( c, blk_num, close_mode );
my_impl->dispatcher.rejected_block( id );
Expand All @@ -3770,30 +3770,26 @@ namespace eosio {
}


uint32_t block_num = obt ? obt->block_num() : 0;
assert(obh);
uint32_t block_num = obh->block_num();

if( block_num != 0 ) {
assert(obt);
fc_dlog( logger, "validated block header, broadcasting immediately, connection - ${cid}, blk num = ${num}, id = ${id}",
("cid", cid)("num", block_num)("id", obt->id()) );
my_impl->dispatcher.add_peer_block( obt->id(), cid ); // no need to send back to sender
my_impl->dispatcher.bcast_block( obt->block(), obt->id() );
}
fc_dlog( logger, "validated block header, broadcasting immediately, connection - ${cid}, blk num = ${num}, id = ${id}",
("cid", cid)("num", block_num)("id", obh->id()) );
my_impl->dispatcher.add_peer_block( obh->id(), cid ); // no need to send back to sender
my_impl->dispatcher.bcast_block( obh->block(), obh->id() );

fc_dlog(logger, "posting block ${n} to app thread", ("n", ptr->block_num()));
app().executor().post(priority::medium, exec_queue::read_write, [ptr{std::move(ptr)}, obt{std::move(obt)}, id, c{std::move(c)}]() mutable {
c->process_signed_block( id, std::move(ptr), obt );
app().executor().post(priority::medium, exec_queue::read_write, [ptr{std::move(ptr)}, bh{std::move(*obh)}, id, c{std::move(c)}]() mutable {
c->process_signed_block( id, std::move(ptr), bh );
});

if( block_num != 0 ) {
// ready to process immediately, so signal producer to interrupt start_block
my_impl->producer_plug->received_block(block_num);
}
// ready to process immediately, so signal producer to interrupt start_block
my_impl->producer_plug->received_block(block_num);
});
}

// called from application thread
void connection::process_signed_block( const block_id_type& blk_id, signed_block_ptr block, const std::optional<block_handle>& obt ) {
void connection::process_signed_block( const block_id_type& blk_id, signed_block_ptr block, const block_handle& bh ) {
controller& cc = my_impl->chain_plug->chain();
uint32_t blk_num = block_header::num_from_id(blk_id);
// use c in this method instead of this to highlight that all methods called on c-> must be thread safe
Expand All @@ -3817,13 +3813,13 @@ namespace eosio {
fc_elog( logger, "caught an unknown exception trying to fetch block ${id}, conn ${c}", ("id", blk_id)("c", connection_id) );
}

fc_dlog( logger, "received signed_block: #${n} block age in secs = ${age}, connection - ${cid}, ${v}, lib #${lib}",
("n", blk_num)("age", age.to_seconds())("cid", c->connection_id)("v", obt ? "header validated" : "header validation pending")("lib", lib) );
fc_dlog( logger, "received signed_block: #${n} block age in secs = ${age}, connection - ${cid}, header validated, lib #${lib}",
("n", blk_num)("age", age.to_seconds())("cid", c->connection_id)("lib", lib) );

go_away_reason reason = no_reason;
bool accepted = false;
try {
accepted = my_impl->chain_plug->accept_block(block, blk_id, obt);
accepted = my_impl->chain_plug->accept_block(block, blk_id, bh);
my_impl->update_chain_info();
} catch( const unlinkable_block_exception &ex) {
fc_ilog(logger, "unlinkable_block_exception connection - ${cid}: #${n} ${id}...: ${m}",
Expand Down
13 changes: 3 additions & 10 deletions plugins/producer_plugin/producer_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -671,7 +671,7 @@ class producer_plugin_impl : public std::enable_shared_from_this<producer_plugin
_time_tracker.clear();
}

bool on_incoming_block(const signed_block_ptr& block, const block_id_type& id, const std::optional<block_handle>& obt) {
bool on_incoming_block(const signed_block_ptr& block, const block_id_type& id, const block_handle& bh) {
auto now = fc::time_point::now();
_time_tracker.add_idle_time(now);

Expand All @@ -693,14 +693,8 @@ class producer_plugin_impl : public std::enable_shared_from_this<producer_plugin
EOS_ASSERT(block->timestamp < (now + fc::seconds(7)), block_from_the_future, "received a block from the future, ignoring it: ${id}", ("id", id));

// start processing of block
std::future<block_handle> btf;
if (!obt) {
btf = chain.create_block_handle_future(id, block);
}

if (in_producing_mode()) {
fc_ilog(_log, "producing, incoming block #${num} id: ${id}", ("num", blk_num)("id", id));
const block_handle& bh = obt ? *obt : btf.get();
chain.accept_block(bh);
_time_tracker.add_other_time();
return true; // return true because block was accepted
Expand All @@ -721,7 +715,6 @@ class producer_plugin_impl : public std::enable_shared_from_this<producer_plugin

controller::block_report br;
try {
const block_handle& bh = obt ? *obt : btf.get();
chain.push_block(
br,
bh,
Expand Down Expand Up @@ -1276,8 +1269,8 @@ void producer_plugin_impl::plugin_initialize(const boost::program_options::varia
ilog("read-only-threads ${s}, max read-only trx time to be enforced: ${t} us", ("s", _ro_thread_pool_size)("t", _ro_max_trx_time_us));

_incoming_block_sync_provider = app().get_method<incoming::methods::block_sync>().register_provider(
[this](const signed_block_ptr& block, const block_id_type& block_id, const std::optional<block_handle>& obt) {
return on_incoming_block(block, block_id, obt);
[this](const signed_block_ptr& block, const block_id_type& block_id, const block_handle& bh) {
return on_incoming_block(block, block_id, bh);
});

_incoming_transaction_async_provider =
Expand Down

0 comments on commit 439a7a8

Please sign in to comment.