Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

net_plugin reduce signed_block copies #6397

Merged
merged 10 commits into from
Nov 28, 2018
2 changes: 1 addition & 1 deletion libraries/chain/block_log.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -473,7 +473,7 @@ namespace eosio { namespace chain {
old_block_stream.read( reinterpret_cast<char*>(&tmp_pos), sizeof(tmp_pos) );
}
if( pos != tmp_pos ) {
bad_block = tmp;
bad_block.emplace(std::move(tmp));
break;
}

Expand Down
6 changes: 5 additions & 1 deletion libraries/chain/include/eosio/chain/block.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,13 @@ namespace eosio { namespace chain {
/**
*/
struct signed_block : public signed_block_header {
using signed_block_header::signed_block_header;
private:
signed_block( const signed_block& ) = default;
public:
signed_block() = default;
signed_block( const signed_block_header& h ):signed_block_header(h){}
signed_block( signed_block&& ) = default;
signed_block clone() const { return *this; }

vector<transaction_receipt> transactions; /// new or generated transactions
extensions_type block_extensions;
Expand Down
6 changes: 3 additions & 3 deletions plugins/chain_plugin/chain_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -901,7 +901,7 @@ bool chain_plugin::import_reversible_blocks( const fc::path& reversible_dir,

new_reversible.create<reversible_block_object>( [&]( auto& ubo ) {
ubo.blocknum = num;
ubo.set_block( std::make_shared<signed_block>(tmp) );
ubo.set_block( std::make_shared<signed_block>(std::move(tmp)) );
});
end = num;
}
Expand Down Expand Up @@ -1526,9 +1526,9 @@ fc::variant read_only::get_block_header_state(const get_block_header_state_param
return vo;
}

void read_write::push_block(const read_write::push_block_params& params, next_function<read_write::push_block_results> next) {
void read_write::push_block(read_write::push_block_params params, next_function<read_write::push_block_results> next) {
arhag marked this conversation as resolved.
Show resolved Hide resolved
try {
app().get_method<incoming::methods::block_sync>()(std::make_shared<signed_block>(params));
app().get_method<incoming::methods::block_sync>()(std::make_shared<signed_block>(std::move(params)));
next(read_write::push_block_results{});
} catch ( boost::interprocess::bad_alloc& ) {
chain_plugin::handle_db_exhaustion();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -569,7 +569,7 @@ class read_write {

using push_block_params = chain::signed_block;
using push_block_results = empty;
void push_block(const push_block_params& params, chain::plugin_interface::next_function<push_block_results> next);
void push_block(push_block_params params, chain::plugin_interface::next_function<push_block_results> next);
arhag marked this conversation as resolved.
Show resolved Hide resolved

using push_transaction_params = fc::variant_object;
struct push_transaction_results {
Expand Down
2 changes: 1 addition & 1 deletion plugins/net_plugin/include/eosio/net_plugin/protocol.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ namespace eosio {
notice_message,
request_message,
sync_request_message,
signed_block,
signed_block, // which = 7
packed_transaction>;

} // namespace eosio
Expand Down
148 changes: 92 additions & 56 deletions plugins/net_plugin/net_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,6 @@ namespace eosio {

using socket_ptr = std::shared_ptr<tcp::socket>;

using net_message_ptr = shared_ptr<net_message>;

struct node_transaction_state {
transaction_id_type id;
time_point_sec expires; /// time after which this may be purged.
Expand Down Expand Up @@ -217,7 +215,8 @@ namespace eosio {
void handle_message( connection_ptr c, const notice_message &msg);
void handle_message( connection_ptr c, const request_message &msg);
void handle_message( connection_ptr c, const sync_request_message &msg);
void handle_message( connection_ptr c, const signed_block &msg);
void handle_message( connection_ptr c, const signed_block &msg) = delete; // signed_block_ptr overload used instead
void handle_message( connection_ptr c, const signed_block_ptr &msg);
void handle_message( connection_ptr c, const packed_transaction &msg);

void start_conn_timer(boost::asio::steady_timer::duration du, std::weak_ptr<connection> from_connection);
Expand Down Expand Up @@ -557,6 +556,8 @@ namespace eosio {
void stop_send();

void enqueue( const net_message &msg, bool trigger_send = true );
void enqueue_block( const signed_block_ptr& sb, bool trigger_send = true );
void enqueue_buffer( const std::shared_ptr<std::vector<char>>& send_buffer, bool trigger_send, go_away_reason close_after_send );
void cancel_sync(go_away_reason);
void flush_queues();
bool enqueue_sync_block();
Expand Down Expand Up @@ -612,13 +613,18 @@ namespace eosio {
}
};

struct msgHandler : public fc::visitor<void> {
struct msg_handler : public fc::visitor<void> {
net_plugin_impl &impl;
connection_ptr c;
msgHandler( net_plugin_impl &imp, connection_ptr conn) : impl(imp), c(conn) {}
msg_handler( net_plugin_impl &imp, connection_ptr conn) : impl(imp), c(conn) {}

void operator()( signed_block& msg ) const
arhag marked this conversation as resolved.
Show resolved Hide resolved
{
impl.handle_message( c, std::make_shared<signed_block>( std::move( msg )));
}

template <typename T>
void operator()(const T &msg) const
void operator()( const T& msg ) const
{
impl.handle_message( c, msg);
}
Expand Down Expand Up @@ -669,7 +675,7 @@ namespace eosio {

void bcast_transaction (const packed_transaction& msg);
void rejected_transaction (const transaction_id_type& msg);
void bcast_block (const signed_block& msg);
void bcast_block (const block_state_ptr& bs);
void rejected_block (const block_id_type &id);

void recv_block (connection_ptr conn, const block_id_type& msg, uint32_t bnum);
Expand Down Expand Up @@ -895,7 +901,7 @@ namespace eosio {
if (bstack.back()->previous == lib_id || bstack.back()->previous == remote_head_id) {
count = bstack.size();
while (bstack.size()) {
enqueue(*bstack.back());
enqueue_block( bstack.back() );
bstack.pop_back();
}
}
Expand All @@ -916,7 +922,7 @@ namespace eosio {
signed_block_ptr b = cc.fetch_block_by_id(blkid);
if(b) {
fc_dlog(logger,"found block for id at num ${n}",("n",b->block_num()));
enqueue(net_message(*b));
enqueue_block( b );
}
else {
ilog("fetch block by id returned null, id ${id} on block ${c} of ${s} for ${p}",
Expand Down Expand Up @@ -1075,7 +1081,7 @@ namespace eosio {
try {
signed_block_ptr sb = cc.fetch_block_by_number(num);
if(sb) {
enqueue( *sb, trigger_send);
enqueue_block( sb, trigger_send);
return true;
}
} catch ( ... ) {
Expand All @@ -1084,22 +1090,47 @@ namespace eosio {
return false;
}

void connection::enqueue( const net_message &m, bool trigger_send ) {
void connection::enqueue( const net_message& m, bool trigger_send ) {
go_away_reason close_after_send = no_reason;
if (m.contains<go_away_message>()) {
close_after_send = m.get<go_away_message>().reason;
}

uint32_t payload_size = fc::raw::pack_size( m );
char * header = reinterpret_cast<char*>(&payload_size);
size_t header_size = sizeof(payload_size);

char* header = reinterpret_cast<char*>(&payload_size);
size_t header_size = sizeof(payload_size);
size_t buffer_size = header_size + payload_size;

auto send_buffer = std::make_shared<vector<char>>(buffer_size);
fc::datastream<char*> ds( send_buffer->data(), buffer_size);
ds.write( header, header_size );
fc::raw::pack( ds, m );

enqueue_buffer( send_buffer, trigger_send, close_after_send );
}

void connection::enqueue_block( const signed_block_ptr& sb, bool trigger_send ) {
// this implementation is to avoid copy of signed_block to net_message
int which = 7; // matches which of net_message for signed_block

uint32_t which_size = fc::raw::pack_size( unsigned_int( which ));
uint32_t payload_size = which_size + fc::raw::pack_size( *sb );

char* header = reinterpret_cast<char*>(&payload_size);
size_t header_size = sizeof(payload_size);
size_t buffer_size = header_size + payload_size;

auto send_buffer = std::make_shared<vector<char>>(buffer_size);
fc::datastream<char*> ds( send_buffer->data(), buffer_size);
ds.write( header, header_size );
fc::raw::pack( ds, unsigned_int( which ));
fc::raw::pack( ds, *sb );

enqueue_buffer( send_buffer, true, no_reason );
arhag marked this conversation as resolved.
Show resolved Hide resolved
}

void connection::enqueue_buffer( const std::shared_ptr<std::vector<char>>& send_buffer, bool trigger_send, go_away_reason close_after_send ) {
connection_wptr weak_this = shared_from_this();
queue_write(send_buffer,trigger_send,
[weak_this, close_after_send](boost::system::error_code ec, std::size_t ) {
Expand Down Expand Up @@ -1197,7 +1228,7 @@ namespace eosio {
auto ds = pending_message_buffer.create_datastream();
net_message msg;
fc::raw::unpack(ds, msg);
msgHandler m(impl, shared_from_this() );
msg_handler m(impl, shared_from_this() );
msg.visit(m);
arhag marked this conversation as resolved.
Show resolved Hide resolved
} catch( const fc::exception& e ) {
edump((e.to_detail_string() ));
Expand Down Expand Up @@ -1589,49 +1620,55 @@ namespace eosio {

//------------------------------------------------------------------------

void dispatch_manager::bcast_block (const signed_block &bsum) {
void dispatch_manager::bcast_block (const block_state_ptr& bs) {
std::set<connection_ptr> skips;
auto range = received_blocks.equal_range(bsum.id());
auto range = received_blocks.equal_range(bs->id);
for (auto org = range.first; org != range.second; ++org) {
skips.insert(org->second);
}
received_blocks.erase(range.first, range.second);

net_message msg(bsum);
uint32_t packsiz = fc::raw::pack_size(msg);
uint32_t msgsiz = packsiz + sizeof(packsiz);
block_id_type bid = bsum.id();
uint32_t bnum = bsum.block_num();
block_id_type bid = bs->id;
uint32_t bnum = bs->block_num;
peer_block_state pbstate = {bid, bnum, false, true, time_point()};

peer_block_state pbstate = {bid, bnum, false,true,time_point()};
// skip will be empty if our producer emitted this block so just send it
if (( large_msg_notify && msgsiz > just_send_it_max) && !skips.empty()) {
fc_ilog(logger, "block size is ${ms}, sending notify",("ms", msgsiz));
notice_message pending_notify;
pending_notify.known_blocks.mode = normal;
pending_notify.known_blocks.ids.push_back( bid );
pending_notify.known_trx.mode = none;
my_impl->send_all(pending_notify, [&skips, pbstate](connection_ptr c) -> bool {
if (skips.find(c) != skips.end() || !c->current())
return false;

bool unknown = c->add_peer_block(pbstate);
if (!unknown) {
elog("${p} already has knowledge of block ${b}", ("p",c->peer_name())("b",pbstate.block_num));
}
return unknown;
});
if( large_msg_notify ) {
int which = 7;
uint32_t whichsiz = fc::raw::pack_size( unsigned_int( which ) );
uint32_t packsiz = whichsiz + fc::raw::pack_size( *bs->block );
uint32_t msgsiz = packsiz + sizeof(packsiz);

if( (msgsiz > just_send_it_max) && !skips.empty() ) {
fc_ilog( logger, "block size is ${ms}, sending notify", ("ms", msgsiz));
notice_message pending_notify;
pending_notify.known_blocks.mode = normal;
pending_notify.known_blocks.ids.push_back( bid );
pending_notify.known_trx.mode = none;
my_impl->send_all( pending_notify, [&skips, pbstate]( connection_ptr c ) -> bool {
if( skips.find( c ) != skips.end() || !c->current())
return false;

bool unknown = c->add_peer_block( pbstate );
if( !unknown ) {
elog( "${p} already has knowledge of block ${b}", ("p", c->peer_name())( "b", pbstate.block_num ));
}
return unknown;
} );

return;
}
}
else {
pbstate.is_known = true;
for (auto cp : my_impl->connections) {
if (skips.find(cp) != skips.end() || !cp->current()) {
continue;
}
cp->add_peer_block(pbstate);
cp->enqueue( msg );

pbstate.is_known = true;
for( auto cp : my_impl->connections ) {
if( skips.find( cp ) != skips.end() || !cp->current() ) {
continue;
}
cp->add_peer_block( pbstate );
cp->enqueue_block( bs->block );
}

}

void dispatch_manager::recv_block (connection_ptr c, const block_id_type& id, uint32_t bnum) {
Expand Down Expand Up @@ -1689,7 +1726,7 @@ namespace eosio {
my_impl->local_txns.insert(std::move(nts));

if( !large_msg_notify || bufsiz <= just_send_it_max) {
my_impl->send_all( trx, [id, &skips, trx_expiration](connection_ptr c) -> bool {
my_impl->send_all( msg, [id, &skips, trx_expiration](connection_ptr c) -> bool {
if( skips.find(c) != skips.end() || c->syncing ) {
return false;
}
Expand Down Expand Up @@ -2484,10 +2521,10 @@ namespace eosio {
});
}

void net_plugin_impl::handle_message( connection_ptr c, const signed_block &msg) {
void net_plugin_impl::handle_message( connection_ptr c, const signed_block_ptr& msg) {
controller &cc = chain_plug->chain();
block_id_type blk_id = msg.id();
uint32_t blk_num = msg.block_num();
block_id_type blk_id = msg->id();
uint32_t blk_num = msg->block_num();
fc_dlog(logger, "canceling wait on ${p}", ("p",c->peer_name()));
c->cancel_wait();

Expand All @@ -2502,14 +2539,13 @@ namespace eosio {
}

dispatcher->recv_block(c, blk_id, blk_num);
fc::microseconds age( fc::time_point::now() - msg.timestamp);
fc::microseconds age( fc::time_point::now() - msg->timestamp);
peer_ilog(c, "received signed_block : #${n} block age in secs = ${age}",
("n",blk_num)("age",age.to_seconds()));

go_away_reason reason = fatal_other;
try {
signed_block_ptr sbp = std::make_shared<signed_block>(msg);
chain_plug->accept_block(sbp); //, sync_master->is_active(c));
chain_plug->accept_block(msg); //, sync_master->is_active(c));
reason = no_reason;
} catch( const unlinkable_block_exception &ex) {
peer_elog(c, "bad signed_block : ${m}", ("m",ex.what()));
Expand All @@ -2532,7 +2568,7 @@ namespace eosio {

update_block_num ubn(blk_num);
if( reason == no_reason ) {
for (const auto &recpt : msg.transactions) {
for (const auto &recpt : msg->transactions) {
auto id = (recpt.trx.which() == 0) ? recpt.trx.get<transaction_id_type>() : recpt.trx.get<packed_transaction>().id();
auto ltx = local_txns.get<by_id>().find(id);
if( ltx != local_txns.end()) {
Expand Down Expand Up @@ -2662,10 +2698,10 @@ namespace eosio {

void net_plugin_impl::accepted_block(const block_state_ptr& block) {
fc_dlog(logger,"signaled, id = ${id}",("id", block->id));
dispatcher->bcast_block(*block->block);
dispatcher->bcast_block(block);
}

void net_plugin_impl::irreversible_block(const block_state_ptr&block) {
void net_plugin_impl::irreversible_block(const block_state_ptr& block) {
fc_dlog(logger,"signaled, id = ${id}",("id", block->id));
}

Expand Down
4 changes: 2 additions & 2 deletions unittests/block_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ BOOST_AUTO_TEST_CASE(block_with_invalid_tx_test)
auto b = main.produce_block();

// Make a copy of the valid block and corrupt the transaction
auto copy_b = std::make_shared<signed_block>(*b);
auto copy_b = std::make_shared<signed_block>(std::move(*b));
auto signed_tx = copy_b->transactions.back().trx.get<packed_transaction>().get_signed_transaction();
auto& act = signed_tx.actions.back();
auto act_data = act.data_as<newaccount>();
Expand Down Expand Up @@ -57,7 +57,7 @@ std::pair<signed_block_ptr, signed_block_ptr> corrupt_trx_in_block(validating_te
signed_block_ptr b = main.produce_block_no_validation();

// Make a copy of the valid block and corrupt the transaction
auto copy_b = std::make_shared<signed_block>(*b);
auto copy_b = std::make_shared<signed_block>(b->clone());
auto signed_tx = copy_b->transactions.back().trx.get<packed_transaction>().get_signed_transaction();
// Corrupt one signature
signed_tx.signatures.clear();
Expand Down
6 changes: 3 additions & 3 deletions unittests/forked_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ BOOST_AUTO_TEST_CASE( fork_with_bad_block ) try {
auto& fork = forks.at(j);

if (j <= i) {
auto copy_b = std::make_shared<signed_block>(*b);
auto copy_b = std::make_shared<signed_block>(b->clone());
if (j == i) {
// corrupt this block
fork.block_merkle = remote.control->head_block_state()->blockroot_merkle;
Expand Down Expand Up @@ -277,9 +277,9 @@ BOOST_AUTO_TEST_CASE( forking ) try {
}
wlog( "end push c2 blocks to c1" );
wlog( "now push dan's block to c1 but first corrupt it so it is a bad block" );
auto bad_block = *b;
signed_block bad_block = std::move(*b);
bad_block.transaction_mroot = bad_block.previous;
auto bad_block_bs = c.control->create_block_state_future( std::make_shared<signed_block>(bad_block) );
auto bad_block_bs = c.control->create_block_state_future( std::make_shared<signed_block>(std::move(bad_block)) );
c.control->abort_block();
BOOST_REQUIRE_EXCEPTION(c.control->push_block( bad_block_bs ), fc::exception,
[] (const fc::exception &ex)->bool {
Expand Down