Skip to content

Commit 77c538b

Browse files
committed
Add ConnectionManager handling pending connections
Previously, concurrent calls to `do_connect_peer`/`connect_peer_if_necessary` could result in multiple connections being opened, just to be closed as redundant shortly after. Parallel connection attempt were therefore prone to fail. Here, we introduce a `ConnectionManager` that implements a pub/sub pattern: upon the initial call, the task responsible for polling the connection future to completion registers that a connection is in-flight. Any calls following will check this and register a `oneshot` channel to be notified of the `Result`.
1 parent 2d9fe95 commit 77c538b

File tree

4 files changed

+139
-59
lines changed

4 files changed

+139
-59
lines changed

src/builder.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use crate::config::{
22
Config, BDK_CLIENT_CONCURRENCY, BDK_CLIENT_STOP_GAP, DEFAULT_ESPLORA_SERVER_URL,
33
WALLET_KEYS_SEED_LEN,
44
};
5+
use crate::connection::ConnectionManager;
56
use crate::event::EventQueue;
67
use crate::fee_estimator::OnchainFeeEstimator;
78
use crate::gossip::GossipSource;
@@ -895,6 +896,9 @@ fn build_with_store_internal(
895896

896897
liquidity_source.as_ref().map(|l| l.set_peer_manager(Arc::clone(&peer_manager)));
897898

899+
let connection_manager =
900+
Arc::new(ConnectionManager::new(Arc::clone(&peer_manager), Arc::clone(&logger)));
901+
898902
let output_sweeper = match io::utils::read_output_sweeper(
899903
Arc::clone(&tx_broadcaster),
900904
Arc::clone(&fee_estimator),
@@ -991,6 +995,7 @@ fn build_with_store_internal(
991995
chain_monitor,
992996
output_sweeper,
993997
peer_manager,
998+
connection_manager,
994999
keys_manager,
9951000
network_graph,
9961001
gossip_source,

src/connection.rs

Lines changed: 123 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -6,64 +6,142 @@ use lightning::ln::msgs::SocketAddress;
66

77
use bitcoin::secp256k1::PublicKey;
88

9+
use std::collections::hash_map::{self, HashMap};
910
use std::net::ToSocketAddrs;
1011
use std::ops::Deref;
11-
use std::sync::Arc;
12+
use std::sync::{Arc, Mutex};
1213
use std::time::Duration;
1314

14-
pub(crate) async fn connect_peer_if_necessary<L: Deref + Clone + Sync + Send>(
15-
node_id: PublicKey, addr: SocketAddress, peer_manager: Arc<PeerManager>, logger: L,
16-
) -> Result<(), Error>
15+
pub(crate) struct ConnectionManager<L: Deref + Clone + Sync + Send>
1716
where
1817
L::Target: Logger,
1918
{
20-
if peer_manager.peer_by_node_id(&node_id).is_some() {
21-
return Ok(());
22-
}
23-
24-
do_connect_peer(node_id, addr, peer_manager, logger).await
19+
pending_connections:
20+
Mutex<HashMap<PublicKey, Vec<tokio::sync::oneshot::Sender<Result<(), Error>>>>>,
21+
peer_manager: Arc<PeerManager>,
22+
logger: L,
2523
}
2624

27-
pub(crate) async fn do_connect_peer<L: Deref + Clone + Sync + Send>(
28-
node_id: PublicKey, addr: SocketAddress, peer_manager: Arc<PeerManager>, logger: L,
29-
) -> Result<(), Error>
25+
impl<L: Deref + Clone + Sync + Send> ConnectionManager<L>
3026
where
3127
L::Target: Logger,
3228
{
33-
log_info!(logger, "Connecting to peer: {}@{}", node_id, addr);
34-
35-
let socket_addr = addr
36-
.to_socket_addrs()
37-
.map_err(|e| {
38-
log_error!(logger, "Failed to resolve network address: {}", e);
39-
Error::InvalidSocketAddress
40-
})?
41-
.next()
42-
.ok_or(Error::ConnectionFailed)?;
43-
44-
match lightning_net_tokio::connect_outbound(Arc::clone(&peer_manager), node_id, socket_addr)
45-
.await
46-
{
47-
Some(connection_closed_future) => {
48-
let mut connection_closed_future = Box::pin(connection_closed_future);
49-
loop {
50-
tokio::select! {
51-
_ = &mut connection_closed_future => {
52-
log_info!(logger, "Peer connection closed: {}@{}", node_id, addr);
53-
return Err(Error::ConnectionFailed);
54-
},
55-
_ = tokio::time::sleep(Duration::from_millis(10)) => {},
56-
};
57-
58-
match peer_manager.peer_by_node_id(&node_id) {
59-
Some(_) => return Ok(()),
60-
None => continue,
29+
pub(crate) fn new(peer_manager: Arc<PeerManager>, logger: L) -> Self {
30+
let pending_connections = Mutex::new(HashMap::new());
31+
Self { pending_connections, peer_manager, logger }
32+
}
33+
34+
pub(crate) async fn connect_peer_if_necessary(
35+
&self, node_id: PublicKey, addr: SocketAddress,
36+
) -> Result<(), Error> {
37+
if self.peer_manager.peer_by_node_id(&node_id).is_some() {
38+
return Ok(());
39+
}
40+
41+
self.do_connect_peer(node_id, addr).await
42+
}
43+
44+
pub(crate) async fn do_connect_peer(
45+
&self, node_id: PublicKey, addr: SocketAddress,
46+
) -> Result<(), Error> {
47+
// First, we check if there is already an outbound connection in flight, if so, we just
48+
// await on the corresponding watch channel. The task driving the connection future will
49+
// send us the result..
50+
let pending_ready_receiver_opt = self.register_or_subscribe_pending_connection(&node_id);
51+
if let Some(pending_connection_ready_receiver) = pending_ready_receiver_opt {
52+
return pending_connection_ready_receiver.await.map_err(|e| {
53+
debug_assert!(false, "Failed to receive connection result: {:?}", e);
54+
log_error!(self.logger, "Failed to receive connection result: {:?}", e);
55+
Error::ConnectionFailed
56+
})?;
57+
}
58+
59+
log_info!(self.logger, "Connecting to peer: {}@{}", node_id, addr);
60+
61+
let socket_addr = addr
62+
.to_socket_addrs()
63+
.map_err(|e| {
64+
log_error!(self.logger, "Failed to resolve network address {}: {}", addr, e);
65+
self.propagate_result_to_subscribers(&node_id, Err(Error::InvalidSocketAddress));
66+
Error::InvalidSocketAddress
67+
})?
68+
.next()
69+
.ok_or_else(|| {
70+
log_error!(self.logger, "Failed to resolve network address {}", addr);
71+
self.propagate_result_to_subscribers(&node_id, Err(Error::InvalidSocketAddress));
72+
Error::InvalidSocketAddress
73+
})?;
74+
75+
let connection_future = lightning_net_tokio::connect_outbound(
76+
Arc::clone(&self.peer_manager),
77+
node_id,
78+
socket_addr,
79+
);
80+
81+
let res = match connection_future.await {
82+
Some(connection_closed_future) => {
83+
let mut connection_closed_future = Box::pin(connection_closed_future);
84+
loop {
85+
tokio::select! {
86+
_ = &mut connection_closed_future => {
87+
log_info!(self.logger, "Peer connection closed: {}@{}", node_id, addr);
88+
break Err(Error::ConnectionFailed);
89+
},
90+
_ = tokio::time::sleep(Duration::from_millis(10)) => {},
91+
};
92+
93+
match self.peer_manager.peer_by_node_id(&node_id) {
94+
Some(_) => break Ok(()),
95+
None => continue,
96+
}
6197
}
98+
},
99+
None => {
100+
log_error!(self.logger, "Failed to connect to peer: {}@{}", node_id, addr);
101+
Err(Error::ConnectionFailed)
102+
},
103+
};
104+
105+
self.propagate_result_to_subscribers(&node_id, res);
106+
107+
res
108+
}
109+
110+
fn register_or_subscribe_pending_connection(
111+
&self, node_id: &PublicKey,
112+
) -> Option<tokio::sync::oneshot::Receiver<Result<(), Error>>> {
113+
let mut pending_connections_lock = self.pending_connections.lock().unwrap();
114+
match pending_connections_lock.entry(*node_id) {
115+
hash_map::Entry::Occupied(mut entry) => {
116+
let (tx, rx) = tokio::sync::oneshot::channel();
117+
entry.get_mut().push(tx);
118+
Some(rx)
119+
},
120+
hash_map::Entry::Vacant(entry) => {
121+
entry.insert(Vec::new());
122+
None
123+
},
124+
}
125+
}
126+
127+
fn propagate_result_to_subscribers(&self, node_id: &PublicKey, res: Result<(), Error>) {
128+
// Send the result to any other tasks that might be waiting on it by now.
129+
let mut pending_connections_lock = self.pending_connections.lock().unwrap();
130+
if let Some(connection_ready_senders) = pending_connections_lock.remove(node_id) {
131+
for sender in connection_ready_senders {
132+
let _ = sender.send(res).map_err(|e| {
133+
debug_assert!(
134+
false,
135+
"Failed to send connection result to subscribers: {:?}",
136+
e
137+
);
138+
log_error!(
139+
self.logger,
140+
"Failed to send connection result to subscribers: {:?}",
141+
e
142+
);
143+
});
62144
}
63-
},
64-
None => {
65-
log_error!(logger, "Failed to connect to peer: {}@{}", node_id, addr);
66-
Err(Error::ConnectionFailed)
67-
},
145+
}
68146
}
69147
}

src/error.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use std::fmt;
22

3-
#[derive(Debug, PartialEq, Eq)]
3+
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
44
/// An error that possibly needs to be handled by the user.
55
pub enum Error {
66
/// Returned when trying to start [`crate::Node`] while it is already running.

src/lib.rs

Lines changed: 10 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ use config::{
125125
LDK_PAYMENT_RETRY_TIMEOUT, NODE_ANN_BCAST_INTERVAL, PEER_RECONNECTION_INTERVAL,
126126
RGS_SYNC_INTERVAL, WALLET_SYNC_INTERVAL_MINIMUM_SECS,
127127
};
128-
use connection::{connect_peer_if_necessary, do_connect_peer};
128+
use connection::ConnectionManager;
129129
use event::{EventHandler, EventQueue};
130130
use gossip::GossipSource;
131131
use liquidity::LiquiditySource;
@@ -189,6 +189,7 @@ pub struct Node {
189189
chain_monitor: Arc<ChainMonitor>,
190190
output_sweeper: Arc<Sweeper>,
191191
peer_manager: Arc<PeerManager>,
192+
connection_manager: Arc<ConnectionManager<Arc<FilesystemLogger>>>,
192193
keys_manager: Arc<KeysManager>,
193194
network_graph: Arc<NetworkGraph>,
194195
gossip_source: Arc<GossipSource>,
@@ -500,6 +501,7 @@ impl Node {
500501
}
501502

502503
// Regularly reconnect to persisted peers.
504+
let connect_cm = Arc::clone(&self.connection_manager);
503505
let connect_pm = Arc::clone(&self.peer_manager);
504506
let connect_logger = Arc::clone(&self.logger);
505507
let connect_peer_store = Arc::clone(&self.peer_store);
@@ -520,11 +522,9 @@ impl Node {
520522
.collect::<Vec<_>>();
521523

522524
for peer_info in connect_peer_store.list_peers().iter().filter(|info| !pm_peers.contains(&info.node_id)) {
523-
let res = do_connect_peer(
525+
let res = connect_cm.do_connect_peer(
524526
peer_info.node_id,
525527
peer_info.address.clone(),
526-
Arc::clone(&connect_pm),
527-
Arc::clone(&connect_logger),
528528
).await;
529529
match res {
530530
Ok(_) => {
@@ -873,14 +873,13 @@ impl Node {
873873

874874
let con_node_id = peer_info.node_id;
875875
let con_addr = peer_info.address.clone();
876-
let con_logger = Arc::clone(&self.logger);
877-
let con_pm = Arc::clone(&self.peer_manager);
876+
let con_cm = Arc::clone(&self.connection_manager);
878877

879878
// We need to use our main runtime here as a local runtime might not be around to poll
880879
// connection futures going forward.
881880
tokio::task::block_in_place(move || {
882881
runtime.block_on(async move {
883-
connect_peer_if_necessary(con_node_id, con_addr, con_pm, con_logger).await
882+
con_cm.connect_peer_if_necessary(con_node_id, con_addr).await
884883
})
885884
})?;
886885

@@ -946,14 +945,13 @@ impl Node {
946945

947946
let con_node_id = peer_info.node_id;
948947
let con_addr = peer_info.address.clone();
949-
let con_logger = Arc::clone(&self.logger);
950-
let con_pm = Arc::clone(&self.peer_manager);
948+
let con_cm = Arc::clone(&self.connection_manager);
951949

952950
// We need to use our main runtime here as a local runtime might not be around to poll
953951
// connection futures going forward.
954952
tokio::task::block_in_place(move || {
955953
runtime.block_on(async move {
956-
connect_peer_if_necessary(con_node_id, con_addr, con_pm, con_logger).await
954+
con_cm.connect_peer_if_necessary(con_node_id, con_addr).await
957955
})
958956
})?;
959957

@@ -1603,14 +1601,13 @@ impl Node {
16031601

16041602
let con_node_id = peer_info.node_id;
16051603
let con_addr = peer_info.address.clone();
1606-
let con_logger = Arc::clone(&self.logger);
1607-
let con_pm = Arc::clone(&self.peer_manager);
1604+
let con_cm = Arc::clone(&self.connection_manager);
16081605

16091606
// We need to use our main runtime here as a local runtime might not be around to poll
16101607
// connection futures going forward.
16111608
tokio::task::block_in_place(move || {
16121609
runtime.block_on(async move {
1613-
connect_peer_if_necessary(con_node_id, con_addr, con_pm, con_logger).await
1610+
con_cm.connect_peer_if_necessary(con_node_id, con_addr).await
16141611
})
16151612
})?;
16161613

0 commit comments

Comments
 (0)