Skip to content

Commit

Permalink
feat(p2p): ensure time synchronization in the network (#2255)
Browse files Browse the repository at this point in the history
* add time validation core logic

Signed-off-by: onur-ozkan <work@onurozkan.dev>

* nit fixes

Signed-off-by: onur-ozkan <work@onurozkan.dev>

* handle time gap

Signed-off-by: onur-ozkan <work@onurozkan.dev>

* improve logging

Signed-off-by: onur-ozkan <work@onurozkan.dev>

* add more trackable processing logs

Signed-off-by: onur-ozkan <work@onurozkan.dev>

* improve info log and remove debugging leftover

Signed-off-by: onur-ozkan <work@onurozkan.dev>

* rename `NetworkInfoRequest` to `PeerInfoRequest`

Signed-off-by: onur-ozkan <work@onurozkan.dev>

* handle recently dialed peers

Signed-off-by: onur-ozkan <work@onurozkan.dev>

* add useful logs

Signed-off-by: onur-ozkan <work@onurozkan.dev>

* create function for pre-dial check

Signed-off-by: onur-ozkan <work@onurozkan.dev>

* set max cap for timestamp channel

Signed-off-by: onur-ozkan <work@onurozkan.dev>

* remove leftover

Signed-off-by: onur-ozkan <work@onurozkan.dev>

* use `Multiaddr` as key

Signed-off-by: onur-ozkan <work@onurozkan.dev>

* fix p2p tests

Signed-off-by: onur-ozkan <work@onurozkan.dev>

* update logs

Signed-off-by: onur-ozkan <work@onurozkan.dev>

* rename leftovers

Signed-off-by: onur-ozkan <work@onurozkan.dev>

* update timing values

Signed-off-by: onur-ozkan <work@onurozkan.dev>

* minor fixes

Signed-off-by: onur-ozkan <work@onurozkan.dev>

* update pre dial check calls

Signed-off-by: onur-ozkan <work@onurozkan.dev>

* apply nit fixes

Signed-off-by: onur-ozkan <work@onurozkan.dev>

* don't update existing expiries

Signed-off-by: onur-ozkan <work@onurozkan.dev>

* revert breakage

Signed-off-by: onur-ozkan <work@onurozkan.dev>

---------

Signed-off-by: onur-ozkan <work@onurozkan.dev>
  • Loading branch information
onur-ozkan authored Dec 23, 2024
1 parent fe5a274 commit 87be260
Show file tree
Hide file tree
Showing 11 changed files with 178 additions and 42 deletions.
14 changes: 12 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion mm2src/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ parking_lot_core = { version = "0.6", features = ["nightly"] }
paste = "1.0"
primitive-types = "0.11.1"
rand = { version = "0.7", features = ["std", "small_rng"] }
rustc-hash = "1.1.0"
rustc-hash = "2.0"
regex = "1"
serde = "1"
serde_derive = "1"
Expand Down
4 changes: 3 additions & 1 deletion mm2src/mm2_main/src/lp_network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -238,9 +238,11 @@ fn process_p2p_request(
response_channel: mm2_libp2p::AdexResponseChannel,
) -> P2PRequestResult<()> {
let request = decode_message::<P2PRequest>(&request)?;
log::debug!("Got P2PRequest {:?}", request);

let result = match request {
P2PRequest::Ordermatch(req) => lp_ordermatch::process_peer_request(ctx.clone(), req),
P2PRequest::NetworkInfo(req) => lp_stats::process_info_request(ctx.clone(), req),
P2PRequest::NetworkInfo(req) => lp_stats::process_info_request(ctx.clone(), req).map(Some),
};

let res = match result {
Expand Down
1 change: 0 additions & 1 deletion mm2src/mm2_main/src/lp_ordermatch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -638,7 +638,6 @@ impl TryFromBytes for Uuid {
}

pub fn process_peer_request(ctx: MmArc, request: OrdermatchRequest) -> Result<Option<Vec<u8>>, String> {
log::debug!("Got ordermatch request {:?}", request);
match request {
OrdermatchRequest::GetOrderbook { base, rel } => process_get_orderbook_request(ctx, base, rel),
OrdermatchRequest::SyncPubkeyOrderbookState { pubkey, trie_roots } => {
Expand Down
19 changes: 14 additions & 5 deletions mm2src/mm2_main/src/lp_stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use mm2_libp2p::application::request_response::network_info::NetworkInfoRequest;
use mm2_libp2p::{encode_message, NetworkInfo, PeerId, RelayAddress, RelayAddressError};
use serde_json::{self as json, Value as Json};
use std::collections::{HashMap, HashSet};
use std::convert::TryInto;
use std::sync::Arc;

use crate::lp_network::{add_reserved_peer_addresses, lp_network_ports, request_peers, NetIdError, ParseAddressError,
Expand Down Expand Up @@ -170,16 +171,24 @@ struct Mm2VersionRes {
nodes: HashMap<String, String>,
}

fn process_get_version_request(ctx: MmArc) -> Result<Option<Vec<u8>>, String> {
fn process_get_version_request(ctx: MmArc) -> Result<Vec<u8>, String> {
let response = ctx.mm_version().to_string();
let encoded = try_s!(encode_message(&response));
Ok(Some(encoded))
encode_message(&response).map_err(|e| e.to_string())
}

pub fn process_info_request(ctx: MmArc, request: NetworkInfoRequest) -> Result<Option<Vec<u8>>, String> {
log::debug!("Got stats request {:?}", request);
fn process_get_peer_utc_timestamp_request() -> Result<Vec<u8>, String> {
let timestamp = common::get_utc_timestamp();
let timestamp: u64 = timestamp
.try_into()
.unwrap_or_else(|_| panic!("`common::get_utc_timestamp` returned invalid data: {}", timestamp));

encode_message(&timestamp).map_err(|e| e.to_string())
}

pub fn process_info_request(ctx: MmArc, request: NetworkInfoRequest) -> Result<Vec<u8>, String> {
match request {
NetworkInfoRequest::GetMm2Version => process_get_version_request(ctx),
NetworkInfoRequest::GetPeerUtcTimestamp => process_get_peer_utc_timestamp_request(),
}
}

Expand Down
4 changes: 3 additions & 1 deletion mm2src/mm2_main/src/lp_swap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ use derive_more::Display;
use http::Response;
use mm2_core::mm_ctx::{from_ctx, MmArc};
use mm2_err_handle::prelude::*;
use mm2_libp2p::behaviours::atomicdex::MAX_TIME_GAP_FOR_CONNECTED_PEER;
use mm2_libp2p::{decode_signed, encode_and_sign, pub_sub_topic, PeerId, TopicPrefix};
use mm2_number::{BigDecimal, BigRational, MmNumber, MmNumberMultiRepr};
use mm2_state_machine::storable_state_machine::StateMachineStorage;
Expand Down Expand Up @@ -155,12 +156,13 @@ pub(crate) const TAKER_SWAP_V2_TYPE: u8 = 2;
pub(crate) const TAKER_FEE_VALIDATION_ATTEMPTS: usize = 6;
pub(crate) const TAKER_FEE_VALIDATION_RETRY_DELAY_SECS: f64 = 10.;

const MAX_STARTED_AT_DIFF: u64 = 60;
const NEGOTIATE_SEND_INTERVAL: f64 = 30.;

/// If a certain P2P message is not received, swap will be aborted after this time expires.
const NEGOTIATION_TIMEOUT_SEC: u64 = 90;

const MAX_STARTED_AT_DIFF: u64 = MAX_TIME_GAP_FOR_CONNECTED_PEER * 3;

cfg_wasm32! {
use mm2_db::indexed_db::{ConstructibleDb, DbLocked};
use saved_swap::migrate_swaps_data;
Expand Down
2 changes: 2 additions & 0 deletions mm2src/mm2_p2p/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,14 @@ void = "1.0"
futures-rustls = "0.24"
instant = "0.1.12"
libp2p = { git = "https://github.com/KomodoPlatform/rust-libp2p.git", tag = "k-0.52.11", default-features = false, features = ["dns", "identify", "floodsub", "gossipsub", "noise", "ping", "request-response", "secp256k1", "tcp", "tokio", "websocket", "macros", "yamux"] }
timed-map = { version = "1.1.1", features = ["rustc-hash"] }
tokio = { version = "1.20", default-features = false }

[target.'cfg(target_arch = "wasm32")'.dependencies]
futures-rustls = "0.22"
instant = { version = "0.1.12", features = ["wasm-bindgen"] }
libp2p = { git = "https://github.com/KomodoPlatform/rust-libp2p.git", tag = "k-0.52.11", default-features = false, features = ["identify", "floodsub", "noise", "gossipsub", "ping", "request-response", "secp256k1", "wasm-ext", "wasm-ext-websocket", "macros", "yamux"] }
timed-map = { version = "1.1.1", features = ["rustc-hash"] }

[dev-dependencies]
async-std = "1.6.2"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,6 @@ use serde::{Deserialize, Serialize};
pub enum NetworkInfoRequest {
/// Get MM2 version of nodes added to stats collection
GetMm2Version,
/// Get UTC timestamp in seconds from the target peer
GetPeerUtcTimestamp,
}
127 changes: 120 additions & 7 deletions mm2src/mm2_p2p/src/behaviours/atomicdex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use futures::{channel::oneshot,
use futures_rustls::rustls;
use futures_ticker::Ticker;
use instant::Duration;
use lazy_static::lazy_static;
use libp2p::core::transport::Boxed as BoxedTransport;
use libp2p::core::{ConnectedPoint, Endpoint};
use libp2p::floodsub::{Floodsub, FloodsubEvent, Topic as FloodsubTopic};
Expand All @@ -23,16 +24,20 @@ use std::collections::HashMap;
use std::hash::{Hash, Hasher};
use std::iter;
use std::net::IpAddr;
use std::sync::{Mutex, MutexGuard};
use std::task::{Context, Poll};
use timed_map::{MapKind, StdClock, TimedMap};

use super::peers_exchange::{PeerAddresses, PeersExchange, PeersExchangeRequest, PeersExchangeResponse};
use super::ping::AdexPing;
use super::request_response::{build_request_response_behaviour, PeerRequest, PeerResponse, RequestResponseBehaviour,
RequestResponseSender};
use crate::application::request_response::network_info::NetworkInfoRequest;
use crate::application::request_response::P2PRequest;
use crate::network::{get_all_network_seednodes, DEFAULT_NETID};
use crate::relay_address::{RelayAddress, RelayAddressError};
use crate::swarm_runtime::SwarmRuntime;
use crate::{NetworkInfo, NetworkPorts, RequestResponseBehaviourEvent};
use crate::{decode_message, encode_message, NetworkInfo, NetworkPorts, RequestResponseBehaviourEvent};

pub use libp2p::gossipsub::{Behaviour as Gossipsub, IdentTopic, MessageAuthenticity, MessageId, Topic, TopicHash};
pub use libp2p::gossipsub::{ConfigBuilder as GossipsubConfigBuilder, Event as GossipsubEvent,
Expand All @@ -50,6 +55,21 @@ const ANNOUNCE_INTERVAL: Duration = Duration::from_secs(600);
const ANNOUNCE_INITIAL_DELAY: Duration = Duration::from_secs(60);
const CHANNEL_BUF_SIZE: usize = 1024 * 8;

/// Used in time validation logic for each peer which runs immediately after the
/// `ConnectionEstablished` event.
///
/// Be careful when updating this value, we have some defaults (like for swaps)
/// depending on this.
pub const MAX_TIME_GAP_FOR_CONNECTED_PEER: u64 = 20;

/// Used for storing peers in [`RECENTLY_DIALED_PEERS`].
const DIAL_RETRY_DELAY: Duration = Duration::from_secs(60 * 5);

lazy_static! {
/// Tracks recently dialed peers to avoid repeated connection attempts.
static ref RECENTLY_DIALED_PEERS: Mutex<TimedMap<StdClock, Multiaddr, ()>> = Mutex::new(TimedMap::new_with_map_kind(MapKind::FxHashMap));
}

pub const DEPRECATED_NETID_LIST: &[u16] = &[
7777, // TODO: keep it inaccessible until Q2 of 2024.
];
Expand Down Expand Up @@ -162,6 +182,24 @@ pub enum AdexBehaviourCmd {
},
}

/// Determines if a dial attempt to the remote should be made.
///
/// Returns `false` if a dial attempt to the given address has already been made,
/// in which case the caller must skip the dial attempt.
fn check_and_mark_dialed(
recently_dialed_peers: &mut MutexGuard<TimedMap<StdClock, Multiaddr, ()>>,
addr: &Multiaddr,
) -> bool {
if recently_dialed_peers.get(addr).is_some() {
info!("Connection attempt was already made recently to '{addr}'.");
return false;
}

recently_dialed_peers.insert_expirable_unchecked(addr.clone(), (), DIAL_RETRY_DELAY);

true
}

/// Returns info about directly connected peers.
pub async fn get_directly_connected_peers(mut cmd_tx: AdexCmdTx) -> HashMap<String, Vec<String>> {
let (result_tx, rx) = oneshot::channel();
Expand Down Expand Up @@ -199,6 +237,46 @@ pub async fn get_relay_mesh(mut cmd_tx: AdexCmdTx) -> Vec<String> {
rx.await.expect("Tx should be present")
}

async fn validate_peer_time(peer: PeerId, mut response_tx: Sender<Option<PeerId>>, rp_sender: RequestResponseSender) {
let request = P2PRequest::NetworkInfo(NetworkInfoRequest::GetPeerUtcTimestamp);
let encoded_request = encode_message(&request)
.expect("Static type `PeerInfoRequest::GetPeerUtcTimestamp` should never fail in serialization.");

match request_one_peer(peer, encoded_request, rp_sender).await {
PeerResponse::Ok { res } => {
if let Ok(timestamp) = decode_message::<u64>(&res) {
let now = common::get_utc_timestamp();
let now: u64 = now
.try_into()
.unwrap_or_else(|_| panic!("`common::get_utc_timestamp` returned invalid data: {}", now));

let diff = now.abs_diff(timestamp);

// If time diff is in the acceptable gap, end the validation here.
if diff <= MAX_TIME_GAP_FOR_CONNECTED_PEER {
debug!(
"Peer '{peer}' is within the acceptable time gap ({MAX_TIME_GAP_FOR_CONNECTED_PEER} seconds); time difference is {diff} seconds."
);
response_tx.send(None).await.unwrap();
return;
}
};
},
other => {
error!("Unexpected response `{other:?}` from peer `{peer}`");
// TODO: Ideally, we should send `Some(peer)` to end the connection,
// but we don't want to cause a breaking change yet.
response_tx.send(None).await.unwrap();
return;
},
}

// If the function reaches this point, this means validation has failed.
// Send the peer ID to disconnect from it.
error!("Failed to validate the time for peer `{peer}`; disconnecting.");
response_tx.send(Some(peer)).await.unwrap();
}

async fn request_one_peer(peer: PeerId, req: Vec<u8>, mut request_response_tx: RequestResponseSender) -> PeerResponse {
// Use the internal receiver to receive a response to this request.
let (internal_response_tx, internal_response_rx) = oneshot::channel();
Expand Down Expand Up @@ -711,19 +789,26 @@ fn start_gossipsub(
_ => (),
}

let mut recently_dialed_peers = RECENTLY_DIALED_PEERS.lock().unwrap();
for relay in bootstrap.choose_multiple(&mut rng, mesh_n) {
if !check_and_mark_dialed(&mut recently_dialed_peers, relay) {
continue;
}

match libp2p::Swarm::dial(&mut swarm, relay.clone()) {
Ok(_) => info!("Dialed {}", relay),
Err(e) => error!("Dial {:?} failed: {:?}", relay, e),
}
}
drop(recently_dialed_peers);

let mut check_connected_relays_interval =
Ticker::new_with_next(CONNECTED_RELAYS_CHECK_INTERVAL, CONNECTED_RELAYS_CHECK_INTERVAL);

let mut announce_interval = Ticker::new_with_next(ANNOUNCE_INTERVAL, ANNOUNCE_INITIAL_DELAY);
let mut listening = false;

let (timestamp_tx, mut timestamp_rx) = futures::channel::mpsc::channel(mesh_n_high);
let polling_fut = poll_fn(move |cx: &mut Context| {
loop {
match swarm.behaviour_mut().cmd_rx.poll_next_unpin(cx) {
Expand All @@ -733,11 +818,27 @@ fn start_gossipsub(
}
}

while let Poll::Ready(Some(Some(peer_id))) = timestamp_rx.poll_next_unpin(cx) {
if swarm.disconnect_peer_id(peer_id).is_err() {
error!("Disconnection from `{peer_id}` failed unexpectedly, which should never happen.");
}
}

loop {
match swarm.poll_next_unpin(cx) {
Poll::Ready(Some(event)) => {
debug!("Swarm event {:?}", event);

if let SwarmEvent::ConnectionEstablished { peer_id, .. } = &event {
info!("Validating time data for peer `{peer_id}`.");
let future = validate_peer_time(
*peer_id,
timestamp_tx.clone(),
swarm.behaviour().core.request_response.sender(),
);
swarm.behaviour().spawn(future);
}

if let SwarmEvent::Behaviour(event) = event {
if swarm.behaviour_mut().netid != DEFAULT_NETID {
if let AdexBehaviourEvent::Floodsub(FloodsubEvent::Message(message)) = &event {
Expand Down Expand Up @@ -798,19 +899,29 @@ fn maintain_connection_to_relays(swarm: &mut AtomicDexSwarm, bootstrap_addresses

let mut rng = rand::thread_rng();
if connected_relays.len() < mesh_n_low {
let mut recently_dialed_peers = RECENTLY_DIALED_PEERS.lock().unwrap();
let to_connect_num = mesh_n - connected_relays.len();
let to_connect = swarm
.behaviour_mut()
.core
.peers_exchange
.get_random_peers(to_connect_num, |peer| !connected_relays.contains(peer));
let to_connect =
swarm
.behaviour_mut()
.core
.peers_exchange
.get_random_peers(to_connect_num, |peer, addresses| {
!connected_relays.contains(peer)
&& addresses
.iter()
.any(|addr| check_and_mark_dialed(&mut recently_dialed_peers, addr))
});

// choose some random bootstrap addresses to connect if peers exchange returned not enough peers
if to_connect.len() < to_connect_num {
let connect_bootstrap_num = to_connect_num - to_connect.len();
for addr in bootstrap_addresses
.iter()
.filter(|addr| !swarm.behaviour().core.gossipsub.is_connected_to_addr(addr))
.filter(|addr| {
!swarm.behaviour().core.gossipsub.is_connected_to_addr(addr)
&& check_and_mark_dialed(&mut recently_dialed_peers, addr)
})
.collect::<Vec<_>>()
.choose_multiple(&mut rng, connect_bootstrap_num)
{
Expand All @@ -824,11 +935,13 @@ fn maintain_connection_to_relays(swarm: &mut AtomicDexSwarm, bootstrap_addresses
if swarm.behaviour().core.gossipsub.is_connected_to_addr(&addr) {
continue;
}

if let Err(e) = libp2p::Swarm::dial(swarm, addr.clone()) {
error!("Peer {} address {} dial error {}", peer, addr, e);
}
}
}
drop(recently_dialed_peers);
}

if connected_relays.len() > max_n {
Expand Down
Loading

0 comments on commit 87be260

Please sign in to comment.