@@ -39,15 +39,15 @@ use bitcoin::hashes::sha256::Hash as Sha256;
3939use bitcoin:: hashes:: sha256:: HashEngine as Sha256Engine ;
4040use bitcoin:: hashes:: { HashEngine , Hash } ;
4141use ln:: peers:: outbound_queue:: OutboundQueue ;
42- use ln:: peers:: transport:: { PayloadQueuer , Transport } ;
42+ use ln:: peers:: transport:: Transport ;
4343use std:: collections:: hash_map:: IterMut ;
4444use std:: iter:: Filter ;
4545
4646// Number of items that can exist in the OutboundQueue before Sync message flow control is triggered
4747const OUTBOUND_QUEUE_SIZE : usize = 10 ;
4848
4949/// Interface PeerManager uses to interact with the Transport object
50- pub ( super ) trait ITransport : MessageQueuer {
50+ pub ( super ) trait ITransport {
5151 /// Instantiate the new outbound Transport
5252 fn new_outbound ( initiator_static_private_key : & SecretKey , responder_static_public_key : & PublicKey , initiator_ephemeral_private_key : & SecretKey ) -> Self ;
5353
@@ -59,29 +59,51 @@ pub(super) trait ITransport: MessageQueuer {
5959
6060 /// Process input data similar to reading it off a descriptor directly. Returns true on the first call
6161 /// that results in the transport being newly connected.
62- fn process_input ( & mut self , input : & [ u8 ] , output_buffer : & mut impl PayloadQueuer ) -> Result < bool , String > ;
62+ fn process_input ( & mut self , input : & [ u8 ] , outbound_queue : & mut impl IOutboundQueue ) -> Result < bool , String > ;
6363
6464 /// Returns true if the connection is established and encrypted messages can be sent.
6565 fn is_connected ( & self ) -> bool ;
6666
6767 /// Returns the node_id of the remote node. Panics if not connected.
6868 fn get_their_node_id ( & self ) -> PublicKey ;
6969
70+ /// Encodes, encrypts, and enqueues a message to the outbound queue. Panics if the connection is
71+ /// not established yet.
72+ fn enqueue_message < M : Encode + Writeable , L : Deref > ( & mut self , message : & M , outbound_queue : & mut impl IOutboundQueue , logger : L ) where L :: Target : Logger ;
73+
7074 /// Returns all Messages that have been received and can be successfully parsed by the Transport
7175 fn drain_messages < L : Deref > ( & mut self , logger : L ) -> Result < Vec < Message > , PeerHandleError > where L :: Target : Logger ;
7276}
7377
74- /// Interface PeerManager uses to queue message to send. Implemented by Transport to handle
75- /// encryption/decryption post-NOISE.
76- pub ( super ) trait MessageQueuer {
77- /// Encodes, encrypts, and enqueues a message to the outbound queue. Panics if the connection is
78- /// not established yet.
79- fn enqueue_message < M : Encode + Writeable , Q : PayloadQueuer , L : Deref > ( & mut self , message : & M , output_buffer : & mut Q , logger : L ) where L :: Target : Logger ;
80- }
78+ /// The OutboundQueue is a container for unencrypted payloads during the NOISE handshake and
79+ /// encrypted Messages post-NOISE. This trait abstracts the behavior to push items to a queue, flush
80+ /// them through a SocketDescriptor, and handle flow control. Each Peer owns a separate OutboundQueue.
81+ ///
82+ /// A trait is used to enable tests to use test doubles that implement a subset of the api with
83+ /// cleaner test validation.
84+ pub ( super ) trait IOutboundQueue {
85+
86+ // ____ _ __ __ _ _ _
87+ // | _ \ _ _ ___| |__ | \/ | ___| |_| |__ ___ __| |___
88+ // | |_) | | | / __| '_ \ | |\/| |/ _ \ __| '_ \ / _ \ / _` / __|
89+ // | __/| |_| \__ \ | | | | | | | __/ |_| | | | (_) | (_| \__ \
90+ // |_| \__,_|___/_| |_| |_| |_|\___|\__|_| |_|\___/ \__,_|___/
91+
92+ /// Unconditionally queue item. May increase queue above soft limit.
93+ fn push_back ( & mut self , item : Vec < u8 > ) ;
94+
95+ /// Returns true if the queue is empty
96+ fn is_empty ( & self ) -> bool ;
97+
98+ /// Returns the amount of free space in the queue before the soft limit
99+ fn queue_space ( & self ) -> usize ;
100+
101+ // _____ _ _ __ __ _ _ _
102+ // | ___| |_ _ ___| |__ | \/ | ___| |_| |__ ___ __| |___
103+ // | |_ | | | | / __| '_ \ | |\/| |/ _ \ __| '_ \ / _ \ / _` / __|
104+ // | _| | | |_| \__ \ | | | | | | | __/ |_| | | | (_) | (_| \__ \
105+ // |_| |_|\__,_|___/_| |_| |_| |_|\___|\__|_| |_|\___/ \__,_|___/
81106
82- /// Trait representing a container that can try to flush data through a SocketDescriptor. Used by the
83- /// PeerManager to handle flushing the outbound queue and flow control.
84- pub ( super ) trait SocketDescriptorFlusher {
85107 /// Write previously enqueued data to the SocketDescriptor. A return of false indicates the
86108 /// underlying SocketDescriptor could not fulfill the send_data() call and the blocked state
87109 /// has been set. Use unblock() when the SocketDescriptor may have more room.
@@ -583,11 +605,11 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, TransportImpl
583605
584606 // Fill remaining slots in output queue with sync messages, updating the sync state when
585607 // appropriate
586- fn fill_outbound_queue_with_sync < Q : PayloadQueuer + SocketDescriptorFlusher > (
608+ fn fill_outbound_queue_with_sync (
587609 & self ,
588610 sync_status : & mut InitSyncTracker ,
589- message_queuer : & mut impl MessageQueuer ,
590- outbound_queue : & mut Q ) {
611+ transport : & mut TransportImpl ,
612+ outbound_queue : & mut OutboundQueue ) {
591613
592614 let queue_space = outbound_queue. queue_space ( ) ;
593615 if queue_space > 0 {
@@ -597,12 +619,12 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, TransportImpl
597619 let steps = ( ( queue_space + 2 ) / 3 ) as u8 ;
598620 let all_messages = self . message_handler . route_handler . get_next_channel_announcements ( c, steps) ;
599621 for & ( ref announce, ref update_a_option, ref update_b_option) in all_messages. iter ( ) {
600- message_queuer . enqueue_message ( announce, outbound_queue, & * self . logger ) ;
622+ transport . enqueue_message ( announce, outbound_queue, & * self . logger ) ;
601623 if let & Some ( ref update_a) = update_a_option {
602- message_queuer . enqueue_message ( update_a, outbound_queue, & * self . logger ) ;
624+ transport . enqueue_message ( update_a, outbound_queue, & * self . logger ) ;
603625 }
604626 if let & Some ( ref update_b) = update_b_option {
605- message_queuer . enqueue_message ( update_b, outbound_queue, & * self . logger ) ;
627+ transport . enqueue_message ( update_b, outbound_queue, & * self . logger ) ;
606628 }
607629 * sync_status = InitSyncTracker :: ChannelsSyncing ( announce. contents . short_channel_id + 1 ) ;
608630 }
@@ -614,7 +636,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, TransportImpl
614636 let steps = queue_space as u8 ;
615637 let all_messages = self . message_handler . route_handler . get_next_node_announcements ( None , steps) ;
616638 for msg in all_messages. iter ( ) {
617- message_queuer . enqueue_message ( msg, outbound_queue, & * self . logger ) ;
639+ transport . enqueue_message ( msg, outbound_queue, & * self . logger ) ;
618640 * sync_status = InitSyncTracker :: NodesSyncing ( msg. contents . node_id ) ;
619641 }
620642 if all_messages. is_empty ( ) || all_messages. len ( ) != steps as usize {
@@ -626,7 +648,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, TransportImpl
626648 let steps = queue_space as u8 ;
627649 let all_messages = self . message_handler . route_handler . get_next_node_announcements ( Some ( & key) , steps) ;
628650 for msg in all_messages. iter ( ) {
629- message_queuer . enqueue_message ( msg, outbound_queue, & * self . logger ) ;
651+ transport . enqueue_message ( msg, outbound_queue, & * self . logger ) ;
630652 * sync_status = InitSyncTracker :: NodesSyncing ( msg. contents . node_id ) ;
631653 }
632654 if all_messages. is_empty ( ) || all_messages. len ( ) != steps as usize {
@@ -637,18 +659,18 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, TransportImpl
637659 }
638660 }
639661
640- fn do_attempt_write_data < Q : PayloadQueuer + SocketDescriptorFlusher > (
662+ fn do_attempt_write_data (
641663 & self ,
642664 descriptor : & mut Descriptor ,
643665 post_init_state : & mut Option < PostInitState > ,
644- message_queuer : & mut impl MessageQueuer ,
645- outbound_queue : & mut Q ) {
666+ transport : & mut TransportImpl ,
667+ outbound_queue : & mut OutboundQueue ) {
646668
647669 while !outbound_queue. is_blocked ( ) {
648670 // If connected, fill output queue with sync messages
649671 match post_init_state {
650672 None => { } ,
651- & mut Some ( ref mut state) => self . fill_outbound_queue_with_sync ( & mut state. sync_status , message_queuer , outbound_queue)
673+ & mut Some ( ref mut state) => self . fill_outbound_queue_with_sync ( & mut state. sync_status , transport , outbound_queue)
652674 }
653675
654676 // No messages to send
0 commit comments