@@ -15,7 +15,7 @@ use util::byte_utils;
1515use util:: events:: { MessageSendEvent } ;
1616use util:: logger:: Logger ;
1717
18- use std:: collections:: { HashMap , hash_map, LinkedList } ;
18+ use std:: collections:: { HashMap , hash_map, HashSet , LinkedList } ;
1919use std:: sync:: { Arc , Mutex } ;
2020use std:: sync:: atomic:: { AtomicUsize , Ordering } ;
2121use std:: { cmp, error, hash, fmt} ;
@@ -106,17 +106,22 @@ struct Peer {
106106
107107struct PeerHolder < Descriptor : SocketDescriptor > {
108108 peers : HashMap < Descriptor , Peer > ,
109+ /// Added to by do_read_event for cases where we pushed a message onto the send buffer but
110+ /// didn't call do_attempt_write_data to avoid reentrancy. Cleared in process_events()
111+ peers_needing_send : HashSet < Descriptor > ,
109112 /// Only add to this set when noise completes:
110113 node_id_to_descriptor : HashMap < PublicKey , Descriptor > ,
111114}
112115struct MutPeerHolder < ' a , Descriptor : SocketDescriptor + ' a > {
113116 peers : & ' a mut HashMap < Descriptor , Peer > ,
117+ peers_needing_send : & ' a mut HashSet < Descriptor > ,
114118 node_id_to_descriptor : & ' a mut HashMap < PublicKey , Descriptor > ,
115119}
116120impl < Descriptor : SocketDescriptor > PeerHolder < Descriptor > {
117121 fn borrow_parts ( & mut self ) -> MutPeerHolder < Descriptor > {
118122 MutPeerHolder {
119123 peers : & mut self . peers ,
124+ peers_needing_send : & mut self . peers_needing_send ,
120125 node_id_to_descriptor : & mut self . node_id_to_descriptor ,
121126 }
122127 }
@@ -162,7 +167,11 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
162167 pub fn new ( message_handler : MessageHandler , our_node_secret : SecretKey , logger : Arc < Logger > ) -> PeerManager < Descriptor > {
163168 PeerManager {
164169 message_handler : message_handler,
165- peers : Mutex :: new ( PeerHolder { peers : HashMap :: new ( ) , node_id_to_descriptor : HashMap :: new ( ) } ) ,
170+ peers : Mutex :: new ( PeerHolder {
171+ peers : HashMap :: new ( ) ,
172+ peers_needing_send : HashSet :: new ( ) ,
173+ node_id_to_descriptor : HashMap :: new ( )
174+ } ) ,
166175 our_node_secret : our_node_secret,
167176 initial_syncs_sent : AtomicUsize :: new ( 0 ) ,
168177 logger,
@@ -188,7 +197,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
188197 /// Note that if an Err is returned here you MUST NOT call disconnect_event for the new
189198 /// descriptor but must disconnect the connection immediately.
190199 ///
191- /// Returns some bytes to send to the remote node.
200+ /// Returns a small number of bytes to send to the remote node (currently always 50) .
192201 ///
193202 /// Panics if descriptor is duplicative with some other descriptor which has not yet has a
194203 /// disconnect_event.
@@ -298,16 +307,12 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
298307 ///
299308 /// May return an Err to indicate that the connection should be closed.
300309 ///
301- /// Will very likely call send_data on the descriptor passed in (or a descriptor handed into
302- /// new_*_connection) before returning. Thus, be very careful with reentrancy issues! The
303- /// invariants around calling write_event in case a write did not fully complete must still
304- /// hold. Note that this function will often call send_data on many peers before returning, not
305- /// just this peer!
310+ /// Will *not* call back into send_data on any descriptors to avoid reentrancy complexity.
311+ /// Thus, however, you almost certainly want to call process_events() after any read_event to
312+ /// generate send_data calls to handle responses.
306313 ///
307314 /// If Ok(true) is returned, further read_events should not be triggered until a write_event on
308- /// this file descriptor has resume_read set (preventing DoS issues in the send buffer). Note
309- /// that this must be true even if a send_data call with resume_read=true was made during the
310- /// course of this function!
315+ /// this file descriptor has resume_read set (preventing DoS issues in the send buffer).
311316 ///
312317 /// Panics if the descriptor was not previously registered in a new_*_connection event.
313318 pub fn read_event ( & self , peer_descriptor : & mut Descriptor , data : Vec < u8 > ) -> Result < bool , PeerHandleError > {
@@ -347,6 +352,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
347352 {
348353 log_trace!( self , "Encoding and sending message of type {} to {}" , $msg_code, log_pubkey!( peer. their_node_id. unwrap( ) ) ) ;
349354 peer. pending_outbound_buffer. push_back( peer. channel_encryptor. encrypt_message( & encode_msg!( $msg, $msg_code) [ ..] ) ) ;
355+ peers. peers_needing_send. insert( peer_descriptor. clone( ) ) ;
350356 }
351357 }
352358 }
@@ -670,21 +676,21 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
670676 pause_read
671677 } ;
672678
673- self . process_events ( ) ;
674-
675679 Ok ( pause_read)
676680 }
677681
678- /// Checks for any events generated by our handlers and processes them. May be needed after eg
679- /// calls to ChannelManager::process_pending_htlc_forward.
682+ /// Checks for any events generated by our handlers and processes them. Includes sending most
683+ /// response messages as well as messages generated by calls to handler functions directly (eg
684+ /// functions like ChannelManager::process_pending_htlc_forward or send_payment).
680685 pub fn process_events ( & self ) {
681686 {
682687 // TODO: There are some DoS attacks here where you can flood someone's outbound send
683688 // buffer by doing things like announcing channels on another node. We should be willing to
684689 // drop optional-ish messages when send buffers get full!
685690
686691 let mut events_generated = self . message_handler . chan_handler . get_and_clear_pending_msg_events ( ) ;
687- let mut peers = self . peers . lock ( ) . unwrap ( ) ;
692+ let mut peers_lock = self . peers . lock ( ) . unwrap ( ) ;
693+ let peers = peers_lock. borrow_parts ( ) ;
688694 for event in events_generated. drain ( ..) {
689695 macro_rules! get_peer_for_forwarding {
690696 ( $node_id: expr, $handle_no_such_peer: block) => {
@@ -888,6 +894,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
888894 match * action {
889895 msgs:: ErrorAction :: DisconnectPeer { ref msg } => {
890896 if let Some ( mut descriptor) = peers. node_id_to_descriptor . remove ( node_id) {
897+ peers. peers_needing_send . remove ( & descriptor) ;
891898 if let Some ( mut peer) = peers. peers . remove ( & descriptor) {
892899 if let Some ( ref msg) = * msg {
893900 log_trace ! ( self , "Handling DisconnectPeer HandleError event in peer_handler for node {} with message {}" ,
@@ -923,6 +930,13 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
923930 }
924931 }
925932 }
933+
934+ for mut descriptor in peers. peers_needing_send . drain ( ) {
935+ match peers. peers . get_mut ( & descriptor) {
936+ Some ( peer) => Self :: do_attempt_write_data ( & mut descriptor, peer) ,
937+ None => panic ! ( "Inconsistent peers set state!" ) ,
938+ }
939+ }
926940 }
927941 }
928942
@@ -938,6 +952,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
938952
939953 fn disconnect_event_internal ( & self , descriptor : & Descriptor , no_connection_possible : bool ) {
940954 let mut peers = self . peers . lock ( ) . unwrap ( ) ;
955+ peers. peers_needing_send . remove ( descriptor) ;
941956 let peer_option = peers. peers . remove ( descriptor) ;
942957 match peer_option {
943958 None => panic ! ( "Descriptor for disconnect_event is not already known to PeerManager" ) ,
0 commit comments