Skip to content

Commit

Permalink
Merge pull request #1316 from AntelopeIO/GH-1315-time-4.0
Browse files Browse the repository at this point in the history
[4.0] P2P: Use magnitude safe time types
  • Loading branch information
heifner authored Jun 20, 2023
2 parents 6aca8a4 + 0b3b3b6 commit e9016b0
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 56 deletions.
11 changes: 4 additions & 7 deletions plugins/net_plugin/include/eosio/net_plugin/protocol.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,6 @@ namespace eosio {
using namespace chain;
using namespace fc;

static_assert(sizeof(std::chrono::system_clock::duration::rep) >= 8, "system_clock is expected to be at least 64 bits");
typedef std::chrono::system_clock::duration::rep tstamp;

struct chain_size_message {
uint32_t last_irreversible_block_num = 0;
block_id_type last_irreversible_block_id;
Expand Down Expand Up @@ -83,10 +80,10 @@ namespace eosio {
};

struct time_message {
tstamp org{0}; //!< origin timestamp
tstamp rec{0}; //!< receive timestamp
tstamp xmt{0}; //!< transmit timestamp
mutable tstamp dst{0}; //!< destination timestamp
int64_t org{0}; //!< origin timestamp, in nanoseconds
int64_t rec{0}; //!< receive timestamp, in nanoseconds
int64_t xmt{0}; //!< transmit timestamp, in nanoseconds
mutable int64_t dst{0}; //!< destination timestamp, in nanoseconds
};

enum id_list_modes {
Expand Down
121 changes: 72 additions & 49 deletions plugins/net_plugin/net_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ namespace eosio {
using connection_ptr = std::shared_ptr<connection>;
using connection_wptr = std::weak_ptr<connection>;

static constexpr int64_t block_interval_ns =
std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::milliseconds(config::block_interval_ms)).count();

const fc::string logger_name("net_plugin_impl");
fc::logger logger;
std::string peer_log_format;
Expand Down Expand Up @@ -121,9 +124,6 @@ namespace eosio {
in_sync
};

static constexpr int64_t block_interval_ns =
std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::milliseconds(config::block_interval_ms)).count();

mutable std::mutex sync_mtx;
uint32_t sync_known_lib_num{0};
uint32_t sync_last_requested_num{0};
Expand Down Expand Up @@ -607,8 +607,7 @@ namespace eosio {
bool is_transactions_only_connection()const { return connection_type == transactions_only; }
bool is_blocks_only_connection()const { return connection_type == blocks_only; }
void set_heartbeat_timeout(std::chrono::milliseconds msec) {
std::chrono::system_clock::duration dur = msec;
hb_timeout = dur.count();
hb_timeout = msec;
}

private:
Expand Down Expand Up @@ -682,15 +681,15 @@ namespace eosio {
* @{
*/
// Members set from network data
tstamp org{0}; //!< originate timestamp
tstamp rec{0}; //!< receive timestamp
tstamp dst{0}; //!< destination timestamp
tstamp xmt{0}; //!< transmit timestamp
std::chrono::nanoseconds org{0}; //!< origin timestamp. Time at the client when the request departed for the server.
// std::chrono::nanoseconds (not used) rec{0}; //!< receive timestamp. Time at the server when the request arrived from the client.
std::chrono::nanoseconds xmt{0}; //!< transmit timestamp, Time at the server when the response left for the client.
// std::chrono::nanoseconds (not used) dst{0}; //!< destination timestamp, Time at the client when the reply arrived from the server.
/** @} */
// timestamp for the lastest message
tstamp latest_msg_time{0};
tstamp hb_timeout{std::chrono::milliseconds{def_keepalive_interval}.count()};
tstamp latest_blk_time{0};
std::chrono::system_clock::time_point latest_msg_time{std::chrono::system_clock::time_point::min()};
std::chrono::milliseconds hb_timeout{std::chrono::milliseconds{def_keepalive_interval}};
std::chrono::system_clock::time_point latest_blk_time{std::chrono::system_clock::time_point::min()};

bool connected();
bool current();
Expand Down Expand Up @@ -728,7 +727,7 @@ namespace eosio {
*/
/** \brief Check heartbeat time and send Time_message
*/
void check_heartbeat( tstamp current_time );
void check_heartbeat( std::chrono::system_clock::time_point current_time );
/** \brief Populate and queue time_message
*/
void send_time();
Expand All @@ -742,8 +741,8 @@ namespace eosio {
* packet is placed on the send queue. Calls the kernel time of
* day routine and converts to a (at least) 64 bit integer.
*/
static tstamp get_time() {
return std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
static std::chrono::nanoseconds get_time() {
return std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::system_clock::now().time_since_epoch());
}
/** @} */

Expand Down Expand Up @@ -1051,6 +1050,8 @@ namespace eosio {
if( !shutdown) my_impl->sync_master->sync_reset_lib_num( self->shared_from_this(), true );
peer_ilog( self, "closing" );
self->cancel_wait();
self->latest_msg_time = std::chrono::system_clock::time_point::min();
self->latest_blk_time = std::chrono::system_clock::time_point::min();

if( reconnect && !shutdown ) {
my_impl->start_conn_timer( std::chrono::milliseconds( 100 ), connection_wptr() );
Expand Down Expand Up @@ -1165,8 +1166,8 @@ namespace eosio {
}

// called from connection strand
void connection::check_heartbeat( tstamp current_time ) {
if( latest_msg_time > 0 ) {
void connection::check_heartbeat( std::chrono::system_clock::time_point current_time ) {
if( latest_msg_time > std::chrono::system_clock::time_point::min() ) {
if( current_time > latest_msg_time + hb_timeout ) {
no_retry = benign_other;
if( !peer_address().empty() ) {
Expand All @@ -1178,7 +1179,7 @@ namespace eosio {
}
return;
} else {
const tstamp timeout = std::max(hb_timeout/2, 2*std::chrono::milliseconds(config::block_interval_ms).count());
const std::chrono::milliseconds timeout = std::max(hb_timeout/2, 2*std::chrono::milliseconds(config::block_interval_ms));
if ( current_time > latest_blk_time + timeout ) {
send_handshake();
return;
Expand All @@ -1191,20 +1192,27 @@ namespace eosio {

// called from connection strand
void connection::send_time() {
time_message xpkt;
xpkt.org = rec;
xpkt.rec = dst;
xpkt.xmt = get_time();
org = xpkt.xmt;
enqueue(xpkt);
if (org == std::chrono::nanoseconds{0}) { // do not send if there is already a time loop in progress
org = get_time();
// xpkt.org == 0 means we are initiating a ping. Actual origin time is in xpkt.xmt.
time_message xpkt{
.org = 0,
.rec = 0,
.xmt = org.count(),
.dst = 0 };
peer_dlog(this, "send init time_message: ${t}", ("t", xpkt));
enqueue(xpkt);
}
}

// called from connection strand
void connection::send_time(const time_message& msg) {
time_message xpkt;
xpkt.org = msg.xmt;
xpkt.rec = msg.dst;
xpkt.xmt = get_time();
time_message xpkt{
.org = msg.xmt,
.rec = msg.dst,
.xmt = get_time().count(),
.dst = 0 };
peer_dlog( this, "send time_message: ${t}, org: ${o}", ("t", xpkt)("o", org.count()) );
enqueue(xpkt);
}

Expand Down Expand Up @@ -1434,7 +1442,7 @@ namespace eosio {

block_buffer_factory buff_factory;
auto sb = buff_factory.get_send_buffer( b );
latest_blk_time = get_time();
latest_blk_time = std::chrono::system_clock::now();
enqueue_buffer( sb, no_reason, to_sync_queue);
}

Expand Down Expand Up @@ -2154,7 +2162,7 @@ namespace eosio {
send_buffer_type sb = buff_factory.get_send_buffer( b );

cp->strand.post( [cp, bnum, sb{std::move(sb)}]() {
cp->latest_blk_time = cp->get_time();
cp->latest_blk_time = std::chrono::system_clock::now();
bool has_block = cp->peer_lib_num >= bnum;
if( !has_block ) {
peer_dlog( cp, "bcast block ${b}", ("b", bnum) );
Expand Down Expand Up @@ -2574,14 +2582,14 @@ namespace eosio {
// called from connection strand
bool connection::process_next_message( uint32_t message_length ) {
try {
latest_msg_time = get_time();
latest_msg_time = std::chrono::system_clock::now();

// if next message is a block we already have, exit early
auto peek_ds = pending_message_buffer.create_peek_datastream();
unsigned_int which{};
fc::raw::unpack( peek_ds, which );
if( which == signed_block_which ) {
latest_blk_time = get_time();
latest_blk_time = std::chrono::system_clock::now();
return process_next_block_message( message_length );

} else if( which == packed_transaction_which ) {
Expand Down Expand Up @@ -3004,38 +3012,53 @@ namespace eosio {
close( retry ); // reconnect if wrong_version
}

// some clients before leap 5.0 provided microsecond epoch instead of nanosecond epoch
std::chrono::nanoseconds normalize_epoch_to_ns(int64_t x) {
// 1686211688888 milliseconds - 2023-06-08T08:08:08.888, 5yrs from EOS genesis 2018-06-08T08:08:08.888
// 1686211688888000 microseconds
// 1686211688888000000 nanoseconds
if (x >= 1686211688888000000) // nanoseconds
return std::chrono::nanoseconds{x};
if (x >= 1686211688888000) // microseconds
return std::chrono::nanoseconds{x*1000};
if (x >= 1686211688888) // milliseconds
return std::chrono::nanoseconds{x*1000*1000};
if (x >= 1686211688) // seconds
return std::chrono::nanoseconds{x*1000*1000*1000};
return std::chrono::nanoseconds{0}; // unknown or is zero
}

void connection::handle_message( const time_message& msg ) {
peer_ilog( this, "received time_message" );
peer_dlog( this, "received time_message: ${t}, org: ${o}", ("t", msg)("o", org.count()) );

/* We've already lost however many microseconds it took to dispatch
* the message, but it can't be helped.
*/
msg.dst = get_time();
// We've already lost however many microseconds it took to dispatch the message, but it can't be helped.
msg.dst = get_time().count();

// If the transmit timestamp is zero, the peer is horribly broken.
if(msg.xmt == 0)
return; /* invalid timestamp */

if(msg.xmt == xmt)
auto msg_xmt = normalize_epoch_to_ns(msg.xmt);
if(msg_xmt == xmt)
return; /* duplicate packet */

xmt = msg.xmt;
rec = msg.rec;
dst = msg.dst;
xmt = msg_xmt;

if( msg.org == 0 ) {
send_time( msg );
return; // We don't have enough data to perform the calculation yet.
}

double offset = (double(rec - org) + double(msg.xmt - dst)) / 2;
double NsecPerUsec{1000};
if (org != std::chrono::nanoseconds{0}) {
auto rec = normalize_epoch_to_ns(msg.rec);
int64_t offset = (double((rec - org).count()) + double(msg_xmt.count() - msg.dst)) / 2.0;

if( logger.is_enabled( fc::log_level::all ) )
logger.log( FC_LOG_MESSAGE( all, "Clock offset is ${o}ns (${us}us)",
("o", offset)( "us", offset / NsecPerUsec ) ) );
org = 0;
rec = 0;
if (std::abs(offset) > block_interval_ns) {
peer_wlog(this, "Clock offset is ${of}us, calculation: (rec ${r} - org ${o} + xmt ${x} - dst ${d})/2",
("of", offset / 1000)("r", rec.count())("o", org.count())("x", msg_xmt.count())("d", msg.dst));
}
}
org = std::chrono::nanoseconds{0};

std::unique_lock<std::mutex> g_conn( conn_mtx );
if( last_handshake_recv.generation == 0 ) {
Expand Down Expand Up @@ -3384,7 +3407,7 @@ namespace eosio {
fc_wlog( logger, "Peer keepalive ticked sooner than expected: ${m}", ("m", ec.message()) );
}

tstamp current_time = connection::get_time();
auto current_time = std::chrono::system_clock::now();
my->for_each_connection( [current_time]( auto& c ) {
if( c->socket_is_open() ) {
c->strand.post([c, current_time]() {
Expand Down

0 comments on commit e9016b0

Please sign in to comment.