From 87be260f29a63155ea9ad0d2b9d08bdd70b019f7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Onur=20=C3=96zkan?= Date: Mon, 23 Dec 2024 15:06:02 +0300 Subject: [PATCH] feat(p2p): ensure time synchronization in the network (#2255) * add time validation core logic Signed-off-by: onur-ozkan * nit fixes Signed-off-by: onur-ozkan * handle time gap Signed-off-by: onur-ozkan * improve logging Signed-off-by: onur-ozkan * add more trackable processing logs Signed-off-by: onur-ozkan * improve info log and remove debugging leftover Signed-off-by: onur-ozkan * rename `NetworkInfoRequest` to `PeerInfoRequest` Signed-off-by: onur-ozkan * handle recently dialed peers Signed-off-by: onur-ozkan * add useful logs Signed-off-by: onur-ozkan * create function for pre-dial check Signed-off-by: onur-ozkan * set max cap for timestamp channel Signed-off-by: onur-ozkan * remove leftover Signed-off-by: onur-ozkan * use `Multiaddr` as key Signed-off-by: onur-ozkan * fix p2p tests Signed-off-by: onur-ozkan * update logs Signed-off-by: onur-ozkan * rename leftovers Signed-off-by: onur-ozkan * update timing values Signed-off-by: onur-ozkan * minor fixes Signed-off-by: onur-ozkan * update pre dial check calls Signed-off-by: onur-ozkan * apply nit fixes Signed-off-by: onur-ozkan * don't update existing expiries Signed-off-by: onur-ozkan * revert breakage Signed-off-by: onur-ozkan --------- Signed-off-by: onur-ozkan --- Cargo.lock | 14 +- mm2src/common/Cargo.toml | 2 +- mm2src/mm2_main/src/lp_network.rs | 4 +- mm2src/mm2_main/src/lp_ordermatch.rs | 1 - mm2src/mm2_main/src/lp_stats.rs | 19 ++- mm2src/mm2_main/src/lp_swap.rs | 4 +- mm2src/mm2_p2p/Cargo.toml | 2 + .../request_response/network_info.rs | 2 + mm2src/mm2_p2p/src/behaviours/atomicdex.rs | 127 +++++++++++++++++- mm2src/mm2_p2p/src/behaviours/mod.rs | 34 ++--- .../mm2_p2p/src/behaviours/peers_exchange.rs | 11 +- 11 files changed, 178 insertions(+), 42 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 8e69b25e20..60016ffc57 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4250,6 +4250,7 @@ dependencies = [ "sha2 0.10.7", "smallvec 1.6.1", "syn 2.0.38", + "timed-map", "tokio", "void", ] @@ -5713,9 +5714,9 @@ checksum = "4c691c0e608126e00913e33f0ccf3727d5fc84573623b8d65b2df340b5201783" [[package]] name = "rustc-hash" -version = "1.1.0" +version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2" +checksum = "583034fd73374156e66797ed8e5b0d5690409c9226b22d87cb7f19821c05d152" [[package]] name = "rustc-hex" @@ -6955,6 +6956,15 @@ dependencies = [ "time-core", ] +[[package]] +name = "timed-map" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3b102d4d896895d697f1dff4141dff28307532dac57a376b2b5665a55b280dc6" +dependencies = [ + "rustc-hash", +] + [[package]] name = "tiny-keccak" version = "1.4.4" diff --git a/mm2src/common/Cargo.toml b/mm2src/common/Cargo.toml index b775741cf8..e9752134db 100644 --- a/mm2src/common/Cargo.toml +++ b/mm2src/common/Cargo.toml @@ -37,7 +37,7 @@ parking_lot_core = { version = "0.6", features = ["nightly"] } paste = "1.0" primitive-types = "0.11.1" rand = { version = "0.7", features = ["std", "small_rng"] } -rustc-hash = "1.1.0" +rustc-hash = "2.0" regex = "1" serde = "1" serde_derive = "1" diff --git a/mm2src/mm2_main/src/lp_network.rs b/mm2src/mm2_main/src/lp_network.rs index b2ef53f3fb..6ed8719fa5 100644 --- a/mm2src/mm2_main/src/lp_network.rs +++ b/mm2src/mm2_main/src/lp_network.rs @@ -238,9 +238,11 @@ fn process_p2p_request( response_channel: mm2_libp2p::AdexResponseChannel, ) -> P2PRequestResult<()> { let request = decode_message::(&request)?; + log::debug!("Got P2PRequest {:?}", request); + let result = match request { P2PRequest::Ordermatch(req) => lp_ordermatch::process_peer_request(ctx.clone(), req), - P2PRequest::NetworkInfo(req) => lp_stats::process_info_request(ctx.clone(), req), + P2PRequest::NetworkInfo(req) => lp_stats::process_info_request(ctx.clone(), req).map(Some), }; let res = match result { diff --git a/mm2src/mm2_main/src/lp_ordermatch.rs b/mm2src/mm2_main/src/lp_ordermatch.rs index 620cb79bfb..dba2139998 100644 --- a/mm2src/mm2_main/src/lp_ordermatch.rs +++ b/mm2src/mm2_main/src/lp_ordermatch.rs @@ -638,7 +638,6 @@ impl TryFromBytes for Uuid { } pub fn process_peer_request(ctx: MmArc, request: OrdermatchRequest) -> Result>, String> { - log::debug!("Got ordermatch request {:?}", request); match request { OrdermatchRequest::GetOrderbook { base, rel } => process_get_orderbook_request(ctx, base, rel), OrdermatchRequest::SyncPubkeyOrderbookState { pubkey, trie_roots } => { diff --git a/mm2src/mm2_main/src/lp_stats.rs b/mm2src/mm2_main/src/lp_stats.rs index 185996ecd1..3aedc1cb5c 100644 --- a/mm2src/mm2_main/src/lp_stats.rs +++ b/mm2src/mm2_main/src/lp_stats.rs @@ -11,6 +11,7 @@ use mm2_libp2p::application::request_response::network_info::NetworkInfoRequest; use mm2_libp2p::{encode_message, NetworkInfo, PeerId, RelayAddress, RelayAddressError}; use serde_json::{self as json, Value as Json}; use std::collections::{HashMap, HashSet}; +use std::convert::TryInto; use std::sync::Arc; use crate::lp_network::{add_reserved_peer_addresses, lp_network_ports, request_peers, NetIdError, ParseAddressError, @@ -170,16 +171,24 @@ struct Mm2VersionRes { nodes: HashMap, } -fn process_get_version_request(ctx: MmArc) -> Result>, String> { +fn process_get_version_request(ctx: MmArc) -> Result, String> { let response = ctx.mm_version().to_string(); - let encoded = try_s!(encode_message(&response)); - Ok(Some(encoded)) + encode_message(&response).map_err(|e| e.to_string()) } -pub fn process_info_request(ctx: MmArc, request: NetworkInfoRequest) -> Result>, String> { - log::debug!("Got stats request {:?}", request); +fn process_get_peer_utc_timestamp_request() -> Result, String> { + let timestamp = common::get_utc_timestamp(); + let timestamp: u64 = timestamp + .try_into() + .unwrap_or_else(|_| panic!("`common::get_utc_timestamp` returned invalid data: {}", timestamp)); + + encode_message(×tamp).map_err(|e| e.to_string()) +} + +pub fn process_info_request(ctx: MmArc, request: NetworkInfoRequest) -> Result, String> { match request { NetworkInfoRequest::GetMm2Version => process_get_version_request(ctx), + NetworkInfoRequest::GetPeerUtcTimestamp => process_get_peer_utc_timestamp_request(), } } diff --git a/mm2src/mm2_main/src/lp_swap.rs b/mm2src/mm2_main/src/lp_swap.rs index 0acb7fc443..507b9a6f51 100644 --- a/mm2src/mm2_main/src/lp_swap.rs +++ b/mm2src/mm2_main/src/lp_swap.rs @@ -74,6 +74,7 @@ use derive_more::Display; use http::Response; use mm2_core::mm_ctx::{from_ctx, MmArc}; use mm2_err_handle::prelude::*; +use mm2_libp2p::behaviours::atomicdex::MAX_TIME_GAP_FOR_CONNECTED_PEER; use mm2_libp2p::{decode_signed, encode_and_sign, pub_sub_topic, PeerId, TopicPrefix}; use mm2_number::{BigDecimal, BigRational, MmNumber, MmNumberMultiRepr}; use mm2_state_machine::storable_state_machine::StateMachineStorage; @@ -155,12 +156,13 @@ pub(crate) const TAKER_SWAP_V2_TYPE: u8 = 2; pub(crate) const TAKER_FEE_VALIDATION_ATTEMPTS: usize = 6; pub(crate) const TAKER_FEE_VALIDATION_RETRY_DELAY_SECS: f64 = 10.; -const MAX_STARTED_AT_DIFF: u64 = 60; const NEGOTIATE_SEND_INTERVAL: f64 = 30.; /// If a certain P2P message is not received, swap will be aborted after this time expires. const NEGOTIATION_TIMEOUT_SEC: u64 = 90; +const MAX_STARTED_AT_DIFF: u64 = MAX_TIME_GAP_FOR_CONNECTED_PEER * 3; + cfg_wasm32! { use mm2_db::indexed_db::{ConstructibleDb, DbLocked}; use saved_swap::migrate_swaps_data; diff --git a/mm2src/mm2_p2p/Cargo.toml b/mm2src/mm2_p2p/Cargo.toml index 80465a254d..85efa47879 100644 --- a/mm2src/mm2_p2p/Cargo.toml +++ b/mm2src/mm2_p2p/Cargo.toml @@ -39,12 +39,14 @@ void = "1.0" futures-rustls = "0.24" instant = "0.1.12" libp2p = { git = "https://github.com/KomodoPlatform/rust-libp2p.git", tag = "k-0.52.11", default-features = false, features = ["dns", "identify", "floodsub", "gossipsub", "noise", "ping", "request-response", "secp256k1", "tcp", "tokio", "websocket", "macros", "yamux"] } +timed-map = { version = "1.1.1", features = ["rustc-hash"] } tokio = { version = "1.20", default-features = false } [target.'cfg(target_arch = "wasm32")'.dependencies] futures-rustls = "0.22" instant = { version = "0.1.12", features = ["wasm-bindgen"] } libp2p = { git = "https://github.com/KomodoPlatform/rust-libp2p.git", tag = "k-0.52.11", default-features = false, features = ["identify", "floodsub", "noise", "gossipsub", "ping", "request-response", "secp256k1", "wasm-ext", "wasm-ext-websocket", "macros", "yamux"] } +timed-map = { version = "1.1.1", features = ["rustc-hash"] } [dev-dependencies] async-std = "1.6.2" diff --git a/mm2src/mm2_p2p/src/application/request_response/network_info.rs b/mm2src/mm2_p2p/src/application/request_response/network_info.rs index c8dece2ef5..4d610d932c 100644 --- a/mm2src/mm2_p2p/src/application/request_response/network_info.rs +++ b/mm2src/mm2_p2p/src/application/request_response/network_info.rs @@ -6,4 +6,6 @@ use serde::{Deserialize, Serialize}; pub enum NetworkInfoRequest { /// Get MM2 version of nodes added to stats collection GetMm2Version, + /// Get UTC timestamp in seconds from the target peer + GetPeerUtcTimestamp, } diff --git a/mm2src/mm2_p2p/src/behaviours/atomicdex.rs b/mm2src/mm2_p2p/src/behaviours/atomicdex.rs index 9d58da4e1e..943ad08126 100644 --- a/mm2src/mm2_p2p/src/behaviours/atomicdex.rs +++ b/mm2src/mm2_p2p/src/behaviours/atomicdex.rs @@ -7,6 +7,7 @@ use futures::{channel::oneshot, use futures_rustls::rustls; use futures_ticker::Ticker; use instant::Duration; +use lazy_static::lazy_static; use libp2p::core::transport::Boxed as BoxedTransport; use libp2p::core::{ConnectedPoint, Endpoint}; use libp2p::floodsub::{Floodsub, FloodsubEvent, Topic as FloodsubTopic}; @@ -23,16 +24,20 @@ use std::collections::HashMap; use std::hash::{Hash, Hasher}; use std::iter; use std::net::IpAddr; +use std::sync::{Mutex, MutexGuard}; use std::task::{Context, Poll}; +use timed_map::{MapKind, StdClock, TimedMap}; use super::peers_exchange::{PeerAddresses, PeersExchange, PeersExchangeRequest, PeersExchangeResponse}; use super::ping::AdexPing; use super::request_response::{build_request_response_behaviour, PeerRequest, PeerResponse, RequestResponseBehaviour, RequestResponseSender}; +use crate::application::request_response::network_info::NetworkInfoRequest; +use crate::application::request_response::P2PRequest; use crate::network::{get_all_network_seednodes, DEFAULT_NETID}; use crate::relay_address::{RelayAddress, RelayAddressError}; use crate::swarm_runtime::SwarmRuntime; -use crate::{NetworkInfo, NetworkPorts, RequestResponseBehaviourEvent}; +use crate::{decode_message, encode_message, NetworkInfo, NetworkPorts, RequestResponseBehaviourEvent}; pub use libp2p::gossipsub::{Behaviour as Gossipsub, IdentTopic, MessageAuthenticity, MessageId, Topic, TopicHash}; pub use libp2p::gossipsub::{ConfigBuilder as GossipsubConfigBuilder, Event as GossipsubEvent, @@ -50,6 +55,21 @@ const ANNOUNCE_INTERVAL: Duration = Duration::from_secs(600); const ANNOUNCE_INITIAL_DELAY: Duration = Duration::from_secs(60); const CHANNEL_BUF_SIZE: usize = 1024 * 8; +/// Used in time validation logic for each peer which runs immediately after the +/// `ConnectionEstablished` event. +/// +/// Be careful when updating this value, we have some defaults (like for swaps) +/// depending on this. +pub const MAX_TIME_GAP_FOR_CONNECTED_PEER: u64 = 20; + +/// Used for storing peers in [`RECENTLY_DIALED_PEERS`]. +const DIAL_RETRY_DELAY: Duration = Duration::from_secs(60 * 5); + +lazy_static! { + /// Tracks recently dialed peers to avoid repeated connection attempts. + static ref RECENTLY_DIALED_PEERS: Mutex> = Mutex::new(TimedMap::new_with_map_kind(MapKind::FxHashMap)); +} + pub const DEPRECATED_NETID_LIST: &[u16] = &[ 7777, // TODO: keep it inaccessible until Q2 of 2024. ]; @@ -162,6 +182,24 @@ pub enum AdexBehaviourCmd { }, } +/// Determines if a dial attempt to the remote should be made. +/// +/// Returns `false` if a dial attempt to the given address has already been made, +/// in which case the caller must skip the dial attempt. +fn check_and_mark_dialed( + recently_dialed_peers: &mut MutexGuard>, + addr: &Multiaddr, +) -> bool { + if recently_dialed_peers.get(addr).is_some() { + info!("Connection attempt was already made recently to '{addr}'."); + return false; + } + + recently_dialed_peers.insert_expirable_unchecked(addr.clone(), (), DIAL_RETRY_DELAY); + + true +} + /// Returns info about directly connected peers. pub async fn get_directly_connected_peers(mut cmd_tx: AdexCmdTx) -> HashMap> { let (result_tx, rx) = oneshot::channel(); @@ -199,6 +237,46 @@ pub async fn get_relay_mesh(mut cmd_tx: AdexCmdTx) -> Vec { rx.await.expect("Tx should be present") } +async fn validate_peer_time(peer: PeerId, mut response_tx: Sender>, rp_sender: RequestResponseSender) { + let request = P2PRequest::NetworkInfo(NetworkInfoRequest::GetPeerUtcTimestamp); + let encoded_request = encode_message(&request) + .expect("Static type `PeerInfoRequest::GetPeerUtcTimestamp` should never fail in serialization."); + + match request_one_peer(peer, encoded_request, rp_sender).await { + PeerResponse::Ok { res } => { + if let Ok(timestamp) = decode_message::(&res) { + let now = common::get_utc_timestamp(); + let now: u64 = now + .try_into() + .unwrap_or_else(|_| panic!("`common::get_utc_timestamp` returned invalid data: {}", now)); + + let diff = now.abs_diff(timestamp); + + // If time diff is in the acceptable gap, end the validation here. + if diff <= MAX_TIME_GAP_FOR_CONNECTED_PEER { + debug!( + "Peer '{peer}' is within the acceptable time gap ({MAX_TIME_GAP_FOR_CONNECTED_PEER} seconds); time difference is {diff} seconds." + ); + response_tx.send(None).await.unwrap(); + return; + } + }; + }, + other => { + error!("Unexpected response `{other:?}` from peer `{peer}`"); + // TODO: Ideally, we should send `Some(peer)` to end the connection, + // but we don't want to cause a breaking change yet. + response_tx.send(None).await.unwrap(); + return; + }, + } + + // If the function reaches this point, this means validation has failed. + // Send the peer ID to disconnect from it. + error!("Failed to validate the time for peer `{peer}`; disconnecting."); + response_tx.send(Some(peer)).await.unwrap(); +} + async fn request_one_peer(peer: PeerId, req: Vec, mut request_response_tx: RequestResponseSender) -> PeerResponse { // Use the internal receiver to receive a response to this request. let (internal_response_tx, internal_response_rx) = oneshot::channel(); @@ -711,12 +789,18 @@ fn start_gossipsub( _ => (), } + let mut recently_dialed_peers = RECENTLY_DIALED_PEERS.lock().unwrap(); for relay in bootstrap.choose_multiple(&mut rng, mesh_n) { + if !check_and_mark_dialed(&mut recently_dialed_peers, relay) { + continue; + } + match libp2p::Swarm::dial(&mut swarm, relay.clone()) { Ok(_) => info!("Dialed {}", relay), Err(e) => error!("Dial {:?} failed: {:?}", relay, e), } } + drop(recently_dialed_peers); let mut check_connected_relays_interval = Ticker::new_with_next(CONNECTED_RELAYS_CHECK_INTERVAL, CONNECTED_RELAYS_CHECK_INTERVAL); @@ -724,6 +808,7 @@ fn start_gossipsub( let mut announce_interval = Ticker::new_with_next(ANNOUNCE_INTERVAL, ANNOUNCE_INITIAL_DELAY); let mut listening = false; + let (timestamp_tx, mut timestamp_rx) = futures::channel::mpsc::channel(mesh_n_high); let polling_fut = poll_fn(move |cx: &mut Context| { loop { match swarm.behaviour_mut().cmd_rx.poll_next_unpin(cx) { @@ -733,11 +818,27 @@ fn start_gossipsub( } } + while let Poll::Ready(Some(Some(peer_id))) = timestamp_rx.poll_next_unpin(cx) { + if swarm.disconnect_peer_id(peer_id).is_err() { + error!("Disconnection from `{peer_id}` failed unexpectedly, which should never happen."); + } + } + loop { match swarm.poll_next_unpin(cx) { Poll::Ready(Some(event)) => { debug!("Swarm event {:?}", event); + if let SwarmEvent::ConnectionEstablished { peer_id, .. } = &event { + info!("Validating time data for peer `{peer_id}`."); + let future = validate_peer_time( + *peer_id, + timestamp_tx.clone(), + swarm.behaviour().core.request_response.sender(), + ); + swarm.behaviour().spawn(future); + } + if let SwarmEvent::Behaviour(event) = event { if swarm.behaviour_mut().netid != DEFAULT_NETID { if let AdexBehaviourEvent::Floodsub(FloodsubEvent::Message(message)) = &event { @@ -798,19 +899,29 @@ fn maintain_connection_to_relays(swarm: &mut AtomicDexSwarm, bootstrap_addresses let mut rng = rand::thread_rng(); if connected_relays.len() < mesh_n_low { + let mut recently_dialed_peers = RECENTLY_DIALED_PEERS.lock().unwrap(); let to_connect_num = mesh_n - connected_relays.len(); - let to_connect = swarm - .behaviour_mut() - .core - .peers_exchange - .get_random_peers(to_connect_num, |peer| !connected_relays.contains(peer)); + let to_connect = + swarm + .behaviour_mut() + .core + .peers_exchange + .get_random_peers(to_connect_num, |peer, addresses| { + !connected_relays.contains(peer) + && addresses + .iter() + .any(|addr| check_and_mark_dialed(&mut recently_dialed_peers, addr)) + }); // choose some random bootstrap addresses to connect if peers exchange returned not enough peers if to_connect.len() < to_connect_num { let connect_bootstrap_num = to_connect_num - to_connect.len(); for addr in bootstrap_addresses .iter() - .filter(|addr| !swarm.behaviour().core.gossipsub.is_connected_to_addr(addr)) + .filter(|addr| { + !swarm.behaviour().core.gossipsub.is_connected_to_addr(addr) + && check_and_mark_dialed(&mut recently_dialed_peers, addr) + }) .collect::>() .choose_multiple(&mut rng, connect_bootstrap_num) { @@ -824,11 +935,13 @@ fn maintain_connection_to_relays(swarm: &mut AtomicDexSwarm, bootstrap_addresses if swarm.behaviour().core.gossipsub.is_connected_to_addr(&addr) { continue; } + if let Err(e) = libp2p::Swarm::dial(swarm, addr.clone()) { error!("Peer {} address {} dial error {}", peer, addr, e); } } } + drop(recently_dialed_peers); } if connected_relays.len() > max_n { diff --git a/mm2src/mm2_p2p/src/behaviours/mod.rs b/mm2src/mm2_p2p/src/behaviours/mod.rs index cdfda38c8d..36436efe4e 100644 --- a/mm2src/mm2_p2p/src/behaviours/mod.rs +++ b/mm2src/mm2_p2p/src/behaviours/mod.rs @@ -106,17 +106,16 @@ mod tests { let node1_port = next_port(); let node1 = Node::spawn(node1_port, vec![], move |mut cmd_tx, event| { - let (request, response_channel) = match event { + let response_channel = match event { AdexBehaviourEvent::RequestResponse(RequestResponseBehaviourEvent::InboundRequest { request, response_channel, .. - }) => (request.req, AdexResponseChannel(response_channel)), + }) if request.req == b"test request" => AdexResponseChannel(response_channel), _ => return, }; request_received_cpy.store(true, Ordering::Relaxed); - assert_eq!(request, b"test request"); let res = AdexResponse::Ok { response: b"test response".to_vec(), @@ -157,19 +156,17 @@ mod tests { impl RequestHandler { fn handle(&mut self, mut cmd_tx: mpsc::Sender, event: AdexBehaviourEvent) { - let (request, response_channel) = match event { + let response_channel = match event { AdexBehaviourEvent::RequestResponse(RequestResponseBehaviourEvent::InboundRequest { request, response_channel, .. - }) => (request.req, AdexResponseChannel(response_channel)), + }) if request.req == b"test request" => AdexResponseChannel(response_channel), _ => return, }; self.requests += 1; - assert_eq!(request, b"test request"); - // the first time we should respond the none if self.requests == 1 { let res = AdexResponse::None; @@ -249,17 +246,16 @@ mod tests { let node1_port = next_port(); let _node1 = Node::spawn(node1_port, vec![], move |mut cmd_tx, event| { - let (request, response_channel) = match event { + let response_channel = match event { AdexBehaviourEvent::RequestResponse(RequestResponseBehaviourEvent::InboundRequest { request, response_channel, .. - }) => (request.req, AdexResponseChannel(response_channel)), + }) if request.req == b"test request" => AdexResponseChannel(response_channel), _ => return, }; request_received_cpy.store(true, Ordering::Relaxed); - assert_eq!(request, b"test request"); let res = AdexResponse::None; cmd_tx @@ -293,17 +289,15 @@ mod tests { let receiver1_port = next_port(); let receiver1 = Node::spawn(receiver1_port, vec![], move |mut cmd_tx, event| { - let (request, response_channel) = match event { + let response_channel = match event { AdexBehaviourEvent::RequestResponse(RequestResponseBehaviourEvent::InboundRequest { request, response_channel, .. - }) => (request.req, AdexResponseChannel(response_channel)), + }) if request.req == b"test request" => AdexResponseChannel(response_channel), _ => return, }; - assert_eq!(request, b"test request"); - let res = AdexResponse::None; cmd_tx .try_send(AdexBehaviourCmd::SendResponse { res, response_channel }) @@ -313,17 +307,15 @@ mod tests { let receiver2_port = next_port(); let receiver2 = Node::spawn(receiver2_port, vec![], move |mut cmd_tx, event| { - let (request, response_channel) = match event { + let response_channel = match event { AdexBehaviourEvent::RequestResponse(RequestResponseBehaviourEvent::InboundRequest { request, response_channel, .. - }) => (request.req, AdexResponseChannel(response_channel)), + }) if request.req == b"test request" => AdexResponseChannel(response_channel), _ => return, }; - assert_eq!(request, b"test request"); - let res = AdexResponse::Err { error: "test error".into(), }; @@ -335,17 +327,15 @@ mod tests { let receiver3_port = next_port(); let receiver3 = Node::spawn(receiver3_port, vec![], move |mut cmd_tx, event| { - let (request, response_channel) = match event { + let response_channel = match event { AdexBehaviourEvent::RequestResponse(RequestResponseBehaviourEvent::InboundRequest { request, response_channel, .. - }) => (request.req, AdexResponseChannel(response_channel)), + }) if request.req == b"test request" => AdexResponseChannel(response_channel), _ => return, }; - assert_eq!(request, b"test request"); - let res = AdexResponse::Ok { response: b"test response".to_vec(), }; diff --git a/mm2src/mm2_p2p/src/behaviours/peers_exchange.rs b/mm2src/mm2_p2p/src/behaviours/peers_exchange.rs index 412fa16355..1bede91995 100644 --- a/mm2src/mm2_p2p/src/behaviours/peers_exchange.rs +++ b/mm2src/mm2_p2p/src/behaviours/peers_exchange.rs @@ -330,11 +330,18 @@ impl PeersExchange { pub fn get_random_peers( &mut self, num: usize, - mut filter: impl FnMut(&PeerId) -> bool, + mut filter: impl FnMut(&PeerId, HashSet) -> bool, ) -> HashMap { let mut result = HashMap::with_capacity(num); let mut rng = rand::thread_rng(); - let peer_ids = self.known_peers.iter().filter(|peer| filter(peer)).collect::>(); + let peer_ids = self + .known_peers + .iter() + .filter(|peer| { + let addresses = self.request_response.addresses_of_peer(peer).into_iter().collect(); + filter(peer, addresses) + }) + .collect::>(); for peer_id in peer_ids.choose_multiple(&mut rng, num) { let addresses = self.request_response.addresses_of_peer(peer_id).into_iter().collect();