Skip to content

Commit

Permalink
feat: redial node_addresses at an interval on connection close
Browse files Browse the repository at this point in the history
  • Loading branch information
QuinnWilton committed Jan 27, 2024
1 parent 0d778d3 commit a6202d9
Show file tree
Hide file tree
Showing 7 changed files with 471 additions and 0 deletions.
1 change: 1 addition & 0 deletions homestar-runtime/config/defaults.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ announce_addresses = []
transport_connection_timeout = 60
max_connected_peers = 32
max_announce_addresses = 10
dial_interval = 30

[node.network.libp2p.mdns]
enable = true
Expand Down
3 changes: 3 additions & 0 deletions homestar-runtime/src/event_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ struct Rendezvous {

// Connected peers configuration and state
struct Connections {
dial_interval: Duration,
peers: FnvHashMap<PeerId, ConnectedPoint>,
max_peers: u32,
}
Expand Down Expand Up @@ -190,6 +191,7 @@ where
query_senders: FnvHashMap::default(),
request_response_senders: FnvHashMap::default(),
connections: Connections {
dial_interval: settings.libp2p.dial_interval,
peers: FnvHashMap::default(),
max_peers: settings.libp2p.max_connected_peers,
},
Expand Down Expand Up @@ -231,6 +233,7 @@ where
query_senders: FnvHashMap::default(),
request_response_senders: FnvHashMap::default(),
connections: Connections {
dial_interval: settings.libp2p.dial_interval,
peers: FnvHashMap::default(),
max_peers: settings.libp2p.max_connected_peers,
},
Expand Down
6 changes: 6 additions & 0 deletions homestar-runtime/src/event_handler/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ pub(crate) enum CacheData {
pub(crate) enum DispatchEvent {
RegisterPeer,
DiscoverPeers,
DialPeer,
}

/// Setup a cache with an eviction listener.
Expand Down Expand Up @@ -78,6 +79,11 @@ pub(crate) fn setup_cache(
let _ = tx.send(Event::DiscoverPeers(rendezvous_node.to_owned()));
};
}
DispatchEvent::DialPeer => {
if let Some(CacheData::Peer(node)) = val.data.get("node") {
let _ = tx.send(Event::DialPeer(node.to_owned()));
};
}
}
}
};
Expand Down
8 changes: 8 additions & 0 deletions homestar-runtime/src/event_handler/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,8 @@ pub(crate) enum Event {
DiscoverPeers(PeerId),
/// Dynamically get listeners for the swarm.
GetNodeInfo(AsyncChannelSender<DynamicNodeInfo>),
/// Dial a peer.
DialPeer(PeerId),
}

#[allow(unreachable_patterns)]
Expand Down Expand Up @@ -293,6 +295,12 @@ impl Event {
);
}
}
Event::DialPeer(peer_id) => {
event_handler
.swarm
.dial(peer_id)
.map_err(anyhow::Error::new)?;
}
_ => {}
}
Ok(())
Expand Down
70 changes: 70 additions & 0 deletions homestar-runtime/src/event_handler/swarm_event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1157,6 +1157,32 @@ async fn handle_swarm_event<DB: Database>(
peer_id = peer_id.to_string(),
"removed peer from kademlia table"
);
} else {
debug!(
subject = "libp2p.conn.closed",
category = "handle_swarm_event",
peer_id = peer_id.to_string(),
"redialing trusted peer in {interval:?}",
interval = event_handler.connections.dial_interval
);

// Dial peers again at dial interval
event_handler
.cache
.insert(
format!("{}-dial", peer_id),
CacheValue::new(
event_handler.connections.dial_interval,
HashMap::from([
(
"on_expiration".to_string(),
CacheData::OnExpiration(cache::DispatchEvent::DialPeer),
),
("node".to_string(), CacheData::Peer(peer_id)),
]),
),
)
.await;
}

#[cfg(feature = "websocket-notify")]
Expand All @@ -1182,6 +1208,50 @@ async fn handle_swarm_event<DB: Database>(
"outgoing connection error"
);

// Redial peer if in configured peers
if let Some(peer_id) = peer_id {
if event_handler.node_addresses.iter().any(|multiaddr| {
if let Some(id) = multiaddr.peer_id() {
id == peer_id
} else {
// TODO: We may want to check the multiadress without relying on
// the peer ID. This would give more flexibility when configuring nodes.
warn!(
subject = "libp2p.outgoing.err",
category = "handle_swarm_event",
"Configured peer must include a peer ID: {multiaddr}"
);
false
}
}) {
debug!(
subject = "libp2p.outgoing.err",
category = "handle_swarm_event",
peer_id = peer_id.to_string(),
"redialing trusted peer in {interval:?}",
interval = event_handler.connections.dial_interval
);

// Dial peers again at dial interval
event_handler
.cache
.insert(
format!("{}-dial", peer_id),
CacheValue::new(
event_handler.connections.dial_interval,
HashMap::from([
(
"on_expiration".to_string(),
CacheData::OnExpiration(cache::DispatchEvent::DialPeer),
),
("node".to_string(), CacheData::Peer(peer_id)),
]),
),
)
.await;
}
}

#[cfg(feature = "websocket-notify")]
notification::emit_event(
event_handler.ws_evt_sender(),
Expand Down
4 changes: 4 additions & 0 deletions homestar-runtime/src/settings/libp2p_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ pub(crate) struct Libp2p {
/// Transport connection timeout.
#[serde_as(as = "DurationSeconds<u64>")]
pub(crate) transport_connection_timeout: Duration,
/// Dial interval.
#[serde_as(as = "DurationSeconds<u64>")]
pub(crate) dial_interval: Duration,
}

/// DHT settings.
Expand Down Expand Up @@ -138,6 +141,7 @@ impl Default for Libp2p {
pubsub: Pubsub::default(),
rendezvous: Rendezvous::default(),
transport_connection_timeout: Duration::new(60, 0),
dial_interval: Duration::new(30, 0),
}
}
}
Expand Down
Loading

0 comments on commit a6202d9

Please sign in to comment.