diff --git a/lightning/src/ln/peer_handler.rs b/lightning/src/ln/peer_handler.rs index 623aa969dfd..573e910ab08 100644 --- a/lightning/src/ln/peer_handler.rs +++ b/lightning/src/ln/peer_handler.rs @@ -337,6 +337,9 @@ struct Peer { pending_outbound_buffer: LinkedList>, pending_outbound_buffer_first_msg_offset: usize, + // Queue gossip broadcasts separately from `pending_outbound_buffer` so we can easily prioritize + // channel messages over them. + gossip_broadcast_buffer: LinkedList>, awaiting_write_event: bool, pending_read_buffer: Vec, @@ -389,21 +392,27 @@ impl Peer { self.pending_outbound_buffer.len() < OUTBOUND_BUFFER_LIMIT_READ_PAUSE } - /// Determines if we should push additional gossip messages onto a peer's outbound buffer for - /// backfilling gossip data to the peer. This is checked every time the peer's buffer may have - /// been drained. + /// Determines if we should push additional gossip background sync (aka "backfill") onto a peer's + /// outbound buffer. This is checked every time the peer's buffer may have been drained. fn should_buffer_gossip_backfill(&self) -> bool { - self.pending_outbound_buffer.is_empty() && - self.msgs_sent_since_pong < BUFFER_DRAIN_MSGS_PER_TICK + self.pending_outbound_buffer.is_empty() && self.gossip_broadcast_buffer.is_empty() + && self.msgs_sent_since_pong < BUFFER_DRAIN_MSGS_PER_TICK } - /// Returns whether this peer's buffer is full and we should drop gossip messages. - fn buffer_full_drop_gossip(&self) -> bool { - if self.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP - || self.msgs_sent_since_pong > BUFFER_DRAIN_MSGS_PER_TICK * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO { - return false - } - true + /// Determines if we should push additional gossip broadcast messages onto a peer's outbound + /// buffer. This is checked every time the peer's buffer may have been drained. + fn should_buffer_gossip_broadcast(&self) -> bool { + self.pending_outbound_buffer.is_empty() + && self.msgs_sent_since_pong < BUFFER_DRAIN_MSGS_PER_TICK + } + + /// Returns whether this peer's outbound buffers are full and we should drop gossip broadcasts. + fn buffer_full_drop_gossip_broadcast(&self) -> bool { + let total_outbound_buffered = + self.gossip_broadcast_buffer.len() + self.pending_outbound_buffer.len(); + + total_outbound_buffered > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP || + self.msgs_sent_since_pong > BUFFER_DRAIN_MSGS_PER_TICK * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO } } @@ -671,6 +680,7 @@ impl P pending_outbound_buffer: LinkedList::new(), pending_outbound_buffer_first_msg_offset: 0, + gossip_broadcast_buffer: LinkedList::new(), awaiting_write_event: false, pending_read_buffer, @@ -717,6 +727,7 @@ impl P pending_outbound_buffer: LinkedList::new(), pending_outbound_buffer_first_msg_offset: 0, + gossip_broadcast_buffer: LinkedList::new(), awaiting_write_event: false, pending_read_buffer, @@ -737,6 +748,11 @@ impl P fn do_attempt_write_data(&self, descriptor: &mut Descriptor, peer: &mut Peer) { while !peer.awaiting_write_event { + if peer.should_buffer_gossip_broadcast() { + if let Some(msg) = peer.gossip_broadcast_buffer.pop_front() { + peer.pending_outbound_buffer.push_back(msg); + } + } if peer.should_buffer_gossip_backfill() { match peer.sync_status { InitSyncTracker::NoSyncRequested => {}, @@ -851,12 +867,6 @@ impl P } } - /// Append a message to a peer's pending outbound/write buffer - fn enqueue_encoded_message(&self, peer: &mut Peer, encoded_message: &Vec) { - peer.msgs_sent_since_pong += 1; - peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encoded_message[..])); - } - /// Append a message to a peer's pending outbound/write buffer fn enqueue_message(&self, peer: &mut Peer, message: &M) { let mut buffer = VecWriter(Vec::with_capacity(2048)); @@ -867,7 +877,14 @@ impl P } else { log_trace!(self.logger, "Enqueueing message {:?} to {}", message, log_pubkey!(peer.their_node_id.unwrap())) } - self.enqueue_encoded_message(peer, &buffer.0); + peer.msgs_sent_since_pong += 1; + peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&buffer.0[..])); + } + + /// Append a message to a peer's pending outbound/write gossip broadcast buffer + fn enqueue_encoded_gossip_broadcast(&self, peer: &mut Peer, encoded_message: &Vec) { + peer.msgs_sent_since_pong += 1; + peer.gossip_broadcast_buffer.push_back(peer.channel_encryptor.encrypt_message(&encoded_message[..])); } fn do_read_event(&self, peer_descriptor: &mut Descriptor, data: &[u8]) -> Result { @@ -1325,7 +1342,7 @@ impl P !peer.should_forward_channel_announcement(msg.contents.short_channel_id) { continue } - if peer.buffer_full_drop_gossip() { + if peer.buffer_full_drop_gossip_broadcast() { log_gossip!(self.logger, "Skipping broadcast message to {:?} as its outbound buffer is full", peer.their_node_id); continue; } @@ -1336,7 +1353,7 @@ impl P if except_node.is_some() && peer.their_node_id.as_ref() == except_node { continue; } - self.enqueue_encoded_message(&mut *peer, &encoded_msg); + self.enqueue_encoded_gossip_broadcast(&mut *peer, &encoded_msg); } }, wire::Message::NodeAnnouncement(ref msg) => { @@ -1349,7 +1366,7 @@ impl P !peer.should_forward_node_announcement(msg.contents.node_id) { continue } - if peer.buffer_full_drop_gossip() { + if peer.buffer_full_drop_gossip_broadcast() { log_gossip!(self.logger, "Skipping broadcast message to {:?} as its outbound buffer is full", peer.their_node_id); continue; } @@ -1359,7 +1376,7 @@ impl P if except_node.is_some() && peer.their_node_id.as_ref() == except_node { continue; } - self.enqueue_encoded_message(&mut *peer, &encoded_msg); + self.enqueue_encoded_gossip_broadcast(&mut *peer, &encoded_msg); } }, wire::Message::ChannelUpdate(ref msg) => { @@ -1372,14 +1389,14 @@ impl P !peer.should_forward_channel_announcement(msg.contents.short_channel_id) { continue } - if peer.buffer_full_drop_gossip() { + if peer.buffer_full_drop_gossip_broadcast() { log_gossip!(self.logger, "Skipping broadcast message to {:?} as its outbound buffer is full", peer.their_node_id); continue; } if except_node.is_some() && peer.their_node_id.as_ref() == except_node { continue; } - self.enqueue_encoded_message(&mut *peer, &encoded_msg); + self.enqueue_encoded_gossip_broadcast(&mut *peer, &encoded_msg); } }, _ => debug_assert!(false, "We shouldn't attempt to forward anything but gossip messages"),