@@ -20,7 +20,7 @@ use util::events::{MessageSendEvent, MessageSendEventsProvider};
2020use util:: logger:: Logger ;
2121use routing:: network_graph:: NetGraphMsgHandler ;
2222
23- use std:: collections:: { HashMap , HashSet } ;
23+ use std:: collections:: HashMap ;
2424use std:: sync:: { Arc , Mutex } ;
2525use std:: sync:: atomic:: { AtomicUsize , Ordering } ;
2626use std:: { cmp, error, hash, fmt} ;
@@ -237,9 +237,6 @@ impl<TransportImpl: ITransport> Peer<TransportImpl> {
237237
238238struct PeerHolder < Descriptor : SocketDescriptor , TransportImpl : ITransport > {
239239 peers : HashMap < Descriptor , Peer < TransportImpl > > ,
240- /// Added to by do_read_event for cases where we pushed a message onto the send buffer but
241- /// didn't call do_attempt_write_data to avoid reentrancy. Cleared in process_events()
242- peers_needing_send : HashSet < Descriptor > ,
243240 /// Peers in this map have completed the NOISE handshake and received an Init message
244241 node_id_to_descriptor : HashMap < PublicKey , Descriptor > ,
245242}
@@ -281,9 +278,6 @@ impl<Descriptor: SocketDescriptor, TransportImpl: ITransport> PeerHolder<Descrip
281278
282279 // Removes all associated metadata for descriptor and returns the Peer object associated with it
283280 fn remove_peer_by_descriptor ( & mut self , descriptor : & Descriptor ) -> Peer < TransportImpl > {
284- // may or may not be in this set depending on in-flight messages
285- self . peers_needing_send . remove ( descriptor) ;
286-
287281 let peer_option = self . peers . remove ( descriptor) ;
288282 match peer_option {
289283 None => panic ! ( "Descriptor for disconnect_event is not already known to PeerManager" ) ,
@@ -300,6 +294,23 @@ impl<Descriptor: SocketDescriptor, TransportImpl: ITransport> PeerHolder<Descrip
300294 }
301295 }
302296 }
297+
298+ // Returns the collection of peers that have data to send. Could be due to items in their outbound
299+ // queue or sync messages that need to be sent out.
300+ fn peers_needing_send < ' a > ( & ' a mut self ) -> Filter < IterMut < ' a , Descriptor , Peer < TransportImpl > > , fn ( & ( & ' a Descriptor , & ' a mut Peer < TransportImpl > ) ) -> bool > {
301+ self . peers . iter_mut ( ) . filter ( |( _, peer) | {
302+ let has_outbound_sync = match & peer. post_init_state {
303+ None => false ,
304+ Some ( post_init_state) => match & post_init_state. sync_status {
305+ InitSyncTracker :: NoSyncRequested => false ,
306+ InitSyncTracker :: ChannelsSyncing ( _) => true ,
307+ InitSyncTracker :: NodesSyncing ( _) => true ,
308+ }
309+ } ;
310+
311+ has_outbound_sync || !peer. outbound_queue . is_empty ( )
312+ } )
313+ }
303314}
304315
305316#[ cfg( not( any( target_pointer_width = "32" , target_pointer_width = "64" ) ) ) ]
@@ -499,7 +510,6 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, TransportImpl
499510 message_handler,
500511 peers : Mutex :: new ( PeerHolder {
501512 peers : HashMap :: new ( ) ,
502- peers_needing_send : HashSet :: new ( ) ,
503513 node_id_to_descriptor : HashMap :: new ( )
504514 } ) ,
505515 our_node_secret,
@@ -654,7 +664,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, TransportImpl
654664 None => panic ! ( "Descriptor for read_event is not already known to PeerManager" ) ,
655665 Some ( peer) => peer
656666 } ;
657- self . do_read_event ( peer_descriptor, peer, & mut peers. peers_needing_send , & mut peers . node_id_to_descriptor , data)
667+ self . do_read_event ( peer_descriptor, peer, & mut peers. node_id_to_descriptor , data)
658668 } ;
659669
660670 match result {
@@ -666,12 +676,6 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, TransportImpl
666676 }
667677 }
668678
669- /// Append a message to a peer's pending outbound/write buffer, and update the map of peers needing sends accordingly.
670- fn enqueue_message < M : Encode + Writeable > ( & self , peers_needing_send : & mut HashSet < Descriptor > , message_queuer : & mut impl MessageQueuer , output_buffer : & mut impl PayloadQueuer , descriptor : & Descriptor , message : & M ) {
671- message_queuer. enqueue_message ( message, output_buffer, & * self . logger ) ;
672- peers_needing_send. insert ( descriptor. clone ( ) ) ;
673- }
674-
675679 // Returns a valid PostInitState given a Init message
676680 fn post_init_state_from_init_message ( & self , init_message : & msgs:: Init , their_node_id : & PublicKey ) -> Result < PostInitState , PeerHandleError > {
677681 if init_message. features . requires_unknown_bits ( ) {
@@ -707,18 +711,18 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, TransportImpl
707711 }
708712
709713 // Add an Init message to the outbound queue
710- fn enqueue_init_message ( & self , descriptor : & Descriptor , peer : & mut Peer < TransportImpl > , peers_needing_send : & mut HashSet < Descriptor > ) {
714+ fn enqueue_init_message ( & self , peer : & mut Peer < TransportImpl > ) {
711715 let mut features = InitFeatures :: known ( ) ;
712716 if !self . message_handler . route_handler . should_request_full_sync ( & peer. transport . get_their_node_id ( ) ) {
713717 features. clear_initial_routing_sync ( ) ;
714718 }
715719
716720 let resp = msgs:: Init { features } ;
717- self . enqueue_message ( peers_needing_send , & mut peer . transport , & mut peer. outbound_queue , descriptor , & resp ) ;
721+ peer . transport . enqueue_message ( & resp , & mut peer. outbound_queue , & * self . logger ) ;
718722 }
719723
720724 // Process an incoming Init message and set Peer and PeerManager state accordingly
721- fn process_init_message ( & self , message : Message , descriptor : & Descriptor , peer : & mut Peer < TransportImpl > , peers_needing_send : & mut HashSet < Descriptor > , node_id_to_descriptor : & mut HashMap < PublicKey , Descriptor > ) -> Result < ( ) , PeerHandleError > {
725+ fn process_init_message ( & self , message : Message , descriptor : & Descriptor , peer : & mut Peer < TransportImpl > , node_id_to_descriptor : & mut HashMap < PublicKey , Descriptor > ) -> Result < ( ) , PeerHandleError > {
722726 let their_node_id = peer. transport . get_their_node_id ( ) ;
723727
724728 match message {
@@ -731,13 +735,10 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, TransportImpl
731735
732736 let new_post_init_state = self . post_init_state_from_init_message ( init_message, & their_node_id) ?;
733737
734- if let InitSyncTracker :: ChannelsSyncing ( _) = new_post_init_state. sync_status {
735- peers_needing_send. insert ( descriptor. clone ( ) ) ;
736- }
737-
738738 if !peer. outbound {
739- self . enqueue_init_message ( descriptor , peer, peers_needing_send ) ;
739+ self . enqueue_init_message ( peer) ;
740740 }
741+
741742 node_id_to_descriptor. insert ( their_node_id. clone ( ) , descriptor. clone ( ) ) ;
742743 self . message_handler . chan_handler . peer_connected ( & their_node_id, init_message) ;
743744
@@ -753,7 +754,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, TransportImpl
753754 Ok ( ( ) )
754755 }
755756
756- fn do_read_event ( & self , peer_descriptor : & mut Descriptor , peer : & mut Peer < TransportImpl > , peers_needing_send : & mut HashSet < Descriptor > , node_id_to_descriptor : & mut HashMap < PublicKey , Descriptor > , data : & [ u8 ] ) -> Result < bool , PeerHandleError > {
757+ fn do_read_event ( & self , peer_descriptor : & mut Descriptor , peer : & mut Peer < TransportImpl > , node_id_to_descriptor : & mut HashMap < PublicKey , Descriptor > , data : & [ u8 ] ) -> Result < bool , PeerHandleError > {
757758
758759 match peer. transport . process_input ( data, & mut peer. outbound_queue ) {
759760 Err ( e) => {
@@ -766,13 +767,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, TransportImpl
766767 }
767768
768769 if newly_connected && peer. outbound {
769- self . enqueue_init_message ( peer_descriptor, peer, peers_needing_send) ;
770- }
771-
772- // If the transport layer placed items in the outbound queue, we need
773- // to schedule ourselves for flush during the next process_events()
774- if !peer. outbound_queue . is_empty ( ) {
775- peers_needing_send. insert ( peer_descriptor. clone ( ) ) ;
770+ self . enqueue_init_message ( peer) ;
776771 }
777772 }
778773 }
@@ -781,7 +776,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, TransportImpl
781776
782777 if peer. transport . is_connected ( ) && peer. post_init_state . is_none ( ) && received_messages. len ( ) > 0 {
783778 let init_message = received_messages. remove ( 0 ) ;
784- self . process_init_message ( init_message, peer_descriptor, peer, peers_needing_send , node_id_to_descriptor) ?;
779+ self . process_init_message ( init_message, peer_descriptor, peer, node_id_to_descriptor) ?;
785780 }
786781
787782 for message in received_messages {
@@ -802,7 +797,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, TransportImpl
802797 } ,
803798 msgs:: ErrorAction :: SendErrorMessage { msg } => {
804799 log_trace!( self . logger, "Got Err handling message, sending Error message because {}" , e. err) ;
805- self . enqueue_message( peers_needing_send , & mut peer . transport , & mut peer. outbound_queue, peer_descriptor , & msg ) ;
800+ peer . transport . enqueue_message( & msg , & mut peer. outbound_queue, & * self . logger ) ;
806801 continue ;
807802 } ,
808803 }
@@ -811,7 +806,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, TransportImpl
811806 }
812807 }
813808
814- if let Err ( handling_error) = self . handle_message ( message, peer_descriptor , peer, peers_needing_send ) {
809+ if let Err ( handling_error) = self . handle_message ( message, peer) {
815810 match handling_error {
816811 MessageHandlingError :: PeerHandleError ( e) => { return Err ( e) } ,
817812 MessageHandlingError :: LightningError ( e) => {
@@ -827,9 +822,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, TransportImpl
827822 /// Process an incoming message and return a decision (ok, lightning error, peer handling error) regarding the next action with the peer
828823 fn handle_message ( & self ,
829824 message : wire:: Message ,
830- peer_descriptor : & mut Descriptor ,
831- peer : & mut Peer < TransportImpl > ,
832- peers_needing_send : & mut HashSet < Descriptor > ) -> Result < ( ) , MessageHandlingError > {
825+ peer : & mut Peer < TransportImpl > ) -> Result < ( ) , MessageHandlingError > {
833826
834827 let their_node_id = peer. transport . get_their_node_id ( ) ;
835828 let post_init_state = peer. post_init_state . as_mut ( ) . unwrap ( ) ;
@@ -864,7 +857,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, TransportImpl
864857 wire:: Message :: Ping ( msg) => {
865858 if msg. ponglen < 65532 {
866859 let resp = msgs:: Pong { byteslen : msg. ponglen } ;
867- self . enqueue_message ( peers_needing_send , & mut peer . transport , & mut peer. outbound_queue , & peer_descriptor , & resp ) ;
860+ peer . transport . enqueue_message ( & resp , & mut peer. outbound_queue , & * self . logger ) ;
868861 }
869862 } ,
870863 wire:: Message :: Pong ( _msg) => {
@@ -1242,11 +1235,8 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, TransportImpl
12421235 }
12431236 }
12441237
1245- for mut descriptor in peers. peers_needing_send . drain ( ) {
1246- match peers. peers . get_mut ( & descriptor) {
1247- Some ( peer) => self . do_attempt_write_data ( & mut descriptor, & mut peer. post_init_state , & mut peer. transport , & mut peer. outbound_queue ) ,
1248- None => panic ! ( "Inconsistent peers set state!" ) ,
1249- }
1238+ for ( descriptor, peer) in peers. peers_needing_send ( ) {
1239+ self . do_attempt_write_data ( & mut descriptor. clone ( ) , & mut peer. post_init_state , & mut peer. transport , & mut peer. outbound_queue ) ;
12501240 }
12511241 }
12521242 }
0 commit comments