From 1de698fdd9274dc47e18fb680ea15166742785a1 Mon Sep 17 00:00:00 2001 From: Valentine Wallace Date: Thu, 25 Aug 2022 13:56:19 -0400 Subject: [PATCH 1/3] PeerMan: fix bug in drop_gossip util Fixes a flipped bool that was introduced in 4a1ee5f9a984c9b0c0892025d624ade734337b1a --- lightning/src/ln/peer_handler.rs | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/lightning/src/ln/peer_handler.rs b/lightning/src/ln/peer_handler.rs index 623aa969dfd..925d7e5946d 100644 --- a/lightning/src/ln/peer_handler.rs +++ b/lightning/src/ln/peer_handler.rs @@ -399,11 +399,8 @@ impl Peer { /// Returns whether this peer's buffer is full and we should drop gossip messages. fn buffer_full_drop_gossip(&self) -> bool { - if self.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP - || self.msgs_sent_since_pong > BUFFER_DRAIN_MSGS_PER_TICK * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO { - return false - } - true + self.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP || + self.msgs_sent_since_pong > BUFFER_DRAIN_MSGS_PER_TICK * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO } } From ab149dc9d51b4d560b7ef7d86c9718044b998264 Mon Sep 17 00:00:00 2001 From: Valentine Wallace Date: Thu, 25 Aug 2022 13:58:49 -0400 Subject: [PATCH 2/3] PeerMan: rename drop_gossip util to be more accurate It's more accurate to name it as dropping gossip broadcasts, as it won't drop all gossip. --- lightning/src/ln/peer_handler.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/lightning/src/ln/peer_handler.rs b/lightning/src/ln/peer_handler.rs index 925d7e5946d..08efffe9641 100644 --- a/lightning/src/ln/peer_handler.rs +++ b/lightning/src/ln/peer_handler.rs @@ -398,7 +398,7 @@ impl Peer { } /// Returns whether this peer's buffer is full and we should drop gossip messages. - fn buffer_full_drop_gossip(&self) -> bool { + fn buffer_full_drop_gossip_broadcast(&self) -> bool { self.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP || self.msgs_sent_since_pong > BUFFER_DRAIN_MSGS_PER_TICK * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO } @@ -1322,7 +1322,7 @@ impl P !peer.should_forward_channel_announcement(msg.contents.short_channel_id) { continue } - if peer.buffer_full_drop_gossip() { + if peer.buffer_full_drop_gossip_broadcast() { log_gossip!(self.logger, "Skipping broadcast message to {:?} as its outbound buffer is full", peer.their_node_id); continue; } @@ -1346,7 +1346,7 @@ impl P !peer.should_forward_node_announcement(msg.contents.node_id) { continue } - if peer.buffer_full_drop_gossip() { + if peer.buffer_full_drop_gossip_broadcast() { log_gossip!(self.logger, "Skipping broadcast message to {:?} as its outbound buffer is full", peer.their_node_id); continue; } @@ -1369,7 +1369,7 @@ impl P !peer.should_forward_channel_announcement(msg.contents.short_channel_id) { continue } - if peer.buffer_full_drop_gossip() { + if peer.buffer_full_drop_gossip_broadcast() { log_gossip!(self.logger, "Skipping broadcast message to {:?} as its outbound buffer is full", peer.their_node_id); continue; } From 47e818f198abafba01b9ad278582886f9007dac2 Mon Sep 17 00:00:00 2001 From: Valentine Wallace Date: Wed, 24 Aug 2022 18:04:58 -0400 Subject: [PATCH 3/3] Separate gossip broadcasts into their own queue in PeerManager This allows us to better prioritize channel messages over gossip broadcasts and lays groundwork for rate limiting onion messages more simply, since they won't be competing with gossip broadcasts for space in the main message queue. --- lightning/src/ln/peer_handler.rs | 54 ++++++++++++++++++++++---------- 1 file changed, 37 insertions(+), 17 deletions(-) diff --git a/lightning/src/ln/peer_handler.rs b/lightning/src/ln/peer_handler.rs index 08efffe9641..573e910ab08 100644 --- a/lightning/src/ln/peer_handler.rs +++ b/lightning/src/ln/peer_handler.rs @@ -337,6 +337,9 @@ struct Peer { pending_outbound_buffer: LinkedList>, pending_outbound_buffer_first_msg_offset: usize, + // Queue gossip broadcasts separately from `pending_outbound_buffer` so we can easily prioritize + // channel messages over them. + gossip_broadcast_buffer: LinkedList>, awaiting_write_event: bool, pending_read_buffer: Vec, @@ -389,17 +392,26 @@ impl Peer { self.pending_outbound_buffer.len() < OUTBOUND_BUFFER_LIMIT_READ_PAUSE } - /// Determines if we should push additional gossip messages onto a peer's outbound buffer for - /// backfilling gossip data to the peer. This is checked every time the peer's buffer may have - /// been drained. + /// Determines if we should push additional gossip background sync (aka "backfill") onto a peer's + /// outbound buffer. This is checked every time the peer's buffer may have been drained. fn should_buffer_gossip_backfill(&self) -> bool { - self.pending_outbound_buffer.is_empty() && - self.msgs_sent_since_pong < BUFFER_DRAIN_MSGS_PER_TICK + self.pending_outbound_buffer.is_empty() && self.gossip_broadcast_buffer.is_empty() + && self.msgs_sent_since_pong < BUFFER_DRAIN_MSGS_PER_TICK } - /// Returns whether this peer's buffer is full and we should drop gossip messages. + /// Determines if we should push additional gossip broadcast messages onto a peer's outbound + /// buffer. This is checked every time the peer's buffer may have been drained. + fn should_buffer_gossip_broadcast(&self) -> bool { + self.pending_outbound_buffer.is_empty() + && self.msgs_sent_since_pong < BUFFER_DRAIN_MSGS_PER_TICK + } + + /// Returns whether this peer's outbound buffers are full and we should drop gossip broadcasts. fn buffer_full_drop_gossip_broadcast(&self) -> bool { - self.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP || + let total_outbound_buffered = + self.gossip_broadcast_buffer.len() + self.pending_outbound_buffer.len(); + + total_outbound_buffered > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP || self.msgs_sent_since_pong > BUFFER_DRAIN_MSGS_PER_TICK * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO } } @@ -668,6 +680,7 @@ impl P pending_outbound_buffer: LinkedList::new(), pending_outbound_buffer_first_msg_offset: 0, + gossip_broadcast_buffer: LinkedList::new(), awaiting_write_event: false, pending_read_buffer, @@ -714,6 +727,7 @@ impl P pending_outbound_buffer: LinkedList::new(), pending_outbound_buffer_first_msg_offset: 0, + gossip_broadcast_buffer: LinkedList::new(), awaiting_write_event: false, pending_read_buffer, @@ -734,6 +748,11 @@ impl P fn do_attempt_write_data(&self, descriptor: &mut Descriptor, peer: &mut Peer) { while !peer.awaiting_write_event { + if peer.should_buffer_gossip_broadcast() { + if let Some(msg) = peer.gossip_broadcast_buffer.pop_front() { + peer.pending_outbound_buffer.push_back(msg); + } + } if peer.should_buffer_gossip_backfill() { match peer.sync_status { InitSyncTracker::NoSyncRequested => {}, @@ -848,12 +867,6 @@ impl P } } - /// Append a message to a peer's pending outbound/write buffer - fn enqueue_encoded_message(&self, peer: &mut Peer, encoded_message: &Vec) { - peer.msgs_sent_since_pong += 1; - peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encoded_message[..])); - } - /// Append a message to a peer's pending outbound/write buffer fn enqueue_message(&self, peer: &mut Peer, message: &M) { let mut buffer = VecWriter(Vec::with_capacity(2048)); @@ -864,7 +877,14 @@ impl P } else { log_trace!(self.logger, "Enqueueing message {:?} to {}", message, log_pubkey!(peer.their_node_id.unwrap())) } - self.enqueue_encoded_message(peer, &buffer.0); + peer.msgs_sent_since_pong += 1; + peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&buffer.0[..])); + } + + /// Append a message to a peer's pending outbound/write gossip broadcast buffer + fn enqueue_encoded_gossip_broadcast(&self, peer: &mut Peer, encoded_message: &Vec) { + peer.msgs_sent_since_pong += 1; + peer.gossip_broadcast_buffer.push_back(peer.channel_encryptor.encrypt_message(&encoded_message[..])); } fn do_read_event(&self, peer_descriptor: &mut Descriptor, data: &[u8]) -> Result { @@ -1333,7 +1353,7 @@ impl P if except_node.is_some() && peer.their_node_id.as_ref() == except_node { continue; } - self.enqueue_encoded_message(&mut *peer, &encoded_msg); + self.enqueue_encoded_gossip_broadcast(&mut *peer, &encoded_msg); } }, wire::Message::NodeAnnouncement(ref msg) => { @@ -1356,7 +1376,7 @@ impl P if except_node.is_some() && peer.their_node_id.as_ref() == except_node { continue; } - self.enqueue_encoded_message(&mut *peer, &encoded_msg); + self.enqueue_encoded_gossip_broadcast(&mut *peer, &encoded_msg); } }, wire::Message::ChannelUpdate(ref msg) => { @@ -1376,7 +1396,7 @@ impl P if except_node.is_some() && peer.their_node_id.as_ref() == except_node { continue; } - self.enqueue_encoded_message(&mut *peer, &encoded_msg); + self.enqueue_encoded_gossip_broadcast(&mut *peer, &encoded_msg); } }, _ => debug_assert!(false, "We shouldn't attempt to forward anything but gossip messages"),