From e1500951a3a70eaad983da1827cf0e1ee41cb433 Mon Sep 17 00:00:00 2001 From: Kevin Heifner Date: Thu, 30 Jan 2020 18:56:55 -0600 Subject: [PATCH] Process async_write callback at high priority to prevent it from blocking writes. Also enqueue sync blocks at low priority. --- plugins/net_plugin/net_plugin.cpp | 74 +++++++++++++++---------------- 1 file changed, 37 insertions(+), 37 deletions(-) diff --git a/plugins/net_plugin/net_plugin.cpp b/plugins/net_plugin/net_plugin.cpp index 2a26add5659..d42c0c70aa9 100644 --- a/plugins/net_plugin/net_plugin.cpp +++ b/plugins/net_plugin/net_plugin.cpp @@ -571,11 +571,11 @@ namespace eosio { void enqueue( const net_message &msg, bool trigger_send = true ); void enqueue_block( const signed_block_ptr& sb, bool trigger_send = true, bool to_sync_queue = false); void enqueue_buffer( const std::shared_ptr>& send_buffer, - bool trigger_send, int priority, go_away_reason close_after_send, + bool trigger_send, go_away_reason close_after_send, bool to_sync_queue = false); void cancel_sync(go_away_reason); void flush_queues(); - bool enqueue_sync_block(); + void enqueue_sync_block(); void request_sync_blocks(uint32_t start, uint32_t end); void cancel_wait(); @@ -586,10 +586,9 @@ namespace eosio { void queue_write(const std::shared_ptr>& buff, bool trigger_send, - int priority, std::function callback, bool to_sync_queue = false); - void do_queue_write(int priority); + void do_queue_write(); bool add_peer_block(const peer_block_state& pbs); bool peer_has_block(const block_id_type& blkid); @@ -818,7 +817,7 @@ namespace eosio { for(auto tx = my_impl->local_txns.begin(); tx != my_impl->local_txns.end(); ++tx ){ const bool found = known_ids.find( tx->id ) != known_ids.cend(); if( !found ) { - queue_write( tx->serialized_txn, true, priority::low, []( boost::system::error_code ec, std::size_t ) {} ); + queue_write( tx->serialized_txn, true, []( boost::system::error_code ec, std::size_t ) {} ); } } } @@ -827,7 +826,7 @@ namespace eosio { for(const auto& t : ids) { auto tx = my_impl->local_txns.get().find(t); if( tx != my_impl->local_txns.end() ) { - queue_write( tx->serialized_txn, true, priority::low, []( boost::system::error_code ec, std::size_t ) {} ); + queue_write( tx->serialized_txn, true, []( boost::system::error_code ec, std::size_t ) {} ); } } } @@ -939,7 +938,6 @@ namespace eosio { void connection::queue_write(const std::shared_ptr>& buff, bool trigger_send, - int priority, std::function callback, bool to_sync_queue) { if( !buffer_queue.add_write_queue( buff, callback, to_sync_queue )) { @@ -949,11 +947,11 @@ namespace eosio { return; } if( buffer_queue.is_out_queue_empty() && trigger_send) { - do_queue_write( priority ); + do_queue_write(); } } - void connection::do_queue_write(int priority) { + void connection::do_queue_write() { if( !buffer_queue.ready_to_send() ) return; connection_wptr c(shared_from_this()); @@ -966,8 +964,8 @@ namespace eosio { buffer_queue.fill_out_buffer( bufs ); boost::asio::async_write(*socket, bufs, - boost::asio::bind_executor(strand, [c, socket=socket, priority]( boost::system::error_code ec, std::size_t w ) { - app().post(priority, [c, priority, ec, w]() { + boost::asio::bind_executor(strand, [c, socket=socket]( boost::system::error_code ec, std::size_t w ) { + app().post(priority::high, [c, ec, w]() { try { auto conn = c.lock(); if(!conn) @@ -988,7 +986,7 @@ namespace eosio { } conn->buffer_queue.clear_out_queue(); conn->enqueue_sync_block(); - conn->do_queue_write( priority ); + conn->do_queue_write(); } catch(const std::exception &ex) { auto conn = c.lock(); @@ -1027,26 +1025,28 @@ namespace eosio { } } - bool connection::enqueue_sync_block() { - if (!peer_requested) - return false; - uint32_t num = ++peer_requested->last; - bool trigger_send = num == peer_requested->start_block; - if(num == peer_requested->end_block) { - peer_requested.reset(); - fc_ilog( logger, "completing enqueue_sync_block ${num} to ${p}", ("num", num)("p", peer_name()) ); - } - try { - controller& cc = my_impl->chain_plug->chain(); - signed_block_ptr sb = cc.fetch_block_by_number(num); - if(sb) { - enqueue_block( sb, trigger_send, true); - return true; + void connection::enqueue_sync_block() { + connection_wptr c(shared_from_this()); + app().post( priority::low, [c]() { + auto conn = c.lock(); + if(!conn) return; + if( !conn->peer_requested ) + return; + uint32_t num = ++conn->peer_requested->last; + if( num == conn->peer_requested->end_block ) { + conn->peer_requested.reset(); + fc_ilog( logger, "completing enqueue_sync_block ${num} to ${p}", ("num", num)( "p", conn->peer_name() ) ); } - } catch ( ... ) { - fc_wlog( logger, "write loop exception" ); - } - return false; + try { + controller& cc = my_impl->chain_plug->chain(); + signed_block_ptr sb = cc.fetch_block_by_number( num ); + if( sb ) { + conn->enqueue_block( sb, true, true ); + } + } catch( ... ) { + fc_wlog( logger, "write loop exception" ); + } + } ); } void connection::enqueue( const net_message& m, bool trigger_send ) { @@ -1067,7 +1067,7 @@ namespace eosio { ds.write( header, header_size ); fc::raw::pack( ds, m ); - enqueue_buffer( send_buffer, trigger_send, priority::low, close_after_send ); + enqueue_buffer( send_buffer, trigger_send, close_after_send ); } template< typename T> @@ -1103,15 +1103,15 @@ namespace eosio { } void connection::enqueue_block( const signed_block_ptr& sb, bool trigger_send, bool to_sync_queue) { - enqueue_buffer( create_send_buffer( sb ), trigger_send, priority::low, no_reason, to_sync_queue); + enqueue_buffer( create_send_buffer( sb ), trigger_send, no_reason, to_sync_queue); } void connection::enqueue_buffer( const std::shared_ptr>& send_buffer, - bool trigger_send, int priority, go_away_reason close_after_send, + bool trigger_send, go_away_reason close_after_send, bool to_sync_queue) { connection_wptr weak_this = shared_from_this(); - queue_write(send_buffer,trigger_send, priority, + queue_write(send_buffer,trigger_send, [weak_this, close_after_send](boost::system::error_code ec, std::size_t ) { connection_ptr conn = weak_this.lock(); if (conn) { @@ -1683,7 +1683,7 @@ namespace eosio { send_buffer = create_send_buffer( bs->block ); } fc_dlog(logger, "bcast block ${b} to ${p}", ("b", bnum)("p", cp->peer_name())); - cp->enqueue_buffer( send_buffer, true, priority::high, no_reason ); + cp->enqueue_buffer( send_buffer, true, no_reason ); } } @@ -2242,7 +2242,7 @@ namespace eosio { void net_plugin_impl::send_transaction_to_all(const std::shared_ptr>& send_buffer, VerifierFunc verify) { for( auto &c : connections) { if( c->current() && verify( c )) { - c->enqueue_buffer( send_buffer, true, priority::low, no_reason ); + c->enqueue_buffer( send_buffer, true, no_reason ); } } }