Skip to content

Commit 7b4b7d5

Browse files
committed
Add ConnectionManager handling pending connections
Previously, concurrent calls to `do_connect_peer`/`connect_peer_if_necessary` could result in multiple connections being opened, just to be closed as redundant shortly after. Parallel connection attempts were therefore prone to fail. Here, we introduce a `ConnectionManager` that implements a pub/sub pattern: upon the initial call, the task responsible for polling the connection future to completion registers that a connection is in-flight. Any following calls will check this and register a `oneshot` channel to be notified of the `Result`.
1 parent d03c65f commit 7b4b7d5

File tree

4 files changed

+140
-59
lines changed

4 files changed

+140
-59
lines changed

src/builder.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use crate::config::{
22
Config, BDK_CLIENT_CONCURRENCY, BDK_CLIENT_STOP_GAP, DEFAULT_ESPLORA_SERVER_URL,
33
WALLET_KEYS_SEED_LEN,
44
};
5+
use crate::connection::ConnectionManager;
56
use crate::event::EventQueue;
67
use crate::fee_estimator::OnchainFeeEstimator;
78
use crate::gossip::GossipSource;
@@ -891,6 +892,9 @@ fn build_with_store_internal(
891892

892893
liquidity_source.as_ref().map(|l| l.set_peer_manager(Arc::clone(&peer_manager)));
893894

895+
let connection_manager =
896+
Arc::new(ConnectionManager::new(Arc::clone(&peer_manager), Arc::clone(&logger)));
897+
894898
// Init payment info storage
895899
let payment_store = match io::utils::read_payments(Arc::clone(&kv_store), Arc::clone(&logger)) {
896900
Ok(payments) => {
@@ -958,6 +962,7 @@ fn build_with_store_internal(
958962
chain_monitor,
959963
output_sweeper,
960964
peer_manager,
965+
connection_manager,
961966
keys_manager,
962967
network_graph,
963968
gossip_source,

src/connection.rs

Lines changed: 124 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -8,62 +8,141 @@ use bitcoin::secp256k1::PublicKey;
88

99
use std::net::ToSocketAddrs;
1010
use std::ops::Deref;
11-
use std::sync::Arc;
11+
use std::sync::{Arc, Mutex};
1212
use std::time::Duration;
1313

14-
pub(crate) async fn connect_peer_if_necessary<L: Deref + Clone + Sync + Send>(
15-
node_id: PublicKey, addr: SocketAddress, peer_manager: Arc<PeerManager>, logger: L,
16-
) -> Result<(), Error>
14+
pub(crate) struct ConnectionManager<L: Deref + Clone + Sync + Send>
1715
where
1816
L::Target: Logger,
1917
{
20-
if peer_manager.peer_by_node_id(&node_id).is_some() {
21-
return Ok(());
22-
}
23-
24-
do_connect_peer(node_id, addr, peer_manager, logger).await
18+
pending_connections:
19+
Mutex<Vec<(PublicKey, Vec<tokio::sync::oneshot::Sender<Result<(), Error>>>)>>,
20+
peer_manager: Arc<PeerManager>,
21+
logger: L,
2522
}
2623

27-
pub(crate) async fn do_connect_peer<L: Deref + Clone + Sync + Send>(
28-
node_id: PublicKey, addr: SocketAddress, peer_manager: Arc<PeerManager>, logger: L,
29-
) -> Result<(), Error>
24+
impl<L: Deref + Clone + Sync + Send> ConnectionManager<L>
3025
where
3126
L::Target: Logger,
3227
{
33-
log_info!(logger, "Connecting to peer: {}@{}", node_id, addr);
34-
35-
let socket_addr = addr
36-
.to_socket_addrs()
37-
.map_err(|e| {
38-
log_error!(logger, "Failed to resolve network address: {}", e);
39-
Error::InvalidSocketAddress
40-
})?
41-
.next()
42-
.ok_or(Error::ConnectionFailed)?;
43-
44-
match lightning_net_tokio::connect_outbound(Arc::clone(&peer_manager), node_id, socket_addr)
45-
.await
46-
{
47-
Some(connection_closed_future) => {
48-
let mut connection_closed_future = Box::pin(connection_closed_future);
49-
loop {
50-
tokio::select! {
51-
_ = &mut connection_closed_future => {
52-
log_info!(logger, "Peer connection closed: {}@{}", node_id, addr);
53-
return Err(Error::ConnectionFailed);
54-
},
55-
_ = tokio::time::sleep(Duration::from_millis(10)) => {},
56-
};
57-
58-
match peer_manager.peer_by_node_id(&node_id) {
59-
Some(_) => return Ok(()),
60-
None => continue,
28+
pub(crate) fn new(peer_manager: Arc<PeerManager>, logger: L) -> Self {
29+
let pending_connections = Mutex::new(Vec::new());
30+
Self { pending_connections, peer_manager, logger }
31+
}
32+
33+
pub(crate) async fn connect_peer_if_necessary(
34+
&self, node_id: PublicKey, addr: SocketAddress,
35+
) -> Result<(), Error> {
36+
if self.peer_manager.peer_by_node_id(&node_id).is_some() {
37+
return Ok(());
38+
}
39+
40+
self.do_connect_peer(node_id, addr).await
41+
}
42+
43+
pub(crate) async fn do_connect_peer(
44+
&self, node_id: PublicKey, addr: SocketAddress,
45+
) -> Result<(), Error> {
46+
// First, we check if there is already an outbound connection in flight; if so, we just
47+
// await on the corresponding oneshot channel. The task driving the connection future will
48+
// send us the result.
49+
let pending_ready_receiver_opt = self.register_or_subscribe_pending_connection(&node_id);
50+
if let Some(pending_connection_ready_receiver) = pending_ready_receiver_opt {
51+
return pending_connection_ready_receiver.await.map_err(|e| {
52+
debug_assert!(false, "Failed to receive connection result: {:?}", e);
53+
log_error!(self.logger, "Failed to receive connection result: {:?}", e);
54+
Error::ConnectionFailed
55+
})?;
56+
}
57+
58+
log_info!(self.logger, "Connecting to peer: {}@{}", node_id, addr);
59+
60+
let socket_addr = addr
61+
.to_socket_addrs()
62+
.map_err(|e| {
63+
log_error!(self.logger, "Failed to resolve network address: {}", e);
64+
self.propagate_result_to_subscribers(&node_id, Err(Error::InvalidSocketAddress));
65+
Error::InvalidSocketAddress
66+
})?
67+
.next()
68+
.ok_or_else(|| {
69+
self.propagate_result_to_subscribers(&node_id, Err(Error::ConnectionFailed));
70+
Error::ConnectionFailed
71+
})?;
72+
73+
let connection_future = lightning_net_tokio::connect_outbound(
74+
Arc::clone(&self.peer_manager),
75+
node_id,
76+
socket_addr,
77+
);
78+
79+
let res = match connection_future.await {
80+
Some(connection_closed_future) => {
81+
let mut connection_closed_future = Box::pin(connection_closed_future);
82+
loop {
83+
tokio::select! {
84+
_ = &mut connection_closed_future => {
85+
log_info!(self.logger, "Peer connection closed: {}@{}", node_id, addr);
86+
break Err(Error::ConnectionFailed);
87+
},
88+
_ = tokio::time::sleep(Duration::from_millis(10)) => {},
89+
};
90+
91+
match self.peer_manager.peer_by_node_id(&node_id) {
92+
Some(_) => break Ok(()),
93+
None => continue,
94+
}
6195
}
96+
},
97+
None => {
98+
log_error!(self.logger, "Failed to connect to peer: {}@{}", node_id, addr);
99+
Err(Error::ConnectionFailed)
100+
},
101+
};
102+
103+
self.propagate_result_to_subscribers(&node_id, res);
104+
105+
res
106+
}
107+
108+
fn register_or_subscribe_pending_connection(
109+
&self, node_id: &PublicKey,
110+
) -> Option<tokio::sync::oneshot::Receiver<Result<(), Error>>> {
111+
let mut pending_connections_lock = self.pending_connections.lock().unwrap();
112+
if let Some((_, connection_ready_senders)) =
113+
pending_connections_lock.iter_mut().find(|(id, _)| id == node_id)
114+
{
115+
let (tx, rx) = tokio::sync::oneshot::channel();
116+
connection_ready_senders.push(tx);
117+
Some(rx)
118+
} else {
119+
pending_connections_lock.push((*node_id, Vec::new()));
120+
None
121+
}
122+
}
123+
124+
fn propagate_result_to_subscribers(&self, node_id: &PublicKey, res: Result<(), Error>) {
125+
// Send the result to any other tasks that might be waiting on it by now.
126+
let mut pending_connections_lock = self.pending_connections.lock().unwrap();
127+
if let Some((_, connection_ready_senders)) = pending_connections_lock
128+
.iter()
129+
.position(|(id, _)| id == node_id)
130+
.map(|i| pending_connections_lock.remove(i))
131+
{
132+
for sender in connection_ready_senders {
133+
let _ = sender.send(res).map_err(|e| {
134+
debug_assert!(
135+
false,
136+
"Failed to send connection result to subscribers: {:?}",
137+
e
138+
);
139+
log_error!(
140+
self.logger,
141+
"Failed to send connection result to subscribers: {:?}",
142+
e
143+
);
144+
});
62145
}
63-
},
64-
None => {
65-
log_error!(logger, "Failed to connect to peer: {}@{}", node_id, addr);
66-
Err(Error::ConnectionFailed)
67-
},
146+
}
68147
}
69148
}

src/error.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use std::fmt;
22

3-
#[derive(Debug, PartialEq, Eq)]
3+
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
44
/// An error that possibly needs to be handled by the user.
55
pub enum Error {
66
/// Returned when trying to start [`crate::Node`] while it is already running.

src/lib.rs

Lines changed: 10 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ use config::{
125125
LDK_PAYMENT_RETRY_TIMEOUT, NODE_ANN_BCAST_INTERVAL, PEER_RECONNECTION_INTERVAL,
126126
RGS_SYNC_INTERVAL, WALLET_SYNC_INTERVAL_MINIMUM_SECS,
127127
};
128-
use connection::{connect_peer_if_necessary, do_connect_peer};
128+
use connection::ConnectionManager;
129129
use event::{EventHandler, EventQueue};
130130
use gossip::GossipSource;
131131
use liquidity::LiquiditySource;
@@ -189,6 +189,7 @@ pub struct Node {
189189
chain_monitor: Arc<ChainMonitor>,
190190
output_sweeper: Arc<Sweeper>,
191191
peer_manager: Arc<PeerManager>,
192+
connection_manager: Arc<ConnectionManager<Arc<FilesystemLogger>>>,
192193
keys_manager: Arc<KeysManager>,
193194
network_graph: Arc<NetworkGraph>,
194195
gossip_source: Arc<GossipSource>,
@@ -462,6 +463,7 @@ impl Node {
462463
}
463464

464465
// Regularly reconnect to persisted peers.
466+
let connect_cm = Arc::clone(&self.connection_manager);
465467
let connect_pm = Arc::clone(&self.peer_manager);
466468
let connect_logger = Arc::clone(&self.logger);
467469
let connect_peer_store = Arc::clone(&self.peer_store);
@@ -482,11 +484,9 @@ impl Node {
482484
.collect::<Vec<_>>();
483485

484486
for peer_info in connect_peer_store.list_peers().iter().filter(|info| !pm_peers.contains(&info.node_id)) {
485-
let res = do_connect_peer(
487+
let res = connect_cm.do_connect_peer(
486488
peer_info.node_id,
487489
peer_info.address.clone(),
488-
Arc::clone(&connect_pm),
489-
Arc::clone(&connect_logger),
490490
).await;
491491
match res {
492492
Ok(_) => {
@@ -803,14 +803,13 @@ impl Node {
803803

804804
let con_node_id = peer_info.node_id;
805805
let con_addr = peer_info.address.clone();
806-
let con_logger = Arc::clone(&self.logger);
807-
let con_pm = Arc::clone(&self.peer_manager);
806+
let con_cm = Arc::clone(&self.connection_manager);
808807

809808
// We need to use our main runtime here as a local runtime might not be around to poll
810809
// connection futures going forward.
811810
tokio::task::block_in_place(move || {
812811
runtime.block_on(async move {
813-
connect_peer_if_necessary(con_node_id, con_addr, con_pm, con_logger).await
812+
con_cm.connect_peer_if_necessary(con_node_id, con_addr).await
814813
})
815814
})?;
816815

@@ -876,14 +875,13 @@ impl Node {
876875

877876
let con_node_id = peer_info.node_id;
878877
let con_addr = peer_info.address.clone();
879-
let con_logger = Arc::clone(&self.logger);
880-
let con_pm = Arc::clone(&self.peer_manager);
878+
let con_cm = Arc::clone(&self.connection_manager);
881879

882880
// We need to use our main runtime here as a local runtime might not be around to poll
883881
// connection futures going forward.
884882
tokio::task::block_in_place(move || {
885883
runtime.block_on(async move {
886-
connect_peer_if_necessary(con_node_id, con_addr, con_pm, con_logger).await
884+
con_cm.connect_peer_if_necessary(con_node_id, con_addr).await
887885
})
888886
})?;
889887

@@ -1533,14 +1531,13 @@ impl Node {
15331531

15341532
let con_node_id = peer_info.node_id;
15351533
let con_addr = peer_info.address.clone();
1536-
let con_logger = Arc::clone(&self.logger);
1537-
let con_pm = Arc::clone(&self.peer_manager);
1534+
let con_cm = Arc::clone(&self.connection_manager);
15381535

15391536
// We need to use our main runtime here as a local runtime might not be around to poll
15401537
// connection futures going forward.
15411538
tokio::task::block_in_place(move || {
15421539
runtime.block_on(async move {
1543-
connect_peer_if_necessary(con_node_id, con_addr, con_pm, con_logger).await
1540+
con_cm.connect_peer_if_necessary(con_node_id, con_addr).await
15441541
})
15451542
})?;
15461543

0 commit comments

Comments
 (0)