@@ -544,7 +544,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
544544
545545 log_trace ! ( self . logger, "Enqueueing message of type {} to {}" , message. type_id( ) , log_pubkey!( peer. their_node_id. unwrap( ) ) ) ;
546546 match peer. encryptor {
547- PeerState :: Connected ( ref mut conduit) => peer. pending_outbound_buffer . push_back ( conduit. encrypt ( & encode_msg ! ( $msg ) [ ..] ) ) ,
547+ PeerState :: Connected ( ref mut conduit) => peer. pending_outbound_buffer . push_back ( conduit. encrypt ( & encoded_message [ ..] ) ) ,
548548 _ => panic ! ( "peer must be connected!" )
549549 }
550550 peers_needing_send. insert ( descriptor) ;
@@ -563,7 +563,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
563563 let data_processing_decision = peer. encryptor . process_peer_data ( data, & mut peer. pending_outbound_buffer ) ;
564564 match data_processing_decision {
565565 PeerDataProcessingDecision :: Disconnect ( e) => {
566- log_trace ! ( self , "Invalid act message; disconnecting: {}" , e) ;
566+ log_trace ! ( self . logger , "Invalid act message; disconnecting: {}" , e) ;
567567 return Err ( e) ;
568568 }
569569
@@ -577,61 +577,35 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
577577 // insert node id
578578 match peers. node_id_to_descriptor . entry ( peer. their_node_id . unwrap ( ) ) {
579579 hash_map:: Entry :: Occupied ( _) => {
580- log_trace ! ( self , "Got second connection with {}, closing" , log_pubkey!( peer. their_node_id. unwrap( ) ) ) ;
580+ log_trace ! ( self . logger , "Got second connection with {}, closing" , log_pubkey!( peer. their_node_id. unwrap( ) ) ) ;
581581 peer. their_node_id = None ; // Unset so that we don't generate a peer_disconnected event
582582 return Err ( PeerHandleError { no_connection_possible : false } ) ;
583583 }
584584 hash_map:: Entry :: Vacant ( entry) => {
585- log_trace ! ( self , "Finished noise handshake for connection with {}" , log_pubkey!( peer. their_node_id. unwrap( ) ) ) ;
585+ log_trace ! ( self . logger , "Finished noise handshake for connection with {}" , log_pubkey!( peer. their_node_id. unwrap( ) ) ) ;
586586 entry. insert ( peer_descriptor. clone ( ) )
587587 }
588588 } ;
589589 }
590590 _ => { }
591591 } ;
592592
593- if let & mut PeerState :: Connected ( ref mut conduit) = & mut peer. encryptor {
593+ if send_init_message {
594+ let mut features = InitFeatures :: known ( ) ;
595+ if !self . message_handler . route_handler . should_request_full_sync ( & peer. their_node_id . unwrap ( ) ) {
596+ features. clear_initial_routing_sync ( ) ;
597+ }
598+
599+ let resp = msgs:: Init { features } ;
600+ self . enqueue_message ( & mut peers. peers_needing_send , peer, peer_descriptor. clone ( ) , & resp) ;
601+ send_init_message = false
602+ }
594603
604+ let mut received_messages = vec ! [ ] ;
605+ if let & mut PeerState :: Connected ( ref mut conduit) = & mut peer. encryptor {
595606 let encryptor = & mut conduit. encryptor ;
596607 let decryptor = & mut conduit. decryptor ;
597608
598- macro_rules! try_potential_handleerror {
599- ( $thing: expr) => {
600- match $thing {
601- Ok ( x) => x,
602- Err ( e) => {
603- match e. action {
604- msgs:: ErrorAction :: DisconnectPeer { msg: _ } => {
605- //TODO: Try to push msg
606- log_trace!( self . logger, "Got Err handling message, disconnecting peer because {}" , e. err) ;
607- return Err ( PeerHandleError { no_connection_possible: false } ) ;
608- } ,
609- msgs:: ErrorAction :: IgnoreError => {
610- log_trace!( self . logger, "Got Err handling message, ignoring because {}" , e. err) ;
611- continue ;
612- } ,
613- msgs:: ErrorAction :: SendErrorMessage { msg } => {
614- log_trace!( self . logger, "Got Err handling message, sending Error message because {}" , e. err) ;
615- self . enqueue_message( & mut peers. peers_needing_send, peer, peer_descriptor. clone( ) , & msg) ;
616- continue ;
617- } ,
618- }
619- }
620- } ;
621- }
622- }
623-
624- if send_init_message {
625- let mut features = InitFeatures :: known ( ) ;
626- if !self . message_handler . route_handler . should_request_full_sync ( & peer. their_node_id . unwrap ( ) ) {
627- features. clear_initial_routing_sync ( ) ;
628- }
629-
630- let resp = msgs:: Init { features } ;
631- self . enqueue_message ( & mut peers. peers_needing_send , peer, peer_descriptor. clone ( ) , & resp) ;
632- send_init_message = false
633- }
634-
635609 for msg_data in decryptor {
636610 let mut reader = :: std:: io:: Cursor :: new ( & msg_data[ ..] ) ;
637611 let message_result = wire:: read ( & mut reader) ;
@@ -658,16 +632,43 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
658632 }
659633 } ;
660634
661- if let Err ( handling_error) = self . handle_message ( & mut peers. peers_needing_send , peer, peer_descriptor. clone ( ) , message) {
662- match handling_error {
663- MessageHandlingError :: PeerHandleError ( e) => { return Err ( e) } ,
664- MessageHandlingError :: LightningError ( e) => {
665- try_potential_handleerror ! ( Err ( e) ) ;
666- } ,
667- }
635+ received_messages. push ( message) ;
636+ }
637+ }
638+
639+ for message in received_messages {
640+ macro_rules! try_potential_handleerror {
641+ ( $thing: expr) => {
642+ match $thing {
643+ Ok ( x) => x,
644+ Err ( e) => {
645+ match e. action {
646+ msgs:: ErrorAction :: DisconnectPeer { msg: _ } => {
647+ //TODO: Try to push msg
648+ log_trace!( self . logger, "Got Err handling message, disconnecting peer because {}" , e. err) ;
649+ return Err ( PeerHandleError { no_connection_possible: false } ) ;
650+ } ,
651+ msgs:: ErrorAction :: IgnoreError => {
652+ log_trace!( self . logger, "Got Err handling message, ignoring because {}" , e. err) ;
653+ continue ;
654+ } ,
655+ msgs:: ErrorAction :: SendErrorMessage { msg } => {
656+ log_trace!( self . logger, "Got Err handling message, sending Error message because {}" , e. err) ;
657+ self . enqueue_message( & mut peers. peers_needing_send, peer, peer_descriptor. clone( ) , & msg) ;
658+ continue ;
659+ } ,
668660 }
669661 }
670- }
662+ } ;
663+ }
664+ }
665+
666+ if let Err ( handling_error) = self . handle_message ( & mut peers. peers_needing_send , peer, peer_descriptor. clone ( ) , message) {
667+ match handling_error {
668+ MessageHandlingError :: PeerHandleError ( e) => { return Err ( e) } ,
669+ MessageHandlingError :: LightningError ( e) => {
670+ try_potential_handleerror ! ( Err ( e) ) ;
671+ } ,
671672 }
672673 }
673674 }
@@ -1267,7 +1268,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
12671268
12681269#[ cfg( test) ]
12691270mod tests {
1270- use ln:: peer_handler :: { PeerManager , MessageHandler , SocketDescriptor } ;
1271+ use ln:: peers :: handler :: { PeerManager , MessageHandler , SocketDescriptor } ;
12711272 use ln:: msgs;
12721273 use util:: events;
12731274 use util:: test_utils;
0 commit comments