Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Net plugin send priority - 1.8 #8547

Merged
merged 1 commit into from
Jan 31, 2020
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 37 additions & 37 deletions plugins/net_plugin/net_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -571,11 +571,11 @@ namespace eosio {
void enqueue( const net_message &msg, bool trigger_send = true );
void enqueue_block( const signed_block_ptr& sb, bool trigger_send = true, bool to_sync_queue = false);
void enqueue_buffer( const std::shared_ptr<std::vector<char>>& send_buffer,
bool trigger_send, int priority, go_away_reason close_after_send,
bool trigger_send, go_away_reason close_after_send,
bool to_sync_queue = false);
void cancel_sync(go_away_reason);
void flush_queues();
bool enqueue_sync_block();
void enqueue_sync_block();
void request_sync_blocks(uint32_t start, uint32_t end);

void cancel_wait();
Expand All @@ -586,10 +586,9 @@ namespace eosio {

void queue_write(const std::shared_ptr<vector<char>>& buff,
bool trigger_send,
int priority,
std::function<void(boost::system::error_code, std::size_t)> callback,
bool to_sync_queue = false);
void do_queue_write(int priority);
void do_queue_write();

bool add_peer_block(const peer_block_state& pbs);
bool peer_has_block(const block_id_type& blkid);
Expand Down Expand Up @@ -818,7 +817,7 @@ namespace eosio {
for(auto tx = my_impl->local_txns.begin(); tx != my_impl->local_txns.end(); ++tx ){
const bool found = known_ids.find( tx->id ) != known_ids.cend();
if( !found ) {
queue_write( tx->serialized_txn, true, priority::low, []( boost::system::error_code ec, std::size_t ) {} );
queue_write( tx->serialized_txn, true, []( boost::system::error_code ec, std::size_t ) {} );
}
}
}
Expand All @@ -827,7 +826,7 @@ namespace eosio {
for(const auto& t : ids) {
auto tx = my_impl->local_txns.get<by_id>().find(t);
if( tx != my_impl->local_txns.end() ) {
queue_write( tx->serialized_txn, true, priority::low, []( boost::system::error_code ec, std::size_t ) {} );
queue_write( tx->serialized_txn, true, []( boost::system::error_code ec, std::size_t ) {} );
}
}
}
Expand Down Expand Up @@ -939,7 +938,6 @@ namespace eosio {

void connection::queue_write(const std::shared_ptr<vector<char>>& buff,
bool trigger_send,
int priority,
std::function<void(boost::system::error_code, std::size_t)> callback,
bool to_sync_queue) {
if( !buffer_queue.add_write_queue( buff, callback, to_sync_queue )) {
Expand All @@ -949,11 +947,11 @@ namespace eosio {
return;
}
if( buffer_queue.is_out_queue_empty() && trigger_send) {
do_queue_write( priority );
do_queue_write();
}
}

void connection::do_queue_write(int priority) {
void connection::do_queue_write() {
if( !buffer_queue.ready_to_send() )
return;
connection_wptr c(shared_from_this());
Expand All @@ -966,8 +964,8 @@ namespace eosio {
buffer_queue.fill_out_buffer( bufs );

boost::asio::async_write(*socket, bufs,
boost::asio::bind_executor(strand, [c, socket=socket, priority]( boost::system::error_code ec, std::size_t w ) {
app().post(priority, [c, priority, ec, w]() {
boost::asio::bind_executor(strand, [c, socket=socket]( boost::system::error_code ec, std::size_t w ) {
app().post(priority::high, [c, ec, w]() {
try {
auto conn = c.lock();
if(!conn)
Expand All @@ -988,7 +986,7 @@ namespace eosio {
}
conn->buffer_queue.clear_out_queue();
conn->enqueue_sync_block();
conn->do_queue_write( priority );
conn->do_queue_write();
}
catch(const std::exception &ex) {
auto conn = c.lock();
Expand Down Expand Up @@ -1027,26 +1025,28 @@ namespace eosio {
}
}

bool connection::enqueue_sync_block() {
if (!peer_requested)
return false;
uint32_t num = ++peer_requested->last;
bool trigger_send = num == peer_requested->start_block;
if(num == peer_requested->end_block) {
peer_requested.reset();
fc_ilog( logger, "completing enqueue_sync_block ${num} to ${p}", ("num", num)("p", peer_name()) );
}
try {
controller& cc = my_impl->chain_plug->chain();
signed_block_ptr sb = cc.fetch_block_by_number(num);
if(sb) {
enqueue_block( sb, trigger_send, true);
return true;
void connection::enqueue_sync_block() {
connection_wptr c(shared_from_this());
app().post( priority::low, [c]() {
auto conn = c.lock();
if(!conn) return;
if( !conn->peer_requested )
return;
uint32_t num = ++conn->peer_requested->last;
if( num == conn->peer_requested->end_block ) {
conn->peer_requested.reset();
fc_ilog( logger, "completing enqueue_sync_block ${num} to ${p}", ("num", num)( "p", conn->peer_name() ) );
}
} catch ( ... ) {
fc_wlog( logger, "write loop exception" );
}
return false;
try {
controller& cc = my_impl->chain_plug->chain();
signed_block_ptr sb = cc.fetch_block_by_number( num );
if( sb ) {
conn->enqueue_block( sb, true, true );
}
} catch( ... ) {
fc_wlog( logger, "write loop exception" );
}
} );
}

void connection::enqueue( const net_message& m, bool trigger_send ) {
Expand All @@ -1067,7 +1067,7 @@ namespace eosio {
ds.write( header, header_size );
fc::raw::pack( ds, m );

enqueue_buffer( send_buffer, trigger_send, priority::low, close_after_send );
enqueue_buffer( send_buffer, trigger_send, close_after_send );
}

template< typename T>
Expand Down Expand Up @@ -1103,15 +1103,15 @@ namespace eosio {
}

void connection::enqueue_block( const signed_block_ptr& sb, bool trigger_send, bool to_sync_queue) {
enqueue_buffer( create_send_buffer( sb ), trigger_send, priority::low, no_reason, to_sync_queue);
enqueue_buffer( create_send_buffer( sb ), trigger_send, no_reason, to_sync_queue);
}

void connection::enqueue_buffer( const std::shared_ptr<std::vector<char>>& send_buffer,
bool trigger_send, int priority, go_away_reason close_after_send,
bool trigger_send, go_away_reason close_after_send,
bool to_sync_queue)
{
connection_wptr weak_this = shared_from_this();
queue_write(send_buffer,trigger_send, priority,
queue_write(send_buffer,trigger_send,
[weak_this, close_after_send](boost::system::error_code ec, std::size_t ) {
connection_ptr conn = weak_this.lock();
if (conn) {
Expand Down Expand Up @@ -1683,7 +1683,7 @@ namespace eosio {
send_buffer = create_send_buffer( bs->block );
}
fc_dlog(logger, "bcast block ${b} to ${p}", ("b", bnum)("p", cp->peer_name()));
cp->enqueue_buffer( send_buffer, true, priority::high, no_reason );
cp->enqueue_buffer( send_buffer, true, no_reason );
}
}

Expand Down Expand Up @@ -2242,7 +2242,7 @@ namespace eosio {
void net_plugin_impl::send_transaction_to_all(const std::shared_ptr<std::vector<char>>& send_buffer, VerifierFunc verify) {
for( auto &c : connections) {
if( c->current() && verify( c )) {
c->enqueue_buffer( send_buffer, true, priority::low, no_reason );
c->enqueue_buffer( send_buffer, true, no_reason );
}
}
}
Expand Down