Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(p2p): ensure time synchronization in the network #2255

Open
wants to merge 21 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 20 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 23 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion mm2src/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ parking_lot = { version = "0.12.0", features = ["nightly"] }
parking_lot_core = { version = "0.6", features = ["nightly"] }
primitive-types = "0.11.1"
rand = { version = "0.7", features = ["std", "small_rng"] }
rustc-hash = "1.1.0"
rustc-hash = "2.0"
regex = "1"
serde = "1"
serde_derive = "1"
Expand Down
4 changes: 3 additions & 1 deletion mm2src/mm2_main/src/lp_network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,9 +228,11 @@ fn process_p2p_request(
response_channel: mm2_libp2p::AdexResponseChannel,
) -> P2PRequestResult<()> {
let request = decode_message::<P2PRequest>(&request)?;
log::debug!("Got P2PRequest {:?}", request);

let result = match request {
P2PRequest::Ordermatch(req) => lp_ordermatch::process_peer_request(ctx.clone(), req),
P2PRequest::NetworkInfo(req) => lp_stats::process_info_request(ctx.clone(), req),
P2PRequest::PeerInfo(req) => lp_stats::process_info_request(ctx.clone(), req).map(Some),
};

let res = match result {
Expand Down
1 change: 0 additions & 1 deletion mm2src/mm2_main/src/lp_ordermatch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -638,7 +638,6 @@ impl TryFromBytes for Uuid {
}

pub fn process_peer_request(ctx: MmArc, request: OrdermatchRequest) -> Result<Option<Vec<u8>>, String> {
log::debug!("Got ordermatch request {:?}", request);
match request {
OrdermatchRequest::GetOrderbook { base, rel } => process_get_orderbook_request(ctx, base, rel),
OrdermatchRequest::SyncPubkeyOrderbookState { pubkey, trie_roots } => {
Expand Down
43 changes: 22 additions & 21 deletions mm2src/mm2_main/src/lp_stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use futures::lock::Mutex as AsyncMutex;
use http::StatusCode;
use mm2_core::mm_ctx::{from_ctx, MmArc};
use mm2_err_handle::prelude::*;
use mm2_libp2p::application::request_response::network_info::NetworkInfoRequest;
use mm2_libp2p::application::request_response::peer_info::PeerInfoRequest;
use mm2_libp2p::{encode_message, NetworkInfo, PeerId, RelayAddress, RelayAddressError};
use serde_json::{self as json, Value as Json};
use std::collections::{HashMap, HashSet};
Expand Down Expand Up @@ -170,16 +170,20 @@ struct Mm2VersionRes {
nodes: HashMap<String, String>,
}

fn process_get_version_request(ctx: MmArc) -> Result<Option<Vec<u8>>, String> {
fn process_get_version_request(ctx: MmArc) -> Result<Vec<u8>, String> {
let response = ctx.mm_version().to_string();
let encoded = try_s!(encode_message(&response));
Ok(Some(encoded))
encode_message(&response).map_err(|e| e.to_string())
}

pub fn process_info_request(ctx: MmArc, request: NetworkInfoRequest) -> Result<Option<Vec<u8>>, String> {
log::debug!("Got stats request {:?}", request);
fn process_get_peer_utc_timestamp_request() -> Result<Vec<u8>, String> {
let timestamp = common::get_utc_timestamp();
encode_message(&timestamp).map_err(|e| e.to_string())
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if we don't wanna be explicit with casting, let's at least leave a comment that i64 will encode just as u64.
it's so confusing seeing one type sent from this end and another one received.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that explanation alone makes it more complicated than it should be. I didn't think it would be this confusing 🤔. I would rather cast it to u64 before sending it instead of explaining it like "we do this because of encoding numbers..."

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

me too tbh, i favor explicit casting all the way :/

}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

...encodes it as a i64. we should stick to only one. u64 makes the most sense application-wise as we don't really need the negative side? right?

but curious, is the encoding of i64 and u64 compatible?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should stick to only one. u64 makes the most sense application-wise as we don't really need the negative side? right?

Why do you want to cast the value again when you can simply encode it?

but curious, is the encoding of i64 and u64 compatible?

They are

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do you want to cast the value again when you can simply encode it?

didn't know the encoding for both is compatible, still not sure how the negative side is encoded (probably as rust's then, will checkout).
so thing is, you actually do some kind of an implicit cast here using data encoding and decoding, you encode the number as i64, send it, decode it as u64 from the receiver. i assume a negative i64 will decode fine as a u64 just like what the as keyword does but give a garbage data (will check more on that encoding lib).

what i am suggesting is, we can encode it as i64 and decode it as i64 as well, we check abs_diff without the need for any fallible conversions what so ever. (the suggestion stems from the fact that i64 encodes just like u64 over the network, i.e. if we ever change this to u64 encoding we will still be backward compatible. will have to double check this fact still :) ).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

still not sure how the negative side is encoded

Theoretically, it should be fine on encoding but should panic on decoding, which is the main purpose on the decoding side (as negative ones are invalid).


pub fn process_info_request(ctx: MmArc, request: PeerInfoRequest) -> Result<Vec<u8>, String> {
match request {
NetworkInfoRequest::GetMm2Version => process_get_version_request(ctx),
PeerInfoRequest::GetMm2Version => process_get_version_request(ctx),
PeerInfoRequest::GetPeerUtcTimestamp => process_get_peer_utc_timestamp_request(),
}
}

Expand Down Expand Up @@ -298,20 +302,17 @@ async fn stat_collection_loop(ctx: MmArc, interval: f64) {
let peers: Vec<String> = peers_names.keys().cloned().collect();

let timestamp = now_sec();
let get_versions_res = match request_peers::<String>(
ctx.clone(),
P2PRequest::NetworkInfo(NetworkInfoRequest::GetMm2Version),
peers,
)
.await
{
Ok(res) => res,
Err(e) => {
log::error!("Error getting nodes versions from peers: {}", e);
Timer::sleep(10.).await;
continue;
},
};
let get_versions_res =
match request_peers::<String>(ctx.clone(), P2PRequest::PeerInfo(PeerInfoRequest::GetMm2Version), peers)
.await
{
Ok(res) => res,
Err(e) => {
log::error!("Error getting nodes versions from peers: {}", e);
Timer::sleep(10.).await;
continue;
},
};

for (peer_id, response) in get_versions_res {
let name = match peers_names.get(&peer_id.to_string()) {
Expand Down
3 changes: 2 additions & 1 deletion mm2src/mm2_main/src/lp_swap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ use derive_more::Display;
use http::Response;
use mm2_core::mm_ctx::{from_ctx, MmArc};
use mm2_err_handle::prelude::*;
use mm2_libp2p::behaviours::atomicdex::MAX_TIME_GAP_FOR_CONNECTED_PEER;
use mm2_libp2p::{decode_signed, encode_and_sign, pub_sub_topic, PeerId, TopicPrefix};
use mm2_number::{BigDecimal, BigRational, MmNumber, MmNumberMultiRepr};
use mm2_state_machine::storable_state_machine::StateMachineStorage;
Expand Down Expand Up @@ -151,7 +152,7 @@ pub const TX_HELPER_PREFIX: TopicPrefix = "txhlp";
pub(crate) const LEGACY_SWAP_TYPE: u8 = 0;
pub(crate) const MAKER_SWAP_V2_TYPE: u8 = 1;
pub(crate) const TAKER_SWAP_V2_TYPE: u8 = 2;
const MAX_STARTED_AT_DIFF: u64 = 60;
const MAX_STARTED_AT_DIFF: u64 = MAX_TIME_GAP_FOR_CONNECTED_PEER * 3;

const NEGOTIATE_SEND_INTERVAL: f64 = 30.;

Expand Down
2 changes: 2 additions & 0 deletions mm2src/mm2_p2p/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,14 @@ void = "1.0"
futures-rustls = "0.24"
instant = "0.1.12"
libp2p = { git = "https://github.com/KomodoPlatform/rust-libp2p.git", tag = "k-0.52.4", default-features = false, features = ["dns", "identify", "floodsub", "gossipsub", "noise", "ping", "request-response", "secp256k1", "tcp", "tokio", "websocket", "macros", "yamux"] }
timed-map = { version = "1.1.1", features = ["rustc-hash"] }
tokio = { version = "1.20", default-features = false }

[target.'cfg(target_arch = "wasm32")'.dependencies]
futures-rustls = "0.22"
instant = { version = "0.1.12", features = ["wasm-bindgen"] }
libp2p = { git = "https://github.com/KomodoPlatform/rust-libp2p.git", tag = "k-0.52.4", default-features = false, features = ["identify", "floodsub", "noise", "gossipsub", "ping", "request-response", "secp256k1", "wasm-ext", "wasm-ext-websocket", "macros", "yamux"] }
timed-map = { version = "1.1.1", features = ["rustc-hash", "wasm"] }

[dev-dependencies]
async-std = "1.6.2"
Expand Down
10 changes: 3 additions & 7 deletions mm2src/mm2_p2p/src/application/request_response/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
//! which are separate from other request types such as RPC requests or Gossipsub
//! messages.

pub mod network_info;
pub mod ordermatch;
pub mod peer_info;

use serde::{Deserialize, Serialize};

Expand All @@ -12,10 +12,6 @@ use serde::{Deserialize, Serialize};
pub enum P2PRequest {
/// Request for order matching.
Ordermatch(ordermatch::OrdermatchRequest),
/// Request for network information from the target peer.
///
/// TODO: This should be called `PeerInfoRequest` instead. However, renaming it
/// will introduce a breaking change in the network and is not worth it. Do this
/// renaming when there is already a breaking change in the release.
NetworkInfo(network_info::NetworkInfoRequest),
Comment on lines -17 to -20
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This PR already leads to breaking change. It's perfect time to rename this.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One way to avoid breaking changes is to have a timeout on the request for peer timestamp and allow them to connect if there is no response from the peer. We later remove this when most peers update. Another way is to get the mm2 version and do the timestamp request depending on the version. What do you think?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One way to avoid breaking changes is to have a timeout on the request for peer timestamp and allow them to connect if there is no response from the peer. We later remove this when most peers update.

This sounds like a good plan!

Another way is to get the mm2 version and do the timestamp request depending on the version. What do you think?

I think the first idea was better, but then I need to revert the peer-info renaming part. What about bundling these changes in some branch, and then releasing multiple breaking changes at once (like removing mm2 binaries and some other stuff) ?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about bundling these changes in some branch, and then releasing multiple breaking changes at once (like removing mm2 binaries and some other stuff) ?

I am fine with this or the first approach.

/// Request various information from the target peer.
PeerInfo(peer_info::PeerInfoRequest),
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ use serde::{Deserialize, Serialize};
/// Wraps the different types of network information requests for the P2P request-response
/// protocol.
#[derive(Debug, Deserialize, Eq, PartialEq, Serialize)]
pub enum NetworkInfoRequest {
pub enum PeerInfoRequest {
/// Get MM2 version of nodes added to stats collection
GetMm2Version,
/// Get UTC timestamp in seconds from the target peer
GetPeerUtcTimestamp,
}
Loading
Loading