@@ -8,62 +8,141 @@ use bitcoin::secp256k1::PublicKey;
8
8
9
9
use std:: net:: ToSocketAddrs ;
10
10
use std:: ops:: Deref ;
11
- use std:: sync:: Arc ;
11
+ use std:: sync:: { Arc , Mutex } ;
12
12
use std:: time:: Duration ;
13
13
14
- pub ( crate ) async fn connect_peer_if_necessary < L : Deref + Clone + Sync + Send > (
15
- node_id : PublicKey , addr : SocketAddress , peer_manager : Arc < PeerManager > , logger : L ,
16
- ) -> Result < ( ) , Error >
14
+ pub ( crate ) struct ConnectionManager < L : Deref + Clone + Sync + Send >
17
15
where
18
16
L :: Target : Logger ,
19
17
{
20
- if peer_manager. peer_by_node_id ( & node_id) . is_some ( ) {
21
- return Ok ( ( ) ) ;
22
- }
23
-
24
- do_connect_peer ( node_id, addr, peer_manager, logger) . await
18
+ pending_connections :
19
+ Mutex < Vec < ( PublicKey , Vec < tokio:: sync:: oneshot:: Sender < Result < ( ) , Error > > > ) > > ,
20
+ peer_manager : Arc < PeerManager > ,
21
+ logger : L ,
25
22
}
26
23
27
- pub ( crate ) async fn do_connect_peer < L : Deref + Clone + Sync + Send > (
28
- node_id : PublicKey , addr : SocketAddress , peer_manager : Arc < PeerManager > , logger : L ,
29
- ) -> Result < ( ) , Error >
24
+ impl < L : Deref + Clone + Sync + Send > ConnectionManager < L >
30
25
where
31
26
L :: Target : Logger ,
32
27
{
33
- log_info ! ( logger, "Connecting to peer: {}@{}" , node_id, addr) ;
34
-
35
- let socket_addr = addr
36
- . to_socket_addrs ( )
37
- . map_err ( |e| {
38
- log_error ! ( logger, "Failed to resolve network address: {}" , e) ;
39
- Error :: InvalidSocketAddress
40
- } ) ?
41
- . next ( )
42
- . ok_or ( Error :: ConnectionFailed ) ?;
43
-
44
- match lightning_net_tokio:: connect_outbound ( Arc :: clone ( & peer_manager) , node_id, socket_addr)
45
- . await
46
- {
47
- Some ( connection_closed_future) => {
48
- let mut connection_closed_future = Box :: pin ( connection_closed_future) ;
49
- loop {
50
- tokio:: select! {
51
- _ = & mut connection_closed_future => {
52
- log_info!( logger, "Peer connection closed: {}@{}" , node_id, addr) ;
53
- return Err ( Error :: ConnectionFailed ) ;
54
- } ,
55
- _ = tokio:: time:: sleep( Duration :: from_millis( 10 ) ) => { } ,
56
- } ;
57
-
58
- match peer_manager. peer_by_node_id ( & node_id) {
59
- Some ( _) => return Ok ( ( ) ) ,
60
- None => continue ,
28
+ pub ( crate ) fn new ( peer_manager : Arc < PeerManager > , logger : L ) -> Self {
29
+ let pending_connections = Mutex :: new ( Vec :: new ( ) ) ;
30
+ Self { pending_connections, peer_manager, logger }
31
+ }
32
+
33
+ pub ( crate ) async fn connect_peer_if_necessary (
34
+ & self , node_id : PublicKey , addr : SocketAddress ,
35
+ ) -> Result < ( ) , Error > {
36
+ if self . peer_manager . peer_by_node_id ( & node_id) . is_some ( ) {
37
+ return Ok ( ( ) ) ;
38
+ }
39
+
40
+ self . do_connect_peer ( node_id, addr) . await
41
+ }
42
+
43
+ pub ( crate ) async fn do_connect_peer (
44
+ & self , node_id : PublicKey , addr : SocketAddress ,
45
+ ) -> Result < ( ) , Error > {
46
+ // First, we check if there is already an outbound connection in flight, if so, we just
47
+ // await on the corresponding watch channel. The task driving the connection future will
48
+ // send us the result..
49
+ let pending_ready_receiver_opt = self . register_or_subscribe_pending_connection ( & node_id) ;
50
+ if let Some ( pending_connection_ready_receiver) = pending_ready_receiver_opt {
51
+ return pending_connection_ready_receiver. await . map_err ( |e| {
52
+ debug_assert ! ( false , "Failed to receive connection result: {:?}" , e) ;
53
+ log_error ! ( self . logger, "Failed to receive connection result: {:?}" , e) ;
54
+ Error :: ConnectionFailed
55
+ } ) ?;
56
+ }
57
+
58
+ log_info ! ( self . logger, "Connecting to peer: {}@{}" , node_id, addr) ;
59
+
60
+ let socket_addr = addr
61
+ . to_socket_addrs ( )
62
+ . map_err ( |e| {
63
+ log_error ! ( self . logger, "Failed to resolve network address: {}" , e) ;
64
+ self . propagate_result_to_subscribers ( & node_id, Err ( Error :: InvalidSocketAddress ) ) ;
65
+ Error :: InvalidSocketAddress
66
+ } ) ?
67
+ . next ( )
68
+ . ok_or_else ( || {
69
+ self . propagate_result_to_subscribers ( & node_id, Err ( Error :: ConnectionFailed ) ) ;
70
+ Error :: ConnectionFailed
71
+ } ) ?;
72
+
73
+ let connection_future = lightning_net_tokio:: connect_outbound (
74
+ Arc :: clone ( & self . peer_manager ) ,
75
+ node_id,
76
+ socket_addr,
77
+ ) ;
78
+
79
+ let res = match connection_future. await {
80
+ Some ( connection_closed_future) => {
81
+ let mut connection_closed_future = Box :: pin ( connection_closed_future) ;
82
+ loop {
83
+ tokio:: select! {
84
+ _ = & mut connection_closed_future => {
85
+ log_info!( self . logger, "Peer connection closed: {}@{}" , node_id, addr) ;
86
+ break Err ( Error :: ConnectionFailed ) ;
87
+ } ,
88
+ _ = tokio:: time:: sleep( Duration :: from_millis( 10 ) ) => { } ,
89
+ } ;
90
+
91
+ match self . peer_manager . peer_by_node_id ( & node_id) {
92
+ Some ( _) => break Ok ( ( ) ) ,
93
+ None => continue ,
94
+ }
61
95
}
96
+ } ,
97
+ None => {
98
+ log_error ! ( self . logger, "Failed to connect to peer: {}@{}" , node_id, addr) ;
99
+ Err ( Error :: ConnectionFailed )
100
+ } ,
101
+ } ;
102
+
103
+ self . propagate_result_to_subscribers ( & node_id, res) ;
104
+
105
+ res
106
+ }
107
+
108
+ fn register_or_subscribe_pending_connection (
109
+ & self , node_id : & PublicKey ,
110
+ ) -> Option < tokio:: sync:: oneshot:: Receiver < Result < ( ) , Error > > > {
111
+ let mut pending_connections_lock = self . pending_connections . lock ( ) . unwrap ( ) ;
112
+ if let Some ( ( _, connection_ready_senders) ) =
113
+ pending_connections_lock. iter_mut ( ) . find ( |( id, _) | id == node_id)
114
+ {
115
+ let ( tx, rx) = tokio:: sync:: oneshot:: channel ( ) ;
116
+ connection_ready_senders. push ( tx) ;
117
+ Some ( rx)
118
+ } else {
119
+ pending_connections_lock. push ( ( * node_id, Vec :: new ( ) ) ) ;
120
+ None
121
+ }
122
+ }
123
+
124
+ fn propagate_result_to_subscribers ( & self , node_id : & PublicKey , res : Result < ( ) , Error > ) {
125
+ // Send the result to any other tasks that might be waiting on it by now.
126
+ let mut pending_connections_lock = self . pending_connections . lock ( ) . unwrap ( ) ;
127
+ if let Some ( ( _, connection_ready_senders) ) = pending_connections_lock
128
+ . iter ( )
129
+ . position ( |( id, _) | id == node_id)
130
+ . map ( |i| pending_connections_lock. remove ( i) )
131
+ {
132
+ for sender in connection_ready_senders {
133
+ let _ = sender. send ( res) . map_err ( |e| {
134
+ debug_assert ! (
135
+ false ,
136
+ "Failed to send connection result to subscribers: {:?}" ,
137
+ e
138
+ ) ;
139
+ log_error ! (
140
+ self . logger,
141
+ "Failed to send connection result to subscribers: {:?}" ,
142
+ e
143
+ ) ;
144
+ } ) ;
62
145
}
63
- } ,
64
- None => {
65
- log_error ! ( logger, "Failed to connect to peer: {}@{}" , node_id, addr) ;
66
- Err ( Error :: ConnectionFailed )
67
- } ,
146
+ }
68
147
}
69
148
}
0 commit comments