diff --git a/plugins/net_plugin/include/eos/net_plugin/protocol.hpp b/plugins/net_plugin/include/eos/net_plugin/protocol.hpp index 31ed9d1912f..110bcbdf0cb 100644 --- a/plugins/net_plugin/include/eos/net_plugin/protocol.hpp +++ b/plugins/net_plugin/include/eos/net_plugin/protocol.hpp @@ -16,7 +16,7 @@ namespace eos { block_id_type head_id; string os; string agent; - }; + }; struct notice_message { vector known_trx; diff --git a/plugins/net_plugin/net_plugin.cpp b/plugins/net_plugin/net_plugin.cpp index 63e41f7da7e..a6cfb58626c 100644 --- a/plugins/net_plugin/net_plugin.cpp +++ b/plugins/net_plugin/net_plugin.cpp @@ -96,6 +96,12 @@ namespace eos { > > sync_request_index; + + struct handshake_initializer { + static void populate (handshake_message &hello); + static net_plugin_impl* info; + }; + class connection : public std::enable_shared_from_this { public: connection( socket_ptr s ) @@ -120,33 +126,14 @@ namespace eos { uint32_t pending_message_size; vector pending_message_buffer; - fc::optional hello; handshake_message last_handshake; std::deque out_queue; connection_ptr self; void send_handshake ( ) { - chain_controller& cc = app().find_plugin()->chain(); - try { - hello->last_irreversible_block_id = cc.get_block_id_for_num - (hello->last_irreversible_block_num = cc.last_irreversible_block_num()); - } - catch (const unknown_block_exception &ex) { - hello->last_irreversible_block_id = fc::sha256::hash(0); - hello->last_irreversible_block_num = 0; - } - try { - hello->head_id = cc.get_block_id_for_num - (hello->head_num = cc.head_block_num()); - } - catch (const unknown_block_exception &ex) { - hello->head_id = fc::sha256::hash(0); - hello->head_num = 0; - } - ilog ("send_handshake my libnum = ${n} head = ${h}", - ("n",hello->last_irreversible_block_num)("h",hello->head_num)); - - send (*hello); + handshake_message hello; + handshake_initializer::populate(hello); + send (hello); } void send( const net_message& m ) { @@ -190,7 +177,7 @@ namespace eos { }); } - void write_block_backlog ( ) { + void write_block_backlog ( ) { chain_controller& cc = app().find_plugin()->chain(); auto ss = out_sync_state.begin(); uint32_t num = ++ss.get_node()->value().last; @@ -230,7 +217,10 @@ namespace eos { std::set< connection_ptr > connections; bool done = false; - fc::optional hello; + int16_t network_version = 0; + chain_id_type chain_id; ///< used to identify chain + fc::sha256 node_id; ///< used to identify peers and prevent self-connect + std::string user_agent_name; chain_plugin* chain_plug; vector local_txns; @@ -311,36 +301,13 @@ namespace eos { } FC_CAPTURE_AND_RETHROW() } #endif - void init_handshake () { - if (!hello) { - hello = handshake_message(); - } - - hello->network_version = 0; - chain_plug->get_chain_id(hello->chain_id); - fc::rand_pseudo_bytes(hello->node_id.data(), hello->node_id.data_size()); -#if defined( __APPLE__ ) - hello->os = "osx"; -#elif defined( __linux__ ) - hello->os = "linux"; -#elif defined( _MSC_VER ) - hello->os = "win32"; -#else - hello->os = "other"; -#endif - hello->agent = user_agent_name; - } void start_session( connection_ptr con ) { connections.insert (con); start_read_message( con ); - if (!hello.valid()) { - init_handshake (); - } - - con->hello = hello; - con->send_handshake( ); + con->send_handshake(); + send_peer_message(*con); // con->readloop_complete = bf::async( [=](){ read_loop( con ); } ); // con->writeloop_complete = bf::async( [=](){ write_loop con ); } ); @@ -366,7 +333,7 @@ namespace eos { boost::asio::async_read( *c->socket, boost::asio::buffer((char *)buff, sizeof(c->pending_message_size)), [this,c]( boost::system::error_code ec, std::size_t bytes_transferred ) { - ilog( "read size handler..." ); + //ilog( "read size handler..." ); if( !ec ) { if( c->pending_message_size <= c->pending_message_buffer.size() ) { start_reading_pending_buffer( c ); @@ -393,7 +360,7 @@ namespace eos { return fc::ip::endpoint (addr,ep.port()); } - void send_peer_list_to (connection &conn) { + void send_peer_message (connection &conn) { peer_message pm; pm.peers.resize(connections.size()); for (auto &c : connections) { @@ -407,7 +374,6 @@ namespace eos { } } - template void send_all (const T &msg) { for (auto &c : connections) { @@ -437,21 +403,19 @@ namespace eos { } void handle_message (connection_ptr c, const handshake_message &msg) { - if (!hello) { - init_handshake(); - } - // dlog ("got a handshake message"); - if (msg.node_id == hello->node_id) { + + dlog ("got a handshake message"); + if (msg.node_id == node_id) { elog ("Self connection detected. Closing connection"); close(c); return; } - if (msg.chain_id != hello->chain_id) { + if (msg.chain_id != chain_id) { elog ("Peer on a different chain. Closing connection"); close (c); return; } - if (msg.network_version != hello->network_version) { + if (msg.network_version != network_version) { elog ("Peer network id does not match "); close (c); return; @@ -466,7 +430,7 @@ namespace eos { } void handle_message (connection_ptr c, const peer_message &msg) { - // dlog ("got a peer message"); + dlog ("got a peer message"); for (auto fcep : msg.peers) { c->shared_peers.insert (fcep); tcp::endpoint ep = fc_to_asio (fcep); @@ -489,7 +453,7 @@ namespace eos { } void handle_message (connection_ptr c, const notice_message &msg) { - // dlog ("got a notice message"); + dlog ("got a notice message"); chain_controller &cc = chain_plug->chain(); for (const auto& b : msg.known_blocks) { if (! cc.is_known_block (b)) { @@ -505,15 +469,15 @@ namespace eos { } void handle_message (connection_ptr c, const sync_request_message &msg) { - // og ("got a sync request message for blocks ${s} to ${e}", - // ("s",msg.start_block)("e", msg.end_block)); + dlog ("got a sync request message for blocks ${s} to ${e}", + ("s",msg.start_block)("e", msg.end_block)); sync_state req = {msg.start_block,msg.end_block,msg.start_block-1,time_point::now()}; c->out_sync_state.insert (req); c->write_block_backlog (); } void handle_message (connection_ptr c, const block_summary_message &msg) { - // dlog ("got a block summary message"); + dlog ("got a block summary message"); #warning TODO: reconstruct actual block from cached transactions chain_controller &cc = chain_plug->chain(); if (cc.is_known_block(msg.block.id())) { @@ -537,13 +501,13 @@ namespace eos { } void handle_message (connection_ptr c, const SignedTransaction &msg) { - ilog ("got a SignedTransacton"); + dlog ("got a SignedTransacton"); chain_plug->accept_transaction (msg); } void handle_message (connection_ptr c, const signed_block &msg) { uint32_t bn = msg.block_num(); - ilog ("got a signed_block, num = ${n}", ("n", bn)); + dlog ("got a signed_block, num = ${n}", ("n", bn)); chain_controller &cc = chain_plug->chain(); if (cc.is_known_block(msg.id())) { @@ -594,11 +558,11 @@ namespace eos { boost::asio::buffer(c->pending_message_buffer.data(), c->pending_message_size ), [this,c]( boost::system::error_code ec, std::size_t bytes_transferred ) { - ilog( "read buffer handler..." ); + // ilog( "read buffer handler..." ); if( !ec ) { try { auto msg = fc::raw::unpack( c->pending_message_buffer ); - ilog( "received message of size: ${s}", ("s",bytes_transferred) ); + // ilog( "received message of size: ${s}", ("s",bytes_transferred) ); start_read_message( c ); msgHandler m(*this, c); @@ -628,8 +592,49 @@ namespace eos { }; // class net_plugin_impl + net_plugin_impl* handshake_initializer::info; + + void + handshake_initializer::populate (handshake_message &hello) { + hello.network_version = 0; + hello.chain_id = info->chain_id; + hello.node_id = info->node_id; +#if defined( __APPLE__ ) + hello.os = "osx"; +#elif defined( __linux__ ) + hello.os = "linux"; +#elif defined( _MSC_VER ) + hello.os = "win32"; +#else + hello.os = "other"; +#endif + hello.agent = info->user_agent_name; + + + chain_controller& cc = info->chain_plug->chain(); + try { + hello.last_irreversible_block_id = cc.get_block_id_for_num + (hello.last_irreversible_block_num = cc.last_irreversible_block_num()); + } + catch (const unknown_block_exception &ex) { + hello.last_irreversible_block_id = fc::sha256::hash(0); + hello.last_irreversible_block_num = 0; + } + try { + hello.head_id = cc.get_block_id_for_num + (hello.head_num = cc.head_block_num()); + } + catch (const unknown_block_exception &ex) { + hello.head_id = fc::sha256::hash(0); + hello.head_num = 0; + } + + } + + net_plugin::net_plugin() :my( new net_plugin_impl ) { + handshake_initializer::info = my.get(); } net_plugin::~net_plugin() { @@ -667,6 +672,9 @@ namespace eos { my->user_agent_name = options.at ("agent-name").as< string > (); } my->chain_plug = app().find_plugin(); + my->chain_plug->get_chain_id(my->chain_id); + fc::rand_pseudo_bytes(my->node_id.data(), my->node_id.data_size()); + } void net_plugin::plugin_startup() { diff --git a/tests/p2p_tests/init/run_test.pl b/tests/p2p_tests/init/run_test.pl new file mode 100755 index 00000000000..855855aa591 --- /dev/null +++ b/tests/p2p_tests/init/run_test.pl @@ -0,0 +1,132 @@ +#!/usr/bin/perl + +use strict; +use Getopt::Long; +use Env; +use File::Basename; +use File::copy; +use File::Spec; +use File::Path; +use Cwd; + +my $eos_home = defined $ENV{EOS_HOME} ? $ENV{EOS_HOME} : getcwd; +my $eosd = $eos_home . "/programs/eosd/eosd"; + +my $nodes = defined $ENV{EOS_TEST_RING} ? $ENV{EOS_TEST_RING} : "1"; +my $only_one = defined $ENV{EOS_TEST_ONE_PRODUCER} ? "1" : ""; + +my $prods = 21; +my $genesis = "$eos_home/genesis.json"; +my $http_port_base = 8888; +my $p2p_port_base = 9876; +my $data_dir_base = './test-dir-node'; +my $http_port_base = 8888; + +my $first_pause = 45; +my $launch_pause = 5; +my $run_duration = 60; + +if (!GetOptions("nodes=i" => \$nodes, + "first-pause=i" => \$first_pause, + "launch-pause=i" => \$launch_pause, + "duration=i" => \$run_duratiion, + "one-producer" => \$only_one)) { + print "usage: $ARGV[0] [--nodes=] [--first-pause=] [--launch-pause=] [--duration=] [--one-producer]\n"; + print "where:\n"; + print "--nodes=n (default = 1) sets the number of eosd instances to launch\n"; + print "--first-pause=n (default = 45) sets the seconds delay after starting the first instance\n"; + print "--launch-pause=n (default = 5) sets the seconds delay after starting subsequent nodes\n"; + print "--duration=n (default = 60) sets the seconds delay after starting the last node before shutting down the test\n"; + print "--one-producer (default = no) if set concentrates all producers into the first node\n"; + print "\nproducer count currently fixed at $prods\n"; + exit +} + +my $per_node = int ($prods / ($only_one ? 1 : $nodes)); +my $extra = $prods - ($per_node * $nodes); +my @pid; +my $prod_ndx = ord('a'); +for (my $i = 0; $i < $nodes; $i++) { + my $p2p_port = $p2p_port_base + $i; + my $forward = $p2p_port + 1; + my $backward = $p2p_port - 1; + my $http_port = $http_port_base + $i; + my $data_dir = "$data_dir_base-$i"; + if ($nodes > 1) { + if ($i == 0) { + $backward = $p2p_port_base + $nodes -1; + } + elsif ($i == $nodes - 1) { + $forward = $p2p_port_base; + } + } + print "purging directory $data_dir\n"; + rmtree ($data_dir); + mkdir ($data_dir); + + open (my $cfg, '>', "$data_dir/config.ini") ; + print $cfg "genesis-json = \"$genesis\"\n"; + print $cfg "block-log-dir = \"blocks\"\n"; + print $cfg "readonly = 0\n"; + print $cfg "shared-file-dir = \"blockchain\"\n"; + print $cfg "shared-file-size = 64\n"; + print $cfg "http-server-endpoint = 127.0.0.1:$http_port\n"; + print $cfg "listen-endpoint = 127.0.0.1:$p2p_port\n"; + print $cfg "remote-endpoint = 127.0.0.1:$forward\n" if ($nodes > 1); + print $cfg "remote-endpoint = 127.0.0.1:$backward\n" if ($nodes > 2); + print $cfg "public-endpoint = 0.0.0.0:$p2p_port\n"; + print $cfg "enable-stale-production = true\n"; + print $cfg "required-participation = true\n"; + print $cfg "private-key = [\"EOS6MRyAjQq8ud7hVNYcfnVPJqcVpscN5So8BhtHuGYqET5GDW5CV\",\"5KQwrPbwdL6PhXujxW37FSSQZ1JiwsST4cqQzDeyXtP79zkvFD3\"]\n"; + + if ($i == 0 || $only_one != "1") { + print $cfg "plugin = eos::producer_plugin\n"; + $per_node += $extra if ($i == $nodes - 1); + for (my $p = 0; $p < $per_node; $p++) { + my $pname = "init" . chr($prod_ndx++); + print $cfg "producer-name = $pname\n"; + } + } + close $cfg; + + my @cmdline = ($eosd, +# "--genesis-timestamp=now", + "--data-dir=$data_dir"); + + $pid[$i] = fork; + if ($pid[$i] > 0) { + my $pause = $i == 0 ? $first_pause : $launch_pause; + print "parent process looping, child pid = $pid[$i]"; + if ($i < $nodes - 1) { + print ", pausing $pause seconds\n"; + sleep ($pause); + } + else { + print "\n"; + } + + } + elsif (defined ($pid[$i])) { + print "child execing now, pid = $$\n"; + open OUTPUT, '>', "$data_dir/stdout.txt" or die $!; + open ERROR, '>', "$data_dir/stderr.txt" or die $!; + STDOUT->fdopen ( \*OUTPUT, 'w') or die $!; + STDERR->fdopen ( \*ERROR, 'w') or die $!; + + exec @cmdline; + print "child terminating now\n"; + exit; + } + else { + print "fork failed\n"; + exit; + } +} + +print "all nodes launched, network running for $run_duration seconds\n"; +sleep ($run_duration); +foreach my $pp (@pid) { + print "killing $pp\n"; + my $res = kill 2, $pp; + print "kill returned $res" +}