Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Commit

Permalink
Adding a script to launch multiple eosd instances in order to test th…
Browse files Browse the repository at this point in the history
…e p2p networking.

not quite perfect yet.
  • Loading branch information
pmesnier committed Jul 14, 2017
1 parent 366219e commit d48b34c
Show file tree
Hide file tree
Showing 3 changed files with 209 additions and 69 deletions.
2 changes: 1 addition & 1 deletion plugins/net_plugin/include/eos/net_plugin/protocol.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ namespace eos {
block_id_type head_id;
string os;
string agent;
};
};

struct notice_message {
vector<transaction_id_type> known_trx;
Expand Down
144 changes: 76 additions & 68 deletions plugins/net_plugin/net_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,12 @@ namespace eos {
>
> sync_request_index;


struct handshake_initializer {
static void populate (handshake_message &hello);
static net_plugin_impl* info;
};

class connection : public std::enable_shared_from_this<connection> {
public:
connection( socket_ptr s )
Expand All @@ -120,33 +126,14 @@ namespace eos {
uint32_t pending_message_size;
vector<char> pending_message_buffer;

fc::optional<handshake_message> hello;
handshake_message last_handshake;
std::deque<net_message> out_queue;
connection_ptr self;

void send_handshake ( ) {
chain_controller& cc = app().find_plugin<chain_plugin>()->chain();
try {
hello->last_irreversible_block_id = cc.get_block_id_for_num
(hello->last_irreversible_block_num = cc.last_irreversible_block_num());
}
catch (const unknown_block_exception &ex) {
hello->last_irreversible_block_id = fc::sha256::hash(0);
hello->last_irreversible_block_num = 0;
}
try {
hello->head_id = cc.get_block_id_for_num
(hello->head_num = cc.head_block_num());
}
catch (const unknown_block_exception &ex) {
hello->head_id = fc::sha256::hash(0);
hello->head_num = 0;
}
ilog ("send_handshake my libnum = ${n} head = ${h}",
("n",hello->last_irreversible_block_num)("h",hello->head_num));

send (*hello);
handshake_message hello;
handshake_initializer::populate(hello);
send (hello);
}

void send( const net_message& m ) {
Expand Down Expand Up @@ -190,7 +177,7 @@ namespace eos {
});
}

void write_block_backlog ( ) {
void write_block_backlog ( ) {
chain_controller& cc = app().find_plugin<chain_plugin>()->chain();
auto ss = out_sync_state.begin();
uint32_t num = ++ss.get_node()->value().last;
Expand Down Expand Up @@ -230,7 +217,10 @@ namespace eos {
std::set< connection_ptr > connections;
bool done = false;

fc::optional<handshake_message> hello;
int16_t network_version = 0;
chain_id_type chain_id; ///< used to identify chain
fc::sha256 node_id; ///< used to identify peers and prevent self-connect

std::string user_agent_name;
chain_plugin* chain_plug;
vector<node_transaction_state> local_txns;
Expand Down Expand Up @@ -311,36 +301,13 @@ namespace eos {
} FC_CAPTURE_AND_RETHROW() }
#endif

void init_handshake () {
if (!hello) {
hello = handshake_message();
}

hello->network_version = 0;
chain_plug->get_chain_id(hello->chain_id);
fc::rand_pseudo_bytes(hello->node_id.data(), hello->node_id.data_size());
#if defined( __APPLE__ )
hello->os = "osx";
#elif defined( __linux__ )
hello->os = "linux";
#elif defined( _MSC_VER )
hello->os = "win32";
#else
hello->os = "other";
#endif
hello->agent = user_agent_name;
}

void start_session( connection_ptr con ) {
connections.insert (con);
start_read_message( con );

if (!hello.valid()) {
init_handshake ();
}

con->hello = hello;
con->send_handshake( );
con->send_handshake();
send_peer_message(*con);

// con->readloop_complete = bf::async( [=](){ read_loop( con ); } );
// con->writeloop_complete = bf::async( [=](){ write_loop con ); } );
Expand All @@ -366,7 +333,7 @@ namespace eos {
boost::asio::async_read( *c->socket,
boost::asio::buffer((char *)buff, sizeof(c->pending_message_size)),
[this,c]( boost::system::error_code ec, std::size_t bytes_transferred ) {
ilog( "read size handler..." );
//ilog( "read size handler..." );
if( !ec ) {
if( c->pending_message_size <= c->pending_message_buffer.size() ) {
start_reading_pending_buffer( c );
Expand All @@ -393,7 +360,7 @@ namespace eos {
return fc::ip::endpoint (addr,ep.port());
}

void send_peer_list_to (connection &conn) {
void send_peer_message (connection &conn) {
peer_message pm;
pm.peers.resize(connections.size());
for (auto &c : connections) {
Expand All @@ -407,7 +374,6 @@ namespace eos {
}
}


template<typename T>
void send_all (const T &msg) {
for (auto &c : connections) {
Expand Down Expand Up @@ -437,21 +403,19 @@ namespace eos {
}

void handle_message (connection_ptr c, const handshake_message &msg) {
if (!hello) {
init_handshake();
}
// dlog ("got a handshake message");
if (msg.node_id == hello->node_id) {

dlog ("got a handshake message");
if (msg.node_id == node_id) {
elog ("Self connection detected. Closing connection");
close(c);
return;
}
if (msg.chain_id != hello->chain_id) {
if (msg.chain_id != chain_id) {
elog ("Peer on a different chain. Closing connection");
close (c);
return;
}
if (msg.network_version != hello->network_version) {
if (msg.network_version != network_version) {
elog ("Peer network id does not match ");
close (c);
return;
Expand All @@ -466,7 +430,7 @@ namespace eos {
}

void handle_message (connection_ptr c, const peer_message &msg) {
// dlog ("got a peer message");
dlog ("got a peer message");
for (auto fcep : msg.peers) {
c->shared_peers.insert (fcep);
tcp::endpoint ep = fc_to_asio (fcep);
Expand All @@ -489,7 +453,7 @@ namespace eos {
}

void handle_message (connection_ptr c, const notice_message &msg) {
// dlog ("got a notice message");
dlog ("got a notice message");
chain_controller &cc = chain_plug->chain();
for (const auto& b : msg.known_blocks) {
if (! cc.is_known_block (b)) {
Expand All @@ -505,15 +469,15 @@ namespace eos {
}

void handle_message (connection_ptr c, const sync_request_message &msg) {
// og ("got a sync request message for blocks ${s} to ${e}",
// ("s",msg.start_block)("e", msg.end_block));
dlog ("got a sync request message for blocks ${s} to ${e}",
("s",msg.start_block)("e", msg.end_block));
sync_state req = {msg.start_block,msg.end_block,msg.start_block-1,time_point::now()};
c->out_sync_state.insert (req);
c->write_block_backlog ();
}

void handle_message (connection_ptr c, const block_summary_message &msg) {
// dlog ("got a block summary message");
dlog ("got a block summary message");
#warning TODO: reconstruct actual block from cached transactions
chain_controller &cc = chain_plug->chain();
if (cc.is_known_block(msg.block.id())) {
Expand All @@ -537,13 +501,13 @@ namespace eos {
}

void handle_message (connection_ptr c, const SignedTransaction &msg) {
ilog ("got a SignedTransacton");
dlog ("got a SignedTransacton");
chain_plug->accept_transaction (msg);
}

void handle_message (connection_ptr c, const signed_block &msg) {
uint32_t bn = msg.block_num();
ilog ("got a signed_block, num = ${n}", ("n", bn));
dlog ("got a signed_block, num = ${n}", ("n", bn));
chain_controller &cc = chain_plug->chain();

if (cc.is_known_block(msg.id())) {
Expand Down Expand Up @@ -594,11 +558,11 @@ namespace eos {
boost::asio::buffer(c->pending_message_buffer.data(),
c->pending_message_size ),
[this,c]( boost::system::error_code ec, std::size_t bytes_transferred ) {
ilog( "read buffer handler..." );
// ilog( "read buffer handler..." );
if( !ec ) {
try {
auto msg = fc::raw::unpack<net_message>( c->pending_message_buffer );
ilog( "received message of size: ${s}", ("s",bytes_transferred) );
// ilog( "received message of size: ${s}", ("s",bytes_transferred) );
start_read_message( c );

msgHandler m(*this, c);
Expand Down Expand Up @@ -628,8 +592,49 @@ namespace eos {

}; // class net_plugin_impl

net_plugin_impl* handshake_initializer::info;

void
handshake_initializer::populate (handshake_message &hello) {
hello.network_version = 0;
hello.chain_id = info->chain_id;
hello.node_id = info->node_id;
#if defined( __APPLE__ )
hello.os = "osx";
#elif defined( __linux__ )
hello.os = "linux";
#elif defined( _MSC_VER )
hello.os = "win32";
#else
hello.os = "other";
#endif
hello.agent = info->user_agent_name;


chain_controller& cc = info->chain_plug->chain();
try {
hello.last_irreversible_block_id = cc.get_block_id_for_num
(hello.last_irreversible_block_num = cc.last_irreversible_block_num());
}
catch (const unknown_block_exception &ex) {
hello.last_irreversible_block_id = fc::sha256::hash(0);
hello.last_irreversible_block_num = 0;
}
try {
hello.head_id = cc.get_block_id_for_num
(hello.head_num = cc.head_block_num());
}
catch (const unknown_block_exception &ex) {
hello.head_id = fc::sha256::hash(0);
hello.head_num = 0;
}

}


net_plugin::net_plugin()
:my( new net_plugin_impl ) {
handshake_initializer::info = my.get();
}

net_plugin::~net_plugin() {
Expand Down Expand Up @@ -667,6 +672,9 @@ namespace eos {
my->user_agent_name = options.at ("agent-name").as< string > ();
}
my->chain_plug = app().find_plugin<chain_plugin>();
my->chain_plug->get_chain_id(my->chain_id);
fc::rand_pseudo_bytes(my->node_id.data(), my->node_id.data_size());

}

void net_plugin::plugin_startup() {
Expand Down
Loading

0 comments on commit d48b34c

Please sign in to comment.