Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

another round of incremental improvements for the p2p node networks. #84

Merged
merged 1 commit into from
Jul 17, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions plugins/net_plugin/include/eos/net_plugin/protocol.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ namespace eos {
int16_t network_version = 0;
chain_id_type chain_id; ///< used to identify chain
fc::sha256 node_id; ///< used to identify peers and prevent self-connect
string p2p_address;
uint32_t last_irreversible_block_num = 0;
block_id_type last_irreversible_block_id;
uint32_t head_num = 0;
Expand Down Expand Up @@ -50,6 +51,7 @@ namespace eos {

FC_REFLECT( eos::handshake_message,
(network_version)(chain_id)(node_id)
(p2p_address)
(last_irreversible_block_num)(last_irreversible_block_id)
(head_num)(head_id)
(os)(agent) )
Expand Down
46 changes: 34 additions & 12 deletions plugins/net_plugin/net_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,16 @@
#include <fc/container/flat.hpp>
#include <fc/reflect/variant.hpp>
#include <fc/crypto/rand.hpp>
#include <fc/exception/exception.hpp>

#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/ip/host_name.hpp>

namespace eos {
using std::vector;
using boost::asio::ip::tcp;
using boost::asio::ip::address_v4;
using boost::asio::ip::host_name;
using fc::time_point;
using fc::time_point_sec;
using eos::chain::transaction_id_type;
Expand Down Expand Up @@ -209,7 +213,7 @@ namespace eos {
unique_ptr<tcp::acceptor> acceptor;

tcp::endpoint listen_endpoint;
tcp::endpoint public_endpoint;
string p2p_address;

vector<string> seed_nodes;
std::set<tcp::endpoint> resolved_seed_nodes;
Expand Down Expand Up @@ -355,7 +359,7 @@ namespace eos {


tcp::endpoint fc_to_asio (const fc::ip::endpoint &fcep) {
boost::asio::ip::address_v4 addr((uint32_t)fcep.get_address());
address_v4 addr((uint32_t)fcep.get_address());
return tcp::endpoint(addr, fcep.port());
}

Expand Down Expand Up @@ -415,8 +419,7 @@ namespace eos {
}

void handle_message (connection_ptr c, const handshake_message &msg) {

dlog ("got a handshake message");
dlog ("got a handshake message from ${p}", ("p", msg.p2p_address));
if (msg.node_id == node_id) {
elog ("Self connection detected. Closing connection");
close(c);
Expand All @@ -442,11 +445,11 @@ namespace eos {
}

void handle_message (connection_ptr c, const peer_message &msg) {
dlog ("got a peer message");
dlog ("got a peer message with ${pc}", ("pc", msg.peers.size()));
for (auto fcep : msg.peers) {
c->shared_peers.insert (fcep);
tcp::endpoint ep = fc_to_asio (fcep);
if (ep == listen_endpoint || ep == public_endpoint) {
if (ep == listen_endpoint) {
continue;
}

Expand Down Expand Up @@ -608,6 +611,7 @@ namespace eos {
hello.network_version = 0;
hello.chain_id = info->chain_id;
hello.node_id = info->node_id;
hello.p2p_address = info->p2p_address;
#if defined( __APPLE__ )
hello.os = "osx";
#elif defined( __linux__ )
Expand Down Expand Up @@ -652,28 +656,46 @@ namespace eos {
void net_plugin::set_program_options( options_description& cli, options_description& cfg )
{
cfg.add_options()
("listen-endpoint", bpo::value<string>()->default_value( "127.0.0.1:9876" ), "The local IP address and port to listen for incoming connections.")
("listen-endpoint", bpo::value<string>()->default_value( "0.0.0.0:9876" ), "The local IP address and port to listen for incoming connections.")
("remote-endpoint", bpo::value< vector<string> >()->composing(), "The IP address and port of a remote peer to sync with.")
("public-endpoint", bpo::value<string>()->default_value( "0.0.0.0:9876" ), "The public IP address and port that should be advertized to peers.")
("public-endpoint", bpo::value<string>(), "Overrides the advertised listen endpointlisten ip address.")
("agent-name", bpo::value<string>()->default_value("EOS Test Agent"), "The name supplied to identify this node amongst the peers.")
;
}

void net_plugin::plugin_initialize( const variables_map& options ) {
ilog("Initialize net plugin");
auto resolver = std::make_shared<tcp::resolver>( std::ref( app().get_io_service() ) );
if( options.count( "listen-endpoint" ) ) {
auto lipstr = options.at("listen-endpoint").as< string >();
auto host = lipstr.substr( 0, lipstr.find(':') );
auto port = lipstr.substr( host.size()+1, host.size() );
my->p2p_address = options.at("listen-endpoint").as< string >();
auto host = my->p2p_address.substr( 0, my->p2p_address.find(':') );
auto port = my->p2p_address.substr( host.size()+1, my->p2p_address.size() );
idump((host)(port));
auto resolver = std::make_shared<tcp::resolver>( std::ref( app().get_io_service() ) );
tcp::resolver::query query( tcp::v4(), host.c_str(), port.c_str() );
// Note: need to add support for IPv6 too?

my->listen_endpoint = *resolver->resolve( query);

my->acceptor.reset( new tcp::acceptor( app().get_io_service() ) );
}
if (options.count ("public-endpoint") ) {
my->p2p_address = options.at("public-endpoint").as< string >();
}
else {
if (my->listen_endpoint.address().to_v4() == address_v4::any()) {
boost::system::error_code ec;
auto host = host_name(ec);
if (ec.value() != boost::system::errc::success) {

FC_THROW_EXCEPTION (fc::invalid_arg_exception,
"Unable to retrieve host_name. ${msg}", ("msg",ec.message()));

}
auto port = my->p2p_address.substr (my->p2p_address.find(':'), my->p2p_address.size());
my->p2p_address = host + port;
}
}

if( options.count( "remote-endpoint" ) ) {
my->seed_nodes = options.at( "remote-endpoint" ).as< vector<string> >();
}
Expand Down
176 changes: 121 additions & 55 deletions tests/p2p_tests/init/run_test.pl
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,12 @@
my $p2p_port_base = 9876;
my $data_dir_base = './test-dir-node';
my $http_port_base = 8888;

my $hostname = "localhost";
my $first_pause = 45;
my $launch_pause = 5;
my $run_duration = 60;
my $topo = "ring";
my $override_gts = "now";

if (!GetOptions("nodes=i" => \$nodes,
"first-pause=i" => \$first_pause,
Expand All @@ -45,36 +47,39 @@
my $per_node = int ($prods / ($only_one ? 1 : $nodes));
my $extra = $prods - ($per_node * $nodes);
my @pid;
my @data_dir;
my $prod_ndx = ord('a');
my @p2p_port;
my @http_port;
my @peers;
my $rhost = $hostname; # from a list for multihost tests
for (my $i = 0; $i < $nodes; $i++) {
my $p2p_port = $p2p_port_base + $i;
my $forward = $p2p_port + 1;
my $backward = $p2p_port - 1;
my $http_port = $http_port_base + $i;
my $data_dir = "$data_dir_base-$i";
if ($nodes > 1) {
if ($i == 0) {
$backward = $p2p_port_base + $nodes -1;
}
elsif ($i == $nodes - 1) {
$forward = $p2p_port_base;
}
}
print "purging directory $data_dir\n";
rmtree ($data_dir);
mkdir ($data_dir);
$p2p_port[$i] = $p2p_port_base + $i;
$http_port[$i] = $http_port_base + $i;
$data_dir[$i] = "$data_dir_base-$i";
}

open (my $cfg, '>', "$data_dir/config.ini") ;

sub write_config {
my $i = shift;

print "purging directory $data_dir[$i]\n";
rmtree ($data_dir[$i]);
mkdir ($data_dir[$i]);

open (my $cfg, '>', "$data_dir[$i]/config.ini") ;
print $cfg "genesis-json = \"$genesis\"\n";
print $cfg "block-log-dir = \"blocks\"\n";
print $cfg "readonly = 0\n";
print $cfg "shared-file-dir = \"blockchain\"\n";
print $cfg "shared-file-size = 64\n";
print $cfg "http-server-endpoint = 127.0.0.1:$http_port\n";
print $cfg "listen-endpoint = 127.0.0.1:$p2p_port\n";
print $cfg "remote-endpoint = 127.0.0.1:$forward\n" if ($nodes > 1);
print $cfg "remote-endpoint = 127.0.0.1:$backward\n" if ($nodes > 2);
print $cfg "public-endpoint = 0.0.0.0:$p2p_port\n";
print $cfg "http-server-endpoint = $hostname:$http_port[$i]\n";
print $cfg "listen-endpoint = 0.0.0.0:$p2p_port[$i]\n";
print $cfg "public-endpoint = $hostname:$p2p_port[$i]\n";
foreach my $peer (@peers) {
print $cfg "remote-endpoint = $peer\n";
}

print $cfg "enable-stale-production = true\n";
print $cfg "required-participation = true\n";
print $cfg "private-key = [\"EOS6MRyAjQq8ud7hVNYcfnVPJqcVpscN5So8BhtHuGYqET5GDW5CV\",\"5KQwrPbwdL6PhXujxW37FSSQZ1JiwsST4cqQzDeyXtP79zkvFD3\"]\n";
Expand All @@ -88,44 +93,105 @@
}
}
close $cfg;
}

my @cmdline = ($eosd,
# "--genesis-timestamp=now",
"--data-dir=$data_dir");

$pid[$i] = fork;
if ($pid[$i] > 0) {
my $pause = $i == 0 ? $first_pause : $launch_pause;
print "parent process looping, child pid = $pid[$i]";
if ($i < $nodes - 1) {
print ", pausing $pause seconds\n";
sleep ($pause);
}
else {
print "\n";

sub make_ring_topology () {
for (my $i = 0; $i < $nodes; $i++) {
my $rport = ($i == $nodes - 1) ? $p2p_port_base : $p2p_port[$i] + 1;
$peers[0] = "$rhost:$rport";
if ($nodes > 2) {
$rport = $p2p_port[$i] - 1;
$rport += $nodes if ($i == 0);
$peers[1] = "$rhost:$rport";
}
write_config ($i);
}
return 1;
}

sub make_grid_topology () {
print "Sorry, the grid topology is not yet implemented\n";
return 0;
}

sub make_star_topology () {
print "Sorry, the star topology is not yet implemented\n";
return 0;
}

sub launch_nodes () {
my $GTS = $override_gts;
if ($override_gts =~ "now" ) {
chomp ($GTS = `date -u "+%Y-%m-%dT%H:%M:%S"`);
my @s = split (':',$GTS);
$s[2] = substr ((100 + (int ($s[2]/3) * 3)),1);
$GTS = join (':', @s);
print "using genesis time stamp $GTS\n";
}
elsif (defined ($pid[$i])) {
print "child execing now, pid = $$\n";
open OUTPUT, '>', "$data_dir/stdout.txt" or die $!;
open ERROR, '>', "$data_dir/stderr.txt" or die $!;
STDOUT->fdopen ( \*OUTPUT, 'w') or die $!;
STDERR->fdopen ( \*ERROR, 'w') or die $!;

exec @cmdline;
print "child terminating now\n";
exit;
my $gtsarg;
$gtsarg = "--genesis-timestamp=$GTS" if ($override_gts);

for (my $i = 0; $i < $nodes; $i++) {
my @cmdline = ($eosd,
$gtsarg,
"--data-dir=$data_dir[$i]");

$pid[$i] = fork;
if ($pid[$i] > 0) {
my $pause = $i == 0 ? $first_pause : $launch_pause;
print "parent process looping, child pid = $pid[$i]";
if ($i < $nodes - 1) {
print ", pausing $pause seconds\n";
sleep ($pause);
}
else {
print "\n";
}

}
elsif (defined ($pid[$i])) {
print "child execing now, pid = $$\n";
open OUTPUT, '>', "$data_dir[$i]/stdout.txt" or die $!;
open ERROR, '>', "$data_dir[$i]/stderr.txt" or die $!;
STDOUT->fdopen ( \*OUTPUT, 'w') or die $!;
STDERR->fdopen ( \*ERROR, 'w') or die $!;

exec @cmdline;
print "child terminating now\n";
exit;
}
else {
print "fork failed\n";
exit;
}
}
else {
print "fork failed\n";
exit;
}

sub kill_nodes () {
print "all nodes launched, network running for $run_duration seconds\n";
sleep ($run_duration);

foreach my $pp (@pid) {
print "killing $pp\n";
kill 2, $pp;
}
}

print "all nodes launched, network running for $run_duration seconds\n";
sleep ($run_duration);
foreach my $pp (@pid) {
print "killing $pp\n";
kill 2, $pp;

###################################################
# main

if ($nodes == 1) {
write_config (0);
}
else {
if ( $topo =~ "ring" ) { make_ring_topology () or die; }
elsif ( $topo =~ "grid" ) { make_grid_topology () or die; }
elsif ( $topo =~ "star" ) { make_star_topology () or die; }
else { print "$topo is not a known topology" and die; }
}

launch_nodes ();

kill_nodes () if ($run_duration > 0);