Skip to content

Commit

Permalink
Do not connect to already connected peer from peers_exchange
Browse files Browse the repository at this point in the history
* Ignore qrc20_tests::test_search_for_swap_tx_spend
  • Loading branch information
sergeyboyko0791 committed Dec 3, 2020
1 parent 6dad1bb commit 3de21d9
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 13 deletions.
1 change: 1 addition & 0 deletions mm2src/coins/qrc20/qrc20_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -639,6 +639,7 @@ fn test_validate_fee() {
}

#[test]
#[ignore]
fn test_search_for_swap_tx_spend() {
// priv_key of qXxsj5RtciAby9T7m98AgAATL4zTi4UwDG
let priv_key = [
Expand Down
13 changes: 8 additions & 5 deletions mm2src/mm2_libp2p/src/atomicdex_behaviour.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::{adex_ping::AdexPing,
peers_exchange::PeersExchange,
peers_exchange::{PeerAddresses, PeersExchange},
request_response::{build_request_response_behaviour, PeerRequest, PeerResponse, RequestResponseBehaviour,
RequestResponseBehaviourEvent, RequestResponseSender}};
use atomicdex_gossipsub::{Gossipsub, GossipsubConfigBuilder, GossipsubEvent, GossipsubMessage, MessageId, Topic,
Expand Down Expand Up @@ -381,8 +381,8 @@ impl AtomicDexBehaviour {
}
}

fn announce_listeners(&mut self, listeners: Vec<Multiaddr>) {
let serialized = rmp_serde::to_vec(&listeners).expect("Vec<Multiaddr> serialization should never fail");
fn announce_listeners(&mut self, listeners: PeerAddresses) {
let serialized = rmp_serde::to_vec(&listeners).expect("PeerAddresses serialization should never fail");
self.floodsub.publish(FloodsubTopic::new(PEERS_TOPIC), serialized);
}

Expand All @@ -400,7 +400,7 @@ impl NetworkBehaviourEventProcess<FloodsubEvent> for AtomicDexBehaviour {
if let FloodsubEvent::Message(message) = &event {
for topic in &message.topics {
if topic == &FloodsubTopic::new(PEERS_TOPIC) {
let addresses: Vec<Multiaddr> = match rmp_serde::from_read_ref(&message.data) {
let addresses: PeerAddresses = match rmp_serde::from_read_ref(&message.data) {
Ok(a) => a,
Err(_) => return,
};
Expand Down Expand Up @@ -476,6 +476,9 @@ fn maintain_connection_to_relays(swarm: &mut AtomicDexSwarm, bootstrap_addresses
}
for (peer, addresses) in to_connect {
for addr in addresses {
if swarm.gossipsub.is_connected_to_addr(&addr) {
continue;
}
if let Err(e) = libp2p::Swarm::dial_addr(swarm, addr.clone()) {
error!("Peer {} address {} dial error {}", peer, addr, e);
}
Expand Down Expand Up @@ -507,7 +510,7 @@ fn maintain_connection_to_relays(swarm: &mut AtomicDexSwarm, bootstrap_addresses
}

fn announce_my_addresses(swarm: &mut AtomicDexSwarm) {
let global_listeners: Vec<_> = Swarm::listeners(&swarm)
let global_listeners: PeerAddresses = Swarm::listeners(&swarm)
.filter(|listener| {
for protocol in listener.iter() {
if let Protocol::Ip4(ip) = protocol {
Expand Down
17 changes: 9 additions & 8 deletions mm2src/mm2_libp2p/src/peers_exchange.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,15 @@ use libp2p::{multiaddr::Multiaddr,
use log::{error, info};
use rand::{seq::SliceRandom, thread_rng};
use serde::{de::Deserializer, ser::Serializer, Deserialize, Serialize};
use std::collections::HashSet;
use std::{collections::{HashMap, VecDeque},
iter,
task::{Context, Poll},
time::Duration};
use wasm_timer::{Instant, Interval};

pub type PeerAddresses = HashSet<Multiaddr>;

#[derive(Debug, Clone)]
pub enum PeersExchangeProtocol {
Version1,
Expand Down Expand Up @@ -62,9 +65,7 @@ pub enum PeersExchangeRequest {

#[derive(Clone, Debug, Deserialize, Serialize)]
pub enum PeersExchangeResponse {
KnownPeers {
peers: HashMap<PeerIdSerde, Vec<Multiaddr>>,
},
KnownPeers { peers: HashMap<PeerIdSerde, PeerAddresses> },
}

/// Behaviour that requests known peers list from other peers at random
Expand Down Expand Up @@ -98,12 +99,12 @@ impl PeersExchange {
}
}

fn get_random_known_peers(&mut self, num: usize) -> HashMap<PeerIdSerde, Vec<Multiaddr>> {
fn get_random_known_peers(&mut self, num: usize) -> HashMap<PeerIdSerde, PeerAddresses> {
let mut result = HashMap::with_capacity(num);
let mut rng = thread_rng();
let peer_ids = self.known_peers.choose_multiple(&mut rng, num).cloned();
for peer_id in peer_ids {
let addresses = self.request_response.addresses_of_peer(&peer_id);
let addresses = self.request_response.addresses_of_peer(&peer_id).into_iter().collect();
result.insert(peer_id.into(), addresses);
}
result
Expand All @@ -121,7 +122,7 @@ impl PeersExchange {
}
}

pub fn add_peer_addresses(&mut self, peer: &PeerId, addresses: Vec<Multiaddr>) {
pub fn add_peer_addresses(&mut self, peer: &PeerId, addresses: PeerAddresses) {
if !self.known_peers.contains(&peer) && !addresses.is_empty() {
self.known_peers.push(peer.clone());
}
Expand Down Expand Up @@ -160,12 +161,12 @@ impl PeersExchange {
&mut self,
num: usize,
mut filter: impl FnMut(&PeerId) -> bool,
) -> HashMap<PeerId, Vec<Multiaddr>> {
) -> HashMap<PeerId, PeerAddresses> {
let mut result = HashMap::with_capacity(num);
let mut rng = thread_rng();
let peer_ids = self.known_peers.iter().filter(|peer| filter(*peer)).collect::<Vec<_>>();
for peer_id in peer_ids.choose_multiple(&mut rng, num) {
let addresses = self.request_response.addresses_of_peer(*peer_id);
let addresses = self.request_response.addresses_of_peer(*peer_id).into_iter().collect();
result.insert((*peer_id).clone(), addresses);
}
result
Expand Down

0 comments on commit 3de21d9

Please sign in to comment.