Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[4.0 -> main] P2P: Use magnitude safe time types #1321

Merged
merged 6 commits into from
Jun 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 4 additions & 7 deletions plugins/net_plugin/include/eosio/net_plugin/protocol.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,6 @@ namespace eosio {
using namespace chain;
using namespace fc;

static_assert(sizeof(std::chrono::system_clock::duration::rep) >= 8, "system_clock is expected to be at least 64 bits");
typedef std::chrono::system_clock::duration::rep tstamp;

struct chain_size_message {
uint32_t last_irreversible_block_num = 0;
block_id_type last_irreversible_block_id;
Expand Down Expand Up @@ -83,10 +80,10 @@ namespace eosio {
};

struct time_message {
tstamp org{0}; //!< origin timestamp
tstamp rec{0}; //!< receive timestamp
tstamp xmt{0}; //!< transmit timestamp
mutable tstamp dst{0}; //!< destination timestamp
int64_t org{0}; //!< origin timestamp, in nanoseconds
int64_t rec{0}; //!< receive timestamp, in nanoseconds
int64_t xmt{0}; //!< transmit timestamp, in nanoseconds
mutable int64_t dst{0}; //!< destination timestamp, in nanoseconds
};

enum id_list_modes {
Expand Down
81 changes: 41 additions & 40 deletions plugins/net_plugin/net_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -772,8 +772,7 @@ namespace eosio {
bool is_transactions_connection() const { return connection_type != blocks_only; } // thread safe, atomic
bool is_blocks_connection() const { return connection_type != transactions_only; } // thread safe, atomic
void set_heartbeat_timeout(std::chrono::milliseconds msec) {
std::chrono::system_clock::duration dur = msec;
hb_timeout = dur.count();
hb_timeout = msec;
}

uint64_t get_peer_ping_time_ns() const { return peer_ping_time_ns; }
Expand Down Expand Up @@ -864,15 +863,15 @@ namespace eosio {
* @{
*/
// See NTP protocol. https://datatracker.ietf.org/doc/rfc5905/
tstamp org{0}; //!< origin timestamp. Time at the client when the request departed for the server.
// tstamp (not used) rec{0}; //!< receive timestamp. Time at the server when the request arrived from the client.
tstamp xmt{0}; //!< transmit timestamp, Time at the server when the response left for the client.
// tstamp (not used) dst{0}; //!< destination timestamp, Time at the client when the reply arrived from the server.
std::chrono::nanoseconds org{0}; //!< origin timestamp. Time at the client when the request departed for the server.
// std::chrono::nanoseconds (not used) rec{0}; //!< receive timestamp. Time at the server when the request arrived from the client.
std::chrono::nanoseconds xmt{0}; //!< transmit timestamp, Time at the server when the response left for the client.
// std::chrono::nanoseconds (not used) dst{0}; //!< destination timestamp, Time at the client when the reply arrived from the server.
/** @} */
// timestamp for the lastest message
tstamp latest_msg_time{0};
tstamp hb_timeout{std::chrono::milliseconds{def_keepalive_interval}.count()};
tstamp latest_blk_time{0};
std::chrono::system_clock::time_point latest_msg_time{std::chrono::system_clock::time_point::min()};
std::chrono::milliseconds hb_timeout{std::chrono::milliseconds{def_keepalive_interval}};
std::chrono::system_clock::time_point latest_blk_time{std::chrono::system_clock::time_point::min()};
Comment on lines +866 to +874
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd prefer have these all be tstamp as before, but with tstamp defined as std::chrono::nanoseconds.

If we want to differentiate between a point in time (since the epoch), i.e time_point, and a duration, then org/rec/xmt/dst should be time_point, and not durations.

Also I don't think it is a good idea to use std::chrono::system_clock::time_point ourselves because the duration of the system_clock is not specified.

So I think the most straightforward thing might be to use tstamp defined as std::chrono::nanoseconds for both actual timestamps (i.e. time since epoch) and durations. Because it is a duration from std::chrono, we benefit from type safety insuring that we don't add std::chrono::nanoseconds and std::chrono::milliseconds, but we use a consistent internal representation. I think it would make the code simpler and clearer.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I created #1326 as this is a merge up PR, I think it makes sense to make these changes in a separate PR.


bool connected() const;
bool closed() const; // socket is not open or is closed or closing, thread safe
Expand Down Expand Up @@ -913,7 +912,7 @@ namespace eosio {
*/
/** \brief Check heartbeat time and send Time_message
*/
void check_heartbeat( tstamp current_time );
void check_heartbeat( std::chrono::system_clock::time_point current_time );
/** \brief Populate and queue time_message
*/
void send_time();
Expand All @@ -927,8 +926,8 @@ namespace eosio {
* packet is placed on the send queue. Calls the kernel time of
* day routine and converts to a (at least) 64 bit integer.
*/
static tstamp get_time() {
return std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
static std::chrono::nanoseconds get_time() {
return std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::system_clock::now().time_since_epoch());
}
/** @} */

Expand Down Expand Up @@ -1317,7 +1316,9 @@ namespace eosio {
peer_ilog( this, "closing" );
cancel_wait();
sync_last_requested_block = 0;
org = 0;
org = std::chrono::nanoseconds{0};
latest_msg_time = std::chrono::system_clock::time_point::min();
latest_blk_time = std::chrono::system_clock::time_point::min();
set_state(connection_state::closed);

if( reconnect && !shutdown ) {
Expand Down Expand Up @@ -1435,8 +1436,8 @@ namespace eosio {
}

// called from connection strand
void connection::check_heartbeat( tstamp current_time ) {
if( latest_msg_time > 0 ) {
void connection::check_heartbeat( std::chrono::system_clock::time_point current_time ) {
if( latest_msg_time > std::chrono::system_clock::time_point::min() ) {
if( current_time > latest_msg_time + hb_timeout ) {
no_retry = benign_other;
if( !peer_address().empty() ) {
Expand All @@ -1449,7 +1450,7 @@ namespace eosio {
return;
}
if (!my_impl->sync_master->syncing_from_peer()) {
const tstamp timeout = std::max(hb_timeout / 2, 2 * std::chrono::milliseconds(config::block_interval_ms).count());
const std::chrono::milliseconds timeout = std::max(hb_timeout/2, 2*std::chrono::milliseconds(config::block_interval_ms));
if (current_time > latest_blk_time + timeout) {
peer_wlog(this, "half heartbeat timed out, sending handshake");
send_handshake();
Expand All @@ -1459,19 +1460,19 @@ namespace eosio {

}

org = 0;
org = std::chrono::nanoseconds{0};
send_time();
}

// called from connection strand
void connection::send_time() {
if (org == 0) { // do not send if there is already a time loop in progress
if (org == std::chrono::nanoseconds{0}) { // do not send if there is already a time loop in progress
org = get_time();
// xpkt.org == 0 means we are initiating a ping. Actual origin time is in xpkt.xmt.
time_message xpkt{
.org = 0,
.rec = 0,
.xmt = org,
.xmt = org.count(),
.dst = 0 };
peer_dlog(this, "send init time_message: ${t}", ("t", xpkt));
enqueue(xpkt);
Expand All @@ -1483,9 +1484,9 @@ namespace eosio {
time_message xpkt{
.org = msg.xmt,
.rec = msg.dst,
.xmt = get_time(),
.xmt = get_time().count(),
.dst = 0 };
peer_dlog( this, "send time_message: ${t}, org: ${o}", ("t", xpkt)("o", org) );
peer_dlog( this, "send time_message: ${t}, org: ${o}", ("t", xpkt)("o", org.count()) );
enqueue(xpkt);
}

Expand Down Expand Up @@ -1716,7 +1717,7 @@ namespace eosio {

block_buffer_factory buff_factory;
auto sb = buff_factory.get_send_buffer( b );
latest_blk_time = get_time();
latest_blk_time = std::chrono::system_clock::now();
enqueue_buffer( sb, no_reason, to_sync_queue);
}

Expand Down Expand Up @@ -2262,7 +2263,7 @@ namespace eosio {
c->close( false, true );
return;
}
c->latest_blk_time = c->get_time();
c->latest_blk_time = std::chrono::system_clock::now();
c->block_status_monitor_.accepted();
stages state = sync_state;
peer_dlog( c, "state ${s}", ("s", stage_str( state )) );
Expand Down Expand Up @@ -2438,7 +2439,7 @@ namespace eosio {
send_buffer_type sb = buff_factory.get_send_buffer( b );

cp->strand.post( [cp, bnum, sb{std::move(sb)}]() {
cp->latest_blk_time = cp->get_time();
cp->latest_blk_time = std::chrono::system_clock::now();
bool has_block = cp->peer_lib_num >= bnum;
if( !has_block ) {
peer_dlog( cp, "bcast block ${b}", ("b", bnum) );
Expand Down Expand Up @@ -2840,14 +2841,14 @@ namespace eosio {
// called from connection strand
bool connection::process_next_message( uint32_t message_length ) {
try {
latest_msg_time = get_time();
latest_msg_time = std::chrono::system_clock::now();

// if next message is a block we already have, exit early
auto peek_ds = pending_message_buffer.create_peek_datastream();
unsigned_int which{};
fc::raw::unpack( peek_ds, which );
if( which == signed_block_which ) {
latest_blk_time = get_time();
latest_blk_time = std::chrono::system_clock::now();
return process_next_block_message( message_length );

} else if( which == packed_transaction_which ) {
Expand Down Expand Up @@ -3280,33 +3281,33 @@ namespace eosio {
}

// some clients before leap 5.0 provided microsecond epoch instead of nanosecond epoch
tstamp normalize_epoch_to_ns(tstamp x) {
std::chrono::nanoseconds normalize_epoch_to_ns(int64_t x) {
// 1686211688888 milliseconds - 2023-06-08T08:08:08.888, 5yrs from EOS genesis 2018-06-08T08:08:08.888
// 1686211688888000 microseconds
// 1686211688888000000 nanoseconds
if (x >= 1686211688888000000) // nanoseconds
return x;
return std::chrono::nanoseconds{x};
if (x >= 1686211688888000) // microseconds
return x*1000;
return std::chrono::nanoseconds{x*1000};
if (x >= 1686211688888) // milliseconds
return x*1000*1000;
return std::chrono::nanoseconds{x*1000*1000};
if (x >= 1686211688) // seconds
return x*1000*1000*1000;
return 0; // unknown or is zero
return std::chrono::nanoseconds{x*1000*1000*1000};
return std::chrono::nanoseconds{0}; // unknown or is zero
}

void connection::handle_message( const time_message& msg ) {
peer_dlog( this, "received time_message: ${t}, org: ${o}", ("t", msg)("o", org) );
peer_dlog( this, "received time_message: ${t}, org: ${o}", ("t", msg)("o", org.count()) );

// If the transmit timestamp is zero, the peer is horribly broken.
if(msg.xmt == 0)
return; // invalid timestamp

// We've already lost however many microseconds it took to dispatch the message, but it can't be helped.
msg.dst = get_time();
msg.dst = get_time().count();

if (msg.org != 0) {
if (msg.org == org) {
if (msg.org == org.count()) {
auto ping = msg.dst - msg.org;
peer_dlog(this, "send_time ping ${p}us", ("p", ping / 1000));
peer_ping_time_ns = ping;
Expand All @@ -3327,16 +3328,16 @@ namespace eosio {
return; // We don't have enough data to perform the calculation yet.
}

if (org != 0) {
if (org != std::chrono::nanoseconds{0}) {
auto rec = normalize_epoch_to_ns(msg.rec);
int64_t offset = (double(rec - org) + double(msg_xmt - msg.dst)) / 2.0;
int64_t offset = (double((rec - org).count()) + double(msg_xmt.count() - msg.dst)) / 2.0;

if (std::abs(offset) > block_interval_ns) {
peer_wlog(this, "Clock offset is ${of}us, calculation: (rec ${r} - org ${o} + xmt ${x} - dst ${d})/2",
("of", offset / 1000)("r", rec)("o", org)("x", msg_xmt)("d", msg.dst));
("of", offset / 1000)("r", rec.count())("o", org.count())("x", msg_xmt.count())("d", msg.dst));
}
}
org = 0;
org = std::chrono::nanoseconds{0};

std::unique_lock<std::mutex> g_conn( conn_mtx );
if( last_handshake_recv.generation == 0 ) {
Expand Down Expand Up @@ -3680,7 +3681,7 @@ namespace eosio {
fc_wlog( logger, "Peer keepalive ticked sooner than expected: ${m}", ("m", ec.message()) );
}

tstamp current_time = connection::get_time();
auto current_time = std::chrono::system_clock::now();
my->connections.for_each_connection( [current_time]( auto& c ) {
if( c->socket_is_open() ) {
c->strand.post([c, current_time]() {
Expand Down