From da552bbd7fb58161a9370d1a2cee6d0ae7e12d50 Mon Sep 17 00:00:00 2001 From: Phil Mesnier Date: Thu, 22 Jun 2017 04:10:25 -0500 Subject: [PATCH 1/7] start message handling --- libraries/chain/CMakeLists.txt | 2 +- plugins/net_plugin/CMakeLists.txt | 2 +- .../include/eos/net_plugin/protocol.hpp | 14 +- plugins/net_plugin/net_plugin.cpp | 122 +++++++++++++++--- 4 files changed, 113 insertions(+), 27 deletions(-) diff --git a/libraries/chain/CMakeLists.txt b/libraries/chain/CMakeLists.txt index dfed91fb57a..00c20e7999a 100644 --- a/libraries/chain/CMakeLists.txt +++ b/libraries/chain/CMakeLists.txt @@ -23,7 +23,7 @@ add_library( eos_chain target_link_libraries( eos_chain fc chainbase eos_types Logging IR WAST WASM Runtime ) target_include_directories( eos_chain - PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/include" "${CMAKE_CURRENT_BINARY_DIR}/include" + PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/include" "${CMAKE_CURRENT_BINARY_DIR}/include" "${CMAKE_CURRENT_SOURCE_DIR}/../wasm-jit/Include" ) diff --git a/plugins/net_plugin/CMakeLists.txt b/plugins/net_plugin/CMakeLists.txt index 80515e5715a..4880a68ced3 100644 --- a/plugins/net_plugin/CMakeLists.txt +++ b/plugins/net_plugin/CMakeLists.txt @@ -3,7 +3,7 @@ add_library( net_plugin net_plugin.cpp ${HEADERS} ) -target_link_libraries( net_plugin chain_plugin appbase fc ) +target_link_libraries( net_plugin chain_plugin egenesis appbase fc ) target_include_directories( net_plugin PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/include" ) install( TARGETS diff --git a/plugins/net_plugin/include/eos/net_plugin/protocol.hpp b/plugins/net_plugin/include/eos/net_plugin/protocol.hpp index b8026f2c684..bbf7213ce9b 100644 --- a/plugins/net_plugin/include/eos/net_plugin/protocol.hpp +++ b/plugins/net_plugin/include/eos/net_plugin/protocol.hpp @@ -8,7 +8,7 @@ namespace eos { struct handshake_message { int16_t network_version = 0; - fc::sha256 chain_id; ///< used to identify chain + chain_id_type chain_id; ///< used to identify chain fc::sha256 node_id; ///< used to identify peers and prevent self-connect uint64_t last_irreversible_block_num = 0; block_id_type last_irreversible_block_id; @@ -56,13 +56,13 @@ FC_REFLECT( eos::notice_message, (known_trx)(known_blocks) ) FC_REFLECT( eos::sync_request_message, (start_block)(end_block) ) FC_REFLECT( eos::peer_message, (peers) ) -/** +/** * Goals of Network Code 1. low latency to minimize missed blocks and potentially reduce block interval 2. minimize redundant data between blocks and transactions. 3. enable rapid sync of a new node -4. update to new boost / fc +4. update to new boost / fc @@ -90,23 +90,23 @@ Goals of Network Code wait for new validated block, transaction, or peer signal from network fiber } else { we assume peer is in sync mode in which case it is operating on a - request / response basis + request / response basis wait for notice of sync from the read loop } - read loop + read loop if hello message verify that peers Last Ir Block is in our state or disconnect, they are on fork verify peer network protocol if notice message update list of transactions known by remote peer - if trx message then insert into global state as unvalidated + if trx message then insert into global state as unvalidated if blk summary message then insert into global state *if* we know of all dependent transactions else close connection - + if my head block < the LIB of a peer and my head block age > block interval * round_size/2 then enter sync mode... divide the block numbers you need to fetch among peers and send fetch request diff --git a/plugins/net_plugin/net_plugin.cpp b/plugins/net_plugin/net_plugin.cpp index f067832e834..2218ccb7a08 100644 --- a/plugins/net_plugin/net_plugin.cpp +++ b/plugins/net_plugin/net_plugin.cpp @@ -2,6 +2,7 @@ #include #include +//#include #include #include @@ -35,7 +36,7 @@ struct node_transaction_state { * Index by is_known, block_num, validated_time, this is the order we will broadcast * to peer. * Index by is_noticed, validated_time - * + * */ struct transaction_state { transaction_id_type id; @@ -54,7 +55,7 @@ typedef multi_index_container< > transaction_state_index; /** - * + * */ struct block_state { block_id_type id; @@ -90,7 +91,10 @@ typedef multi_index_container< class connection { public: - connection( socket_ptr s ):socket(s){ + connection( socket_ptr s, const fc::sha256 &node_id ) + : socket(s), + local_node_id (node_id) + { wlog( "created connection" ); pending_message_buffer.resize( 1024*1024*4 ); } @@ -108,7 +112,7 @@ class connection { vector pending_message_buffer; handshake_message last_handshake; - + const fc::sha256& local_node_id; std::deque out_queue; void send( const net_message& m ) { @@ -118,7 +122,7 @@ class connection { } void send_next_message() { - if( !out_queue.size() ) + if( !out_queue.size() ) return; auto& m = out_queue.front(); @@ -141,7 +145,7 @@ class connection { } }); } -}; +}; // class connection @@ -158,14 +162,16 @@ class net_plugin_impl { std::set< connection* > connections; bool done = false; + fc::sha256 node_id; + void connect( const string& ep ) { auto host = ep.substr( 0, ep.find(':') ); auto port = ep.substr( host.size()+1, host.size() ); idump((host)(port)); - auto resolver = std::make_shared( std::ref( app().get_io_service() ) ); + auto resolver = std::make_shared( std::ref( app().get_io_service() ) ); tcp::resolver::query query( tcp::v4(), host.c_str(), port.c_str() ); - resolver->async_resolve( query, + resolver->async_resolve( query, [resolver,ep,this]( const boost::system::error_code& err, tcp::resolver::iterator endpoint_itr ){ if( !err ) { connect( resolver, endpoint_itr ); @@ -185,7 +191,7 @@ class net_plugin_impl { [sock,resolver,endpoint_itr,this]( const boost::system::error_code& err ) { if( !err ) { pending_connections.erase( sock ); - start_session( new connection( sock ) ); + start_session( new connection( sock, node_id ) ); } else { if( endpoint_itr != tcp::resolver::iterator() ) { connect( resolver, endpoint_itr ); @@ -216,10 +222,26 @@ class net_plugin_impl { ilog("network loop done"); } FC_CAPTURE_AND_RETHROW() } + void init_handshake (handshake_message &hm) { + hm.network_version = 0; + database_plugin* dbp = application::instance().find_plugin(); + //hm.chain_id = dbp->db().get_chain_id(); + // hm.node_id = fc::sha256; + hm.last_irreversible_block_num = 0; + // block_id_type last_irreversible_block_id; + // string os; + // string agent; + + } + void start_session( connection* con ) { connections.insert( con ); start_read_message( *con ); - con->send( handshake_message{} ); + + handshake_message hello; + init_handshake (hello); + con->send( hello ); + // con->readloop_complete = bf::async( [=](){ read_loop( con ); } ); // con->writeloop_complete = bf::async( [=](){ write_loop( con ); } ); } @@ -228,7 +250,7 @@ class net_plugin_impl { auto socket = std::make_shared( std::ref( app().get_io_service() ) ); acceptor->async_accept( *socket, [socket,this]( boost::system::error_code ec ) { if( !ec ) { - start_session( new connection( socket ) ); + start_session( new connection( socket, node_id ) ); start_listen_loop(); } else { elog( "Error accepting connection: ${m}", ("m", ec.message() ) ); @@ -238,7 +260,7 @@ class net_plugin_impl { void start_read_message( connection& c ) { c.pending_message_size = 0; - boost::asio::async_read( *c.socket, boost::asio::buffer((char*)&c.pending_message_size,sizeof(c.pending_message_size)), + boost::asio::async_read( *c.socket, boost::asio::buffer((char*)&c.pending_message_size,sizeof(c.pending_message_size)), [&]( boost::system::error_code ec, std::size_t bytes_transferred ) { ilog( "read size handler..." ); if( !ec ) { @@ -255,8 +277,39 @@ class net_plugin_impl { } ); } - void start_reading_pending_buffer( connection& c ) { - boost::asio::async_read( *c.socket, boost::asio::buffer(c.pending_message_buffer.data(), c.pending_message_size ), + + void handle_message (connection &c, handshake_message &msg) { + ilog ("got a handshake message"); + c.last_handshake = msg; + } + + + void handle_message (connection &c, peer_message &msg) { + ilog ("got a peer message"); + } + + void handle_message (connection &c, notice_message &msg) { + ilog ("got a notice message"); + } + + void handle_message (connection &c, sync_request_message &msg) { + ilog ("got a sync request message"); + } + + void handle_message (connection &c, block_summary_message &msg) { + ilog ("got a block summary message"); + } + + void handle_message (connection &c, SignedTransaction &msg) { + ilog ("got a SignedTransacton"); + } + + void handle_message (connection &c, signed_block &msg) { + ilog ("got a signed_block"); + } + + void start_reading_pending_buffer( connection& c ) { + boost::asio::async_read( *c.socket, boost::asio::buffer(c.pending_message_buffer.data(), c.pending_message_size ), [&]( boost::system::error_code ec, std::size_t bytes_transferred ) { ilog( "read buffer handler..." ); if( !ec ) { @@ -264,6 +317,37 @@ class net_plugin_impl { auto msg = fc::raw::unpack( c.pending_message_buffer ); ilog( "received message of size: ${s}", ("s",bytes_transferred) ); start_read_message( c ); + switch (msg.which()) { + case 0: { + handle_message (c, msg.get ()); + break; + } + case 1: { + handle_message (c, msg.get ()); + break; + } + case 2: { + break; + handle_message (c, msg.get ()); + break; + } + case 3: { + handle_message (c, msg.get ()); + break; + } + case 4: { + handle_message (c, msg.get()); + break; + } + case 5: { + handle_message (c, msg.get()); + break; + } + case 6: { + handle_message (c, msg.get()); + break; + } + } return; } catch ( const fc::exception& e ) { edump((e.to_detail_string() )); @@ -293,7 +377,7 @@ class net_plugin_impl { delete c; } -}; +}; // class net_plugin_impl net_plugin::net_plugin() :my( new net_plugin_impl ) { @@ -302,7 +386,7 @@ net_plugin::net_plugin() net_plugin::~net_plugin() { } -void net_plugin::set_program_options( options_description& cli, options_description& cfg ) +void net_plugin::set_program_options( options_description& cli, options_description& cfg ) { cfg.add_options() ("listen-endpoint", bpo::value()->default_value( "127.0.0.1:9876" ), "The local IP address and port to listen for incoming connections.") @@ -317,7 +401,7 @@ void net_plugin::plugin_initialize( const variables_map& options ) { auto lipstr = options.at("listen-endpoint").as< string >(); auto fcep = fc::ip::endpoint::from_string( lipstr ); my->listen_endpoint = tcp::endpoint( boost::asio::ip::address_v4::from_string( (string)fcep.get_address() ), fcep.port() ); - + ilog( "configured net to listen on ${ep}", ("ep", fcep) ); my->acceptor.reset( new tcp::acceptor( app().get_io_service() ) ); @@ -325,6 +409,8 @@ void net_plugin::plugin_initialize( const variables_map& options ) { if( options.count( "remote-endpoint" ) ) { my->seed_nodes = options.at( "remote-endpoint" ).as< vector >(); } + + // fc::rand_pseudo_bytes(&my->node_id.data[0], (int)my->node_id.size()); } void net_plugin::plugin_startup() { @@ -353,7 +439,7 @@ try { ilog( "close connections ${s}", ("s",my->connections.size()) ); auto cons = my->connections; - for( auto con : cons ) + for( auto con : cons ) con->socket->close(); while( my->connections.size() ) { From 84c8f42d4ad5265abba0b404e4b6199b4b808722 Mon Sep 17 00:00:00 2001 From: Phil Mesnier Date: Thu, 22 Jun 2017 04:54:48 -0500 Subject: [PATCH 2/7] fix link line --- plugins/net_plugin/CMakeLists.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugins/net_plugin/CMakeLists.txt b/plugins/net_plugin/CMakeLists.txt index 4880a68ced3..80515e5715a 100644 --- a/plugins/net_plugin/CMakeLists.txt +++ b/plugins/net_plugin/CMakeLists.txt @@ -3,7 +3,7 @@ add_library( net_plugin net_plugin.cpp ${HEADERS} ) -target_link_libraries( net_plugin chain_plugin egenesis appbase fc ) +target_link_libraries( net_plugin chain_plugin appbase fc ) target_include_directories( net_plugin PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/include" ) install( TARGETS From 9f0b1348fd07af02d3f1094b87e8072d1d5f25a4 Mon Sep 17 00:00:00 2001 From: Phil Mesnier Date: Fri, 23 Jun 2017 03:21:19 -0500 Subject: [PATCH 3/7] added more data collection code --- .../include/eos/net_plugin/#net_plugin.hpp# | 25 ++++++ .../include/eos/net_plugin/protocol.hpp | 6 +- plugins/net_plugin/net_plugin.cpp | 83 +++++++++++++------ 3 files changed, 87 insertions(+), 27 deletions(-) create mode 100644 plugins/net_plugin/include/eos/net_plugin/#net_plugin.hpp# diff --git a/plugins/net_plugin/include/eos/net_plugin/#net_plugin.hpp# b/plugins/net_plugin/include/eos/net_plugin/#net_plugin.hpp# new file mode 100644 index 00000000000..dc86ba56491 --- /dev/null +++ b/plugins/net_plugin/include/eos/net_plugin/#net_plugin.hpp# @@ -0,0 +1,25 @@ +#pragma once +#include +#include + +namespace eos { + using namespace appbase; + + class net_plugin : public appbase::plugin + { + public: + net_plugin(); + virtual ~net_plugin(); + + APPBASE_PLUGIN_REQUIRES((chain_plugin)) + virtual void set_program_options(options_description& cli, options_description& cfg) override; + + void plugin_initialize(const variables_map& options); + void plugin_startup(); + void plugin_shutdown(); + + private: + std::unique_ptr myeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee; + }; + +} diff --git a/plugins/net_plugin/include/eos/net_plugin/protocol.hpp b/plugins/net_plugin/include/eos/net_plugin/protocol.hpp index bbf7213ce9b..5651b096aa8 100644 --- a/plugins/net_plugin/include/eos/net_plugin/protocol.hpp +++ b/plugins/net_plugin/include/eos/net_plugin/protocol.hpp @@ -10,7 +10,7 @@ namespace eos { int16_t network_version = 0; chain_id_type chain_id; ///< used to identify chain fc::sha256 node_id; ///< used to identify peers and prevent self-connect - uint64_t last_irreversible_block_num = 0; + uint32_t last_irreversible_block_num = 0; block_id_type last_irreversible_block_id; string os; string agent; @@ -27,8 +27,8 @@ namespace eos { }; struct sync_request_message { - uint64_t start_block; - uint64_t end_block; + uint32_t start_block; + uint32_t end_block; }; struct peer_message { diff --git a/plugins/net_plugin/net_plugin.cpp b/plugins/net_plugin/net_plugin.cpp index 2218ccb7a08..c1bf5a0aae0 100644 --- a/plugins/net_plugin/net_plugin.cpp +++ b/plugins/net_plugin/net_plugin.cpp @@ -2,12 +2,13 @@ #include #include -//#include +#include #include #include #include #include +#include #include @@ -90,10 +91,9 @@ typedef multi_index_container< > sync_request_index; class connection { - public: - connection( socket_ptr s, const fc::sha256 &node_id ) - : socket(s), - local_node_id (node_id) +public: + connection( socket_ptr s ) + : socket(s) { wlog( "created connection" ); pending_message_buffer.resize( 1024*1024*4 ); @@ -112,7 +112,7 @@ class connection { vector pending_message_buffer; handshake_message last_handshake; - const fc::sha256& local_node_id; + std::deque out_queue; void send( const net_message& m ) { @@ -162,7 +162,7 @@ class net_plugin_impl { std::set< connection* > connections; bool done = false; - fc::sha256 node_id; + fc::optional hello; void connect( const string& ep ) { auto host = ep.substr( 0, ep.find(':') ); @@ -191,7 +191,7 @@ class net_plugin_impl { [sock,resolver,endpoint_itr,this]( const boost::system::error_code& err ) { if( !err ) { pending_connections.erase( sock ); - start_session( new connection( sock, node_id ) ); + start_session( new connection( sock ) ); } else { if( endpoint_itr != tcp::resolver::iterator() ) { connect( resolver, endpoint_itr ); @@ -222,25 +222,35 @@ class net_plugin_impl { ilog("network loop done"); } FC_CAPTURE_AND_RETHROW() } - void init_handshake (handshake_message &hm) { - hm.network_version = 0; - database_plugin* dbp = application::instance().find_plugin(); - //hm.chain_id = dbp->db().get_chain_id(); - // hm.node_id = fc::sha256; - hm.last_irreversible_block_num = 0; - // block_id_type last_irreversible_block_id; - // string os; - // string agent; + void init_handshake () { + if (!hello) { + hello = handshake_message(); + } + + hello->network_version = 0; + // hello->chain_id = chain->get_chain_id(); + fc::rand_pseudo_bytes(hello->node_id.data(), hello->node_id.data_size()); + } + + void update_handshake () { + chain_plugin* cp = app().find_plugin(); + + hello->last_irreversible_block_id = cp->chain().get_block_id_for_num + (hello->last_irreversible_block_num = cp->chain().last_irreversible_block_num()); } void start_session( connection* con ) { connections.insert( con ); start_read_message( *con ); - handshake_message hello; - init_handshake (hello); - con->send( hello ); + if (hello.valid()) { + update_handshake (); + } else { + init_handshake (); + } + + con->send( *hello ); // con->readloop_complete = bf::async( [=](){ read_loop( con ); } ); // con->writeloop_complete = bf::async( [=](){ write_loop( con ); } ); @@ -250,7 +260,7 @@ class net_plugin_impl { auto socket = std::make_shared( std::ref( app().get_io_service() ) ); acceptor->async_accept( *socket, [socket,this]( boost::system::error_code ec ) { if( !ec ) { - start_session( new connection( socket, node_id ) ); + start_session( new connection( socket ) ); start_listen_loop(); } else { elog( "Error accepting connection: ${m}", ("m", ec.message() ) ); @@ -278,8 +288,33 @@ class net_plugin_impl { ); } + void handle_message (connection &c, handshake_message &msg) { ilog ("got a handshake message"); + if (!hello) { + init_handshake(); + } + if (msg.node_id == hello->node_id) + { + dlog ("Self connection detected. Closing connection"); + close(&c); + return; + } + if (msg.chain_id != hello->chain_id) + { + dlog ("Peer on a different chain. Closing connection"); + close (&c); + return; + } + if (msg.network_version != hello->network_version) + { + dlog ("Peer network id does not match "); + close (&c); + return; + } + + + c.last_handshake = msg; } @@ -317,6 +352,7 @@ class net_plugin_impl { auto msg = fc::raw::unpack( c.pending_message_buffer ); ilog( "received message of size: ${s}", ("s",bytes_transferred) ); start_read_message( c ); + switch (msg.which()) { case 0: { handle_message (c, msg.get ()); @@ -379,6 +415,7 @@ class net_plugin_impl { }; // class net_plugin_impl + net_plugin::net_plugin() :my( new net_plugin_impl ) { } @@ -409,11 +446,9 @@ void net_plugin::plugin_initialize( const variables_map& options ) { if( options.count( "remote-endpoint" ) ) { my->seed_nodes = options.at( "remote-endpoint" ).as< vector >(); } - - // fc::rand_pseudo_bytes(&my->node_id.data[0], (int)my->node_id.size()); } -void net_plugin::plugin_startup() { + void net_plugin::plugin_startup() { // boost::asio::ip::tcp::endpoint endpoint(boost::asio::ip::tcp::v4(), port); if( my->acceptor ) { my->acceptor->open(my->listen_endpoint.protocol()); From 61cd39f3022404f3ae86fd78044f0a0b9ecc1d2e Mon Sep 17 00:00:00 2001 From: Phil Mesnier Date: Thu, 22 Jun 2017 04:10:25 -0500 Subject: [PATCH 4/7] start message handling --- plugins/net_plugin/CMakeLists.txt | 2 +- plugins/net_plugin/net_plugin.cpp | 237 ++++++++++++++++++++---------- 2 files changed, 160 insertions(+), 79 deletions(-) diff --git a/plugins/net_plugin/CMakeLists.txt b/plugins/net_plugin/CMakeLists.txt index 80515e5715a..4880a68ced3 100644 --- a/plugins/net_plugin/CMakeLists.txt +++ b/plugins/net_plugin/CMakeLists.txt @@ -3,7 +3,7 @@ add_library( net_plugin net_plugin.cpp ${HEADERS} ) -target_link_libraries( net_plugin chain_plugin appbase fc ) +target_link_libraries( net_plugin chain_plugin egenesis appbase fc ) target_include_directories( net_plugin PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/include" ) install( TARGETS diff --git a/plugins/net_plugin/net_plugin.cpp b/plugins/net_plugin/net_plugin.cpp index c1bf5a0aae0..9c401f6eb38 100644 --- a/plugins/net_plugin/net_plugin.cpp +++ b/plugins/net_plugin/net_plugin.cpp @@ -27,7 +27,7 @@ struct node_transaction_state { fc::time_point received; fc::time_point_sec expires; vector packed_transaction; - uint64_t block_num = -1; /// block transaction was included in + uint32_t block_num = -1; /// block transaction was included in bool validated = false; /// whether or not our node has validated it }; @@ -43,7 +43,7 @@ struct transaction_state { transaction_id_type id; bool is_known_by_peer = false; ///< true if we sent or received this trx to this peer or received notice from peer bool is_noticed_to_peer = false; ///< have we sent peer noitce we know it (true if we reeive from this peer) - uint64_t block_num = -1; ///< the block number the transaction was included in + uint32_t block_num = -1; ///< the block number the transaction was included in time_point validated_time; ///< infinity for unvalidated time_point requested_time; /// incase we fetch large trx }; @@ -76,9 +76,9 @@ typedef multi_index_container< * Index by start_block */ struct sync_state { - uint64_t start_block = 0; - uint64_t end_block = 0; - uint64_t last = 0; ///< last sent or received + uint32_t start_block = 0; + uint32_t end_block = 0; + uint32_t last = 0; ///< last sent or received time_point start_time;; ///< time request made or received }; @@ -86,7 +86,7 @@ struct by_start_block; typedef multi_index_container< sync_state, indexed_by< - ordered_unique< tag, member > + ordered_unique< tag, member > > > sync_request_index; @@ -98,6 +98,7 @@ class connection { wlog( "created connection" ); pending_message_buffer.resize( 1024*1024*4 ); } + ~connection() { wlog( "released connection" ); } @@ -112,8 +113,7 @@ class connection { vector pending_message_buffer; handshake_message last_handshake; - - std::deque out_queue; + std::deque out_queue; void send( const net_message& m ) { out_queue.push_back( m ); @@ -122,8 +122,12 @@ class connection { } void send_next_message() { - if( !out_queue.size() ) - return; + if( !out_queue.size() ) { + if (out_sync_state.size() > 0) { + write_block_backlog(); + } + return; + } auto& m = out_queue.front(); @@ -145,6 +149,29 @@ class connection { } }); } + + void write_block_backlog ( ) { + try { + ilog ("write loop sending backlog "); + if (out_sync_state.size() > 0) { + chain_controller& cc = app().find_plugin()->chain(); + auto ss = out_sync_state.begin(); + for (uint32_t num = ss->last + 1; + num <= ss->end_block; num++) { + fc::optional sb = cc.fetch_block_by_number(num); + if (sb) { + send( *sb ); + } + ss.get_node()->value().last = num; + } + out_sync_state.erase(0); + } + } catch ( ... ) { + wlog( "write loop exception" ); + } + } + + }; // class connection @@ -162,7 +189,10 @@ class net_plugin_impl { std::set< connection* > connections; bool done = false; - fc::optional hello; + fc::optional hello; + std::string user_agent_name; + chain_plugin* chain_plug; + void connect( const string& ep ) { auto host = ep.substr( 0, ep.find(':') ); @@ -229,15 +259,24 @@ class net_plugin_impl { } hello->network_version = 0; - // hello->chain_id = chain->get_chain_id(); + chain_plug->get_chain_id(hello->chain_id); fc::rand_pseudo_bytes(hello->node_id.data(), hello->node_id.data_size()); +#if defined( __APPLE__ ) + hello->os = "osx"; +#elif defined( __linux__ ) + hello->os = "linux"; +#elif defined( _MSC_VER ) + hello->os = "win32"; +#else + hello->os = "other"; +#endif + hello->agent = user_agent_name; + } void update_handshake () { - chain_plugin* cp = app().find_plugin(); - - hello->last_irreversible_block_id = cp->chain().get_block_id_for_num - (hello->last_irreversible_block_num = cp->chain().last_irreversible_block_num()); + hello->last_irreversible_block_id = chain_plug->chain().get_block_id_for_num + (hello->last_irreversible_block_num = chain_plug->chain().last_irreversible_block_num()); } void start_session( connection* con ) { @@ -252,8 +291,8 @@ class net_plugin_impl { con->send( *hello ); - // con->readloop_complete = bf::async( [=](){ read_loop( con ); } ); - // con->writeloop_complete = bf::async( [=](){ write_loop( con ); } ); + // con->readloop_complete = bf::async( [=](){ read_loop( con ); } ); + // con->writeloop_complete = bf::async( [=](){ write_loop con ); } ); } void start_listen_loop() { @@ -261,7 +300,7 @@ class net_plugin_impl { acceptor->async_accept( *socket, [socket,this]( boost::system::error_code ec ) { if( !ec ) { start_session( new connection( socket ) ); - start_listen_loop(); + start_listen_loop(); } else { elog( "Error accepting connection: ${m}", ("m", ec.message() ) ); } @@ -288,32 +327,47 @@ class net_plugin_impl { ); } - void handle_message (connection &c, handshake_message &msg) { - ilog ("got a handshake message"); if (!hello) { init_handshake(); } - if (msg.node_id == hello->node_id) - { - dlog ("Self connection detected. Closing connection"); + ilog ("got a handshake message"); + if (msg.node_id == hello->node_id) { + elog ("Self connection detected. Closing connection"); close(&c); return; } - if (msg.chain_id != hello->chain_id) - { - dlog ("Peer on a different chain. Closing connection"); - close (&c); - return; - } - if (msg.network_version != hello->network_version) - { - dlog ("Peer network id does not match "); - close (&c); - return; + if (msg.chain_id != hello->chain_id) { + elog ("Peer on a different chain. Closing connection"); + close (&c); + return; + } + if (msg.network_version != hello->network_version) { + elog ("Peer network id does not match "); + close (&c); + return; + } + chain_controller& cc = chain_plug->chain(); + uint32_t head = cc.head_block_num (); + if ( msg.last_irreversible_block_num > head) { + uint32_t delta = msg.last_irreversible_block_num - head; + uint32_t count = connections.size(); + uint32_t span = delta / count; + uint32_t lastSpan = delta - (span * (count-1)); + ilog ("peer is ahead of head by ${d}, count = ${c}, span = ${s}, lastspan = ${ls} ", + ("d",delta)("c",count)("s",span)("ls",lastSpan)); + for (auto &cx: connections) { + if (--count == 0) { + span = lastSpan; + } + sync_state req = {head+1, head+span, 0, time_point::now() }; + cx->in_sync_state.insert (req); + sync_request_message srm = {req.start_block, req.end_block }; + cx->send (srm); + head += span; } - + } c.last_handshake = msg; } @@ -328,7 +382,10 @@ class net_plugin_impl { } void handle_message (connection &c, sync_request_message &msg) { - ilog ("got a sync request message"); + ilog ("got a sync request message for blocks ${s} to ${e}", ("s",msg.start_block)("e", msg.end_block)); + sync_state req = {msg.start_block,msg.end_block,0,time_point::now()}; + c.out_sync_state.insert (req); + c.write_block_backlog (); } void handle_message (connection &c, block_summary_message &msg) { @@ -340,9 +397,68 @@ class net_plugin_impl { } void handle_message (connection &c, signed_block &msg) { - ilog ("got a signed_block"); + uint32_t bn = msg.block_num(); + ilog ("got a signed_block, num = ${n}", ("n", bn)); + chain_controller &cc = chain_plug->chain(); + + if (cc.is_known_block(msg.id())) { + ilog ("block id ${id} is known", ("id", msg.id()) ); + return; + } + uint32_t num = msg.block_num(); + for (auto &ss: c.in_sync_state) { + if (num >= ss.end_block) { + continue; + } + const_cast(ss).last = num; + break; + } + // TODO: add block to global state } + + struct msgHandler : public fc::visitor { + net_plugin_impl &impl; + connection &c; + msgHandler (net_plugin_impl &imp, connection &conn) : impl(imp), c(conn) {} + + void operator()(handshake_message &msg) + { + impl.handle_message (c, msg); + } + + void operator()(peer_message &msg) + { + impl.handle_message (c, msg); + } + + void operator()(notice_message &msg) + { + impl.handle_message (c, msg); + } + + void operator()(sync_request_message &msg) + { + impl.handle_message (c, msg); + } + + void operator()(block_summary_message &msg) + { + impl.handle_message (c, msg); + } + + void operator()(SignedTransaction &msg) + { + impl.handle_message (c, msg); + } + + void operator()(signed_block &msg) + { + impl.handle_message (c, msg); + } + }; + + void start_reading_pending_buffer( connection& c ) { boost::asio::async_read( *c.socket, boost::asio::buffer(c.pending_message_buffer.data(), c.pending_message_size ), [&]( boost::system::error_code ec, std::size_t bytes_transferred ) { @@ -353,37 +469,8 @@ class net_plugin_impl { ilog( "received message of size: ${s}", ("s",bytes_transferred) ); start_read_message( c ); - switch (msg.which()) { - case 0: { - handle_message (c, msg.get ()); - break; - } - case 1: { - handle_message (c, msg.get ()); - break; - } - case 2: { - break; - handle_message (c, msg.get ()); - break; - } - case 3: { - handle_message (c, msg.get ()); - break; - } - case 4: { - handle_message (c, msg.get()); - break; - } - case 5: { - handle_message (c, msg.get()); - break; - } - case 6: { - handle_message (c, msg.get()); - break; - } - } + msgHandler m(*this, c); + msg.visit(m); return; } catch ( const fc::exception& e ) { edump((e.to_detail_string() )); @@ -397,14 +484,6 @@ class net_plugin_impl { } - void write_loop( connection* c ) { - try { - c->send( handshake_message{} ); - } catch ( ... ) { - wlog( "write loop exception" ); - } - } - void close( connection* c ) { ilog( "close ${c}", ("c",int64_t(c))); if( c->socket ) @@ -415,7 +494,6 @@ class net_plugin_impl { }; // class net_plugin_impl - net_plugin::net_plugin() :my( new net_plugin_impl ) { } @@ -446,6 +524,9 @@ void net_plugin::plugin_initialize( const variables_map& options ) { if( options.count( "remote-endpoint" ) ) { my->seed_nodes = options.at( "remote-endpoint" ).as< vector >(); } + + my->user_agent_name = "EOS Test Agent"; + my->chain_plug = app().find_plugin(); } void net_plugin::plugin_startup() { From 814ad9bfb031886a4a80c09afb288d3abcfd810f Mon Sep 17 00:00:00 2001 From: Phil Mesnier Date: Thu, 22 Jun 2017 04:54:48 -0500 Subject: [PATCH 5/7] fix link line --- plugins/net_plugin/CMakeLists.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugins/net_plugin/CMakeLists.txt b/plugins/net_plugin/CMakeLists.txt index 4880a68ced3..80515e5715a 100644 --- a/plugins/net_plugin/CMakeLists.txt +++ b/plugins/net_plugin/CMakeLists.txt @@ -3,7 +3,7 @@ add_library( net_plugin net_plugin.cpp ${HEADERS} ) -target_link_libraries( net_plugin chain_plugin egenesis appbase fc ) +target_link_libraries( net_plugin chain_plugin appbase fc ) target_include_directories( net_plugin PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/include" ) install( TARGETS From 385b5a2418d521d83989cca9e6418aea22d1bb82 Mon Sep 17 00:00:00 2001 From: Phil Mesnier Date: Fri, 23 Jun 2017 03:48:11 -0500 Subject: [PATCH 6/7] removed emacs temporary file --- .../include/eos/net_plugin/#net_plugin.hpp# | 25 ------------------- 1 file changed, 25 deletions(-) delete mode 100644 plugins/net_plugin/include/eos/net_plugin/#net_plugin.hpp# diff --git a/plugins/net_plugin/include/eos/net_plugin/#net_plugin.hpp# b/plugins/net_plugin/include/eos/net_plugin/#net_plugin.hpp# deleted file mode 100644 index dc86ba56491..00000000000 --- a/plugins/net_plugin/include/eos/net_plugin/#net_plugin.hpp# +++ /dev/null @@ -1,25 +0,0 @@ -#pragma once -#include -#include - -namespace eos { - using namespace appbase; - - class net_plugin : public appbase::plugin - { - public: - net_plugin(); - virtual ~net_plugin(); - - APPBASE_PLUGIN_REQUIRES((chain_plugin)) - virtual void set_program_options(options_description& cli, options_description& cfg) override; - - void plugin_initialize(const variables_map& options); - void plugin_startup(); - void plugin_shutdown(); - - private: - std::unique_ptr myeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee; - }; - -} From d8fc310fe29a32a51cf331dbe387b7f17e6e6f3c Mon Sep 17 00:00:00 2001 From: Phil Mesnier Date: Sun, 25 Jun 2017 17:11:47 -0500 Subject: [PATCH 7/7] Send blocks to a peer that starts up in the past. --- plugins/chain_plugin/chain_plugin.cpp | 7 +++++++ .../chain_plugin/include/eos/chain_plugin/chain_plugin.hpp | 4 +++- 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/plugins/chain_plugin/chain_plugin.cpp b/plugins/chain_plugin/chain_plugin.cpp index 5f40cd8659d..201896c2fd9 100644 --- a/plugins/chain_plugin/chain_plugin.cpp +++ b/plugins/chain_plugin/chain_plugin.cpp @@ -20,6 +20,7 @@ using chain::fork_database; using chain::block_log; using chain::type_index; using chain::by_scope_name; +using chain::chain_id_type; class chain_plugin_impl { public: @@ -31,6 +32,7 @@ class chain_plugin_impl { fc::optional fork_db; fc::optional block_logger; fc::optional chain; + chain_id_type chain_id; }; @@ -100,6 +102,7 @@ void chain_plugin::plugin_startup() { my->fork_db = fork_database(); my->block_logger = block_log(my->block_log_dir); + my->chain_id = genesis.compute_chain_id(); my->chain = chain_controller(db, *my->fork_db, *my->block_logger, initializer, native_contract::make_administrator()); @@ -140,6 +143,10 @@ bool chain_plugin::block_is_on_preferred_chain(const chain::block_id_type& block chain_controller& chain_plugin::chain() { return *my->chain; } const chain::chain_controller& chain_plugin::chain() const { return *my->chain; } + void chain_plugin::get_chain_id (chain_id_type &cid)const { + memcpy (cid.data(), my->chain_id.data(), cid.data_size()); + } + namespace chain_apis { read_only::get_info_results read_only::get_info(const read_only::get_info_params&) const { diff --git a/plugins/chain_plugin/include/eos/chain_plugin/chain_plugin.hpp b/plugins/chain_plugin/include/eos/chain_plugin/chain_plugin.hpp index c9edb532a29..0352f7ce8ec 100644 --- a/plugins/chain_plugin/include/eos/chain_plugin/chain_plugin.hpp +++ b/plugins/chain_plugin/include/eos/chain_plugin/chain_plugin.hpp @@ -86,6 +86,8 @@ class chain_plugin : public plugin { // Only call this after plugin_startup()! const chain_controller& chain() const; + void get_chain_id (chain::chain_id_type &cid) const; + private: unique_ptr my; }; @@ -97,4 +99,4 @@ FC_REFLECT(eos::chain_apis::read_only::get_info_results, (head_block_num)(head_block_id)(head_block_time)(head_block_producer) (recent_slots)(participation_rate)) FC_REFLECT(eos::chain_apis::read_only::get_block_params, (block_num_or_id)) -FC_REFLECT(eos::chain_apis::read_only::get_types_params, (account_name)) \ No newline at end of file +FC_REFLECT(eos::chain_apis::read_only::get_types_params, (account_name))