diff --git a/lightning-net-tokio/src/lib.rs b/lightning-net-tokio/src/lib.rs
index 645a7434e45..1f00fcb3ca0 100644
--- a/lightning-net-tokio/src/lib.rs
+++ b/lightning-net-tokio/src/lib.rs
@@ -562,8 +562,8 @@ mod tests {
 		fn handle_node_announcement(&self, _msg: &NodeAnnouncement) -> Result<bool, LightningError> { Ok(false) }
 		fn handle_channel_announcement(&self, _msg: &ChannelAnnouncement) -> Result<bool, LightningError> { Ok(false) }
 		fn handle_channel_update(&self, _msg: &ChannelUpdate) -> Result<bool, LightningError> { Ok(false) }
-		fn get_next_channel_announcements(&self, _starting_point: u64, _batch_amount: u8) -> Vec<(ChannelAnnouncement, Option<ChannelUpdate>, Option<ChannelUpdate>)> { Vec::new() }
-		fn get_next_node_announcements(&self, _starting_point: Option<&PublicKey>, _batch_amount: u8) -> Vec<NodeAnnouncement> { Vec::new() }
+		fn get_next_channel_announcement(&self, _starting_point: u64) -> Option<(ChannelAnnouncement, Option<ChannelUpdate>, Option<ChannelUpdate>)> { None }
+		fn get_next_node_announcement(&self, _starting_point: Option<&PublicKey>) -> Option<NodeAnnouncement> { None }
 		fn peer_connected(&self, _their_node_id: &PublicKey, _init_msg: &Init) { }
 		fn handle_reply_channel_range(&self, _their_node_id: &PublicKey, _msg: ReplyChannelRange) -> Result<(), LightningError> { Ok(()) }
 		fn handle_reply_short_channel_ids_end(&self, _their_node_id: &PublicKey, _msg: ReplyShortChannelIdsEnd) -> Result<(), LightningError> { Ok(()) }
diff --git a/lightning/src/ln/functional_test_utils.rs b/lightning/src/ln/functional_test_utils.rs
index 54d199a26f8..aa2ad2177c4 100644
--- a/lightning/src/ln/functional_test_utils.rs
+++ b/lightning/src/ln/functional_test_utils.rs
@@ -318,20 +318,20 @@ impl<'a, 'b, 'c> Drop for Node<'a, 'b, 'c> {
 			);
 			let mut chan_progress = 0;
 			loop {
-				let orig_announcements = self.gossip_sync.get_next_channel_announcements(chan_progress, 255);
-				let deserialized_announcements = gossip_sync.get_next_channel_announcements(chan_progress, 255);
+				let orig_announcements = self.gossip_sync.get_next_channel_announcement(chan_progress);
+				let deserialized_announcements = gossip_sync.get_next_channel_announcement(chan_progress);
 				assert!(orig_announcements == deserialized_announcements);
-				chan_progress = match orig_announcements.last() {
+				chan_progress = match orig_announcements {
 					Some(announcement) => announcement.0.contents.short_channel_id + 1,
 					None => break,
 				};
 			}
 			let mut node_progress = None;
 			loop {
-				let orig_announcements = self.gossip_sync.get_next_node_announcements(node_progress.as_ref(), 255);
-				let deserialized_announcements = gossip_sync.get_next_node_announcements(node_progress.as_ref(), 255);
+				let orig_announcements = self.gossip_sync.get_next_node_announcement(node_progress.as_ref());
+				let deserialized_announcements = gossip_sync.get_next_node_announcement(node_progress.as_ref());
 				assert!(orig_announcements == deserialized_announcements);
-				node_progress = match orig_announcements.last() {
+				node_progress = match orig_announcements {
 					Some(announcement) => Some(announcement.contents.node_id),
 					None => break,
 				};
diff --git a/lightning/src/ln/msgs.rs b/lightning/src/ln/msgs.rs
index 05a336c18ef..975b40c9c21 100644
--- a/lightning/src/ln/msgs.rs
+++ b/lightning/src/ln/msgs.rs
@@ -915,15 +915,15 @@ pub trait RoutingMessageHandler : MessageSendEventsProvider {
 	/// Handle an incoming channel_update message, returning true if it should be forwarded on,
 	/// false or returning an Err otherwise.
 	fn handle_channel_update(&self, msg: &ChannelUpdate) -> Result<bool, LightningError>;
-	/// Gets a subset of the channel announcements and updates required to dump our routing table
-	/// to a remote node, starting at the short_channel_id indicated by starting_point and
-	/// including the batch_amount entries immediately higher in numerical value than starting_point.
-	fn get_next_channel_announcements(&self, starting_point: u64, batch_amount: u8) -> Vec<(ChannelAnnouncement, Option<ChannelUpdate>, Option<ChannelUpdate>)>;
-	/// Gets a subset of the node announcements required to dump our routing table to a remote node,
-	/// starting at the node *after* the provided publickey and including batch_amount entries
-	/// immediately higher (as defined by <PublicKey as Ord>::cmp) than starting_point.
+	/// Gets channel announcements and updates required to dump our routing table to a remote node,
+	/// starting at the short_channel_id indicated by starting_point and including announcements
+	/// for a single channel.
+	fn get_next_channel_announcement(&self, starting_point: u64) -> Option<(ChannelAnnouncement, Option<ChannelUpdate>, Option<ChannelUpdate>)>;
+	/// Gets a node announcement required to dump our routing table to a remote node, starting at
+	/// the node *after* the provided pubkey and including up to one announcement immediately
+	/// higher (as defined by <PublicKey as Ord>::cmp) than starting_point.
 	/// If None is provided for starting_point, we start at the first node.
-	fn get_next_node_announcements(&self, starting_point: Option<&PublicKey>, batch_amount: u8) -> Vec<NodeAnnouncement>;
+	fn get_next_node_announcement(&self, starting_point: Option<&PublicKey>) -> Option<NodeAnnouncement>;
 	/// Called when a connection is established with a peer. This can be used to
 	/// perform routing table synchronization using a strategy defined by the
 	/// implementor.
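For orientation, the new accessors replace batched `Vec` returns with a cursor-style protocol: the caller feeds the last-returned key back in to resume. A minimal consumer-loop sketch, assuming a finite table (the production graph's is; the test stub at the end of this diff is not) — the helper name and the counting are illustrative, not part of this diff:

```rust
use bitcoin::secp256k1::PublicKey;
use lightning::ln::msgs::RoutingMessageHandler;

// Illustrative only: walk the full routing table with the new one-at-a-time
// accessors, advancing a cursor exactly as functional_test_utils.rs does above.
fn count_table_entries<RM: RoutingMessageHandler>(handler: &RM) -> (usize, usize) {
	let (mut chans, mut nodes) = (0, 0);
	let mut chan_progress = 0u64;
	while let Some((ann, _update_a, _update_b)) = handler.get_next_channel_announcement(chan_progress) {
		// The next query starts strictly past the channel we just received.
		chan_progress = ann.contents.short_channel_id + 1;
		chans += 1;
	}
	let mut node_progress: Option<PublicKey> = None;
	while let Some(ann) = handler.get_next_node_announcement(node_progress.as_ref()) {
		// Node queries are exclusive of the cursor, so the id is reused as-is.
		node_progress = Some(ann.contents.node_id);
		nodes += 1;
	}
	(chans, nodes)
}
```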
diff --git a/lightning/src/ln/peer_handler.rs b/lightning/src/ln/peer_handler.rs
index d34fdfeb52a..623aa969dfd 100644
--- a/lightning/src/ln/peer_handler.rs
+++ b/lightning/src/ln/peer_handler.rs
@@ -67,9 +67,9 @@ impl RoutingMessageHandler for IgnoringMessageHandler {
 	fn handle_node_announcement(&self, _msg: &msgs::NodeAnnouncement) -> Result<bool, LightningError> { Ok(false) }
 	fn handle_channel_announcement(&self, _msg: &msgs::ChannelAnnouncement) -> Result<bool, LightningError> { Ok(false) }
 	fn handle_channel_update(&self, _msg: &msgs::ChannelUpdate) -> Result<bool, LightningError> { Ok(false) }
-	fn get_next_channel_announcements(&self, _starting_point: u64, _batch_amount: u8) ->
-		Vec<(msgs::ChannelAnnouncement, Option<msgs::ChannelUpdate>, Option<msgs::ChannelUpdate>)> { Vec::new() }
-	fn get_next_node_announcements(&self, _starting_point: Option<&PublicKey>, _batch_amount: u8) -> Vec<msgs::NodeAnnouncement> { Vec::new() }
+	fn get_next_channel_announcement(&self, _starting_point: u64) ->
+		Option<(msgs::ChannelAnnouncement, Option<msgs::ChannelUpdate>, Option<msgs::ChannelUpdate>)> { None }
+	fn get_next_node_announcement(&self, _starting_point: Option<&PublicKey>) -> Option<msgs::NodeAnnouncement> { None }
 	fn peer_connected(&self, _their_node_id: &PublicKey, _init: &msgs::Init) {}
 	fn handle_reply_channel_range(&self, _their_node_id: &PublicKey, _msg: msgs::ReplyChannelRange) -> Result<(), LightningError> { Ok(()) }
 	fn handle_reply_short_channel_ids_end(&self, _their_node_id: &PublicKey, _msg: msgs::ReplyShortChannelIdsEnd) -> Result<(), LightningError> { Ok(()) }
@@ -323,6 +323,10 @@ const MAX_BUFFER_DRAIN_TICK_INTERVALS_PER_PEER: i8 = 4;
 /// tick. Once we have sent this many messages since the last ping, we send a ping right away to
 /// ensure we don't just fill up our send buffer and leave the peer with too many messages to
 /// process before the next ping.
+///
+/// Note that we continue responding to other messages even after we've sent this many messages, so
+/// it's more of a general guideline used for gossip backfill (and gossip forwarding, times
+/// [`FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO`]) than a hard limit.
 const BUFFER_DRAIN_MSGS_PER_TICK: usize = 32;

 struct Peer {
@@ -378,6 +382,29 @@ impl Peer {
 			InitSyncTracker::NodesSyncing(pk) => pk < node_id,
 		}
 	}
+
+	/// Returns whether we should be reading bytes from this peer, based on whether its outbound
+	/// buffer still has space and we don't need to pause reads to get some writes out.
+	fn should_read(&self) -> bool {
+		self.pending_outbound_buffer.len() < OUTBOUND_BUFFER_LIMIT_READ_PAUSE
+	}
+
+	/// Determines if we should push additional gossip messages onto a peer's outbound buffer for
+	/// backfilling gossip data to the peer. This is checked every time the peer's buffer may have
+	/// been drained.
+	fn should_buffer_gossip_backfill(&self) -> bool {
+		self.pending_outbound_buffer.is_empty() &&
+			self.msgs_sent_since_pong < BUFFER_DRAIN_MSGS_PER_TICK
+	}
+
+	/// Returns whether this peer's buffer is full and we should drop gossip messages.
+	fn buffer_full_drop_gossip(&self) -> bool {
+		if self.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP
+			|| self.msgs_sent_since_pong > BUFFER_DRAIN_MSGS_PER_TICK * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO {
+			return true
+		}
+		false
+	}
 }

 /// SimpleArcPeerManager is useful when you need a PeerManager with a static lifetime, e.g.
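(Note: `buffer_full_drop_gossip` above must return true when the buffer is over the limit — its doc comment and every call site below `continue` on a true return.) To make the pacing behind `should_buffer_gossip_backfill` concrete: assuming each backfill step enqueues one channel_announcement plus its two channel_updates, and that the buffer fully drains between checks (idealized assumptions, not guarantees of this diff), roughly eleven channels fit in one tick before the 32-message budget trips:

```rust
// Standalone simulation of the per-tick backfill budget; the constant mirrors
// the diff, the flush-between-checks assumption is idealized.
const BUFFER_DRAIN_MSGS_PER_TICK: usize = 32;

fn main() {
	let mut msgs_sent_since_pong = 0usize;
	let mut channels_sent = 0usize;
	// should_buffer_gossip_backfill(): empty buffer && under the per-tick budget.
	while msgs_sent_since_pong < BUFFER_DRAIN_MSGS_PER_TICK {
		// One channel_announcement plus its two channel_updates are enqueued...
		msgs_sent_since_pong += 3;
		channels_sent += 1;
		// ...and, in this idealized model, written out before the next check.
	}
	assert_eq!(channels_sent, 11); // the 11th group crosses the budget at 33 msgs
}
```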
@@ -710,46 +737,39 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> PeerManager<Descriptor, CM, RM, L, CMH> where
 	fn do_attempt_write_data(&self, descriptor: &mut Descriptor, peer: &mut Peer) {
 		while !peer.awaiting_write_event {
-			if peer.pending_outbound_buffer.len() < OUTBOUND_BUFFER_LIMIT_READ_PAUSE && peer.msgs_sent_since_pong < BUFFER_DRAIN_MSGS_PER_TICK {
+			if peer.should_buffer_gossip_backfill() {
 				match peer.sync_status {
 					InitSyncTracker::NoSyncRequested => {},
 					InitSyncTracker::ChannelsSyncing(c) if c < 0xffff_ffff_ffff_ffff => {
-						let steps = ((OUTBOUND_BUFFER_LIMIT_READ_PAUSE - peer.pending_outbound_buffer.len() + 2) / 3) as u8;
-						let all_messages = self.message_handler.route_handler.get_next_channel_announcements(c, steps);
-						for &(ref announce, ref update_a_option, ref update_b_option) in all_messages.iter() {
-							self.enqueue_message(peer, announce);
-							if let &Some(ref update_a) = update_a_option {
-								self.enqueue_message(peer, update_a);
+						if let Some((announce, update_a_option, update_b_option)) =
+							self.message_handler.route_handler.get_next_channel_announcement(c)
+						{
+							self.enqueue_message(peer, &announce);
+							if let Some(update_a) = update_a_option {
+								self.enqueue_message(peer, &update_a);
 							}
-							if let &Some(ref update_b) = update_b_option {
-								self.enqueue_message(peer, update_b);
+							if let Some(update_b) = update_b_option {
+								self.enqueue_message(peer, &update_b);
 							}
 							peer.sync_status = InitSyncTracker::ChannelsSyncing(announce.contents.short_channel_id + 1);
-						}
-						if all_messages.is_empty() || all_messages.len() != steps as usize {
+						} else {
 							peer.sync_status = InitSyncTracker::ChannelsSyncing(0xffff_ffff_ffff_ffff);
 						}
 					},
 					InitSyncTracker::ChannelsSyncing(c) if c == 0xffff_ffff_ffff_ffff => {
-						let steps = (OUTBOUND_BUFFER_LIMIT_READ_PAUSE - peer.pending_outbound_buffer.len()) as u8;
-						let all_messages = self.message_handler.route_handler.get_next_node_announcements(None, steps);
-						for msg in all_messages.iter() {
-							self.enqueue_message(peer, msg);
+						if let Some(msg) = self.message_handler.route_handler.get_next_node_announcement(None) {
+							self.enqueue_message(peer, &msg);
 							peer.sync_status = InitSyncTracker::NodesSyncing(msg.contents.node_id);
-						}
-						if all_messages.is_empty() || all_messages.len() != steps as usize {
+						} else {
 							peer.sync_status = InitSyncTracker::NoSyncRequested;
 						}
 					},
 					InitSyncTracker::ChannelsSyncing(_) => unreachable!(),
 					InitSyncTracker::NodesSyncing(key) => {
-						let steps = (OUTBOUND_BUFFER_LIMIT_READ_PAUSE - peer.pending_outbound_buffer.len()) as u8;
-						let all_messages = self.message_handler.route_handler.get_next_node_announcements(Some(&key), steps);
-						for msg in all_messages.iter() {
-							self.enqueue_message(peer, msg);
+						if let Some(msg) = self.message_handler.route_handler.get_next_node_announcement(Some(&key)) {
+							self.enqueue_message(peer, &msg);
 							peer.sync_status = InitSyncTracker::NodesSyncing(msg.contents.node_id);
-						}
-						if all_messages.is_empty() || all_messages.len() != steps as usize {
+						} else {
 							peer.sync_status = InitSyncTracker::NoSyncRequested;
 						}
 					},
@@ -759,18 +779,15 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> PeerManager<Descriptor, CM, RM, L, CMH> where
 			self.maybe_send_extra_ping(peer);
 		}

-		if {
-			let next_buff = match peer.pending_outbound_buffer.front() {
-				None => return,
-				Some(buff) => buff,
-			};
+		let next_buff = match peer.pending_outbound_buffer.front() {
+			None => return,
+			Some(buff) => buff,
+		};

-			let should_be_reading = peer.pending_outbound_buffer.len() < OUTBOUND_BUFFER_LIMIT_READ_PAUSE;
-			let pending = &next_buff[peer.pending_outbound_buffer_first_msg_offset..];
-			let data_sent = descriptor.send_data(pending, should_be_reading);
-			peer.pending_outbound_buffer_first_msg_offset += data_sent;
-			if peer.pending_outbound_buffer_first_msg_offset == next_buff.len() { true } else { false }
-		} {
+		let pending = &next_buff[peer.pending_outbound_buffer_first_msg_offset..];
+		let data_sent = descriptor.send_data(pending, peer.should_read());
+		peer.pending_outbound_buffer_first_msg_offset += data_sent;
+		if peer.pending_outbound_buffer_first_msg_offset == next_buff.len() {
 			peer.pending_outbound_buffer_first_msg_offset = 0;
 			peer.pending_outbound_buffer.pop_front();
 		} else {
@@ -1045,7 +1062,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> PeerManager<Descriptor, CM, RM, L, CMH> where
 						}
 					}
 				}
-				pause_read = peer.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_READ_PAUSE;
+				pause_read = !peer.should_read();

 				if let Some(message) = msg_to_handle {
 					match self.handle_message(&peer_mutex, peer_lock, message) {
@@ -1308,9 +1325,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> PeerManager<Descriptor, CM, RM, L, CMH> where
 						!peer.should_forward_channel_announcement(msg.contents.short_channel_id) {
 						continue
 					}
-					if peer.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP
-						|| peer.msgs_sent_since_pong > BUFFER_DRAIN_MSGS_PER_TICK * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO
-					{
+					if peer.buffer_full_drop_gossip() {
 						log_gossip!(self.logger, "Skipping broadcast message to {:?} as its outbound buffer is full", peer.their_node_id);
 						continue;
 					}
@@ -1334,9 +1349,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> PeerManager<Descriptor, CM, RM, L, CMH> where
 						!peer.should_forward_node_announcement(msg.contents.node_id) {
 						continue
 					}
-					if peer.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP
-						|| peer.msgs_sent_since_pong > BUFFER_DRAIN_MSGS_PER_TICK * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO
-					{
+					if peer.buffer_full_drop_gossip() {
 						log_gossip!(self.logger, "Skipping broadcast message to {:?} as its outbound buffer is full", peer.their_node_id);
 						continue;
 					}
@@ -1359,9 +1372,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> PeerManager<Descriptor, CM, RM, L, CMH> where
 						!peer.should_forward_channel_announcement(msg.contents.short_channel_id) {
 						continue
 					}
-					if peer.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP
-						|| peer.msgs_sent_since_pong > BUFFER_DRAIN_MSGS_PER_TICK * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO
-					{
+					if peer.buffer_full_drop_gossip() {
 						log_gossip!(self.logger, "Skipping broadcast message to {:?} as its outbound buffer is full", peer.their_node_id);
 						continue;
 					}
@@ -2060,10 +2071,10 @@ mod tests {

 		// Check that each peer has received the expected number of channel updates and channel
 		// announcements.
-		assert_eq!(cfgs[0].routing_handler.chan_upds_recvd.load(Ordering::Acquire), 100);
-		assert_eq!(cfgs[0].routing_handler.chan_anns_recvd.load(Ordering::Acquire), 50);
-		assert_eq!(cfgs[1].routing_handler.chan_upds_recvd.load(Ordering::Acquire), 100);
-		assert_eq!(cfgs[1].routing_handler.chan_anns_recvd.load(Ordering::Acquire), 50);
+		assert_eq!(cfgs[0].routing_handler.chan_upds_recvd.load(Ordering::Acquire), 108);
+		assert_eq!(cfgs[0].routing_handler.chan_anns_recvd.load(Ordering::Acquire), 54);
+		assert_eq!(cfgs[1].routing_handler.chan_upds_recvd.load(Ordering::Acquire), 108);
+		assert_eq!(cfgs[1].routing_handler.chan_anns_recvd.load(Ordering::Acquire), 54);
 	}

 	#[test]
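A note on the revised expectations above: TestRoutingMessageHandler (see the test_utils.rs hunk at the end of this diff) hands back one channel_announcement plus two channel_updates per query, so the new totals keep the 2:1 update-to-announcement ratio (108 = 2 × 54). And because the stub no longer caps its table at 50 channels, the totals are now presumably governed by the per-tick backfill pacing rather than by the table size.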
diff --git a/lightning/src/routing/gossip.rs b/lightning/src/routing/gossip.rs
index 1757d137300..925cd4cd4d3 100644
--- a/lightning/src/routing/gossip.rs
+++ b/lightning/src/routing/gossip.rs
@@ -41,7 +41,7 @@ use core::{cmp, fmt};
 use sync::{RwLock, RwLockReadGuard};
 use core::sync::atomic::{AtomicUsize, Ordering};
 use sync::Mutex;
-use core::ops::Deref;
+use core::ops::{Bound, Deref};

 use bitcoin::hashes::hex::ToHex;
 #[cfg(feature = "std")]
@@ -318,56 +318,43 @@ where C::Target: chain::Access, L::Target: Logger
 		Ok(msg.contents.excess_data.len() <= MAX_EXCESS_BYTES_FOR_RELAY)
 	}

-	fn get_next_channel_announcements(&self, starting_point: u64, batch_amount: u8) -> Vec<(ChannelAnnouncement, Option<ChannelUpdate>, Option<ChannelUpdate>)> {
-		let mut result = Vec::with_capacity(batch_amount as usize);
+	fn get_next_channel_announcement(&self, starting_point: u64) -> Option<(ChannelAnnouncement, Option<ChannelUpdate>, Option<ChannelUpdate>)> {
 		let channels = self.network_graph.channels.read().unwrap();
-		let mut iter = channels.range(starting_point..);
-		while result.len() < batch_amount as usize {
-			if let Some((_, ref chan)) = iter.next() {
-				if chan.announcement_message.is_some() {
-					let chan_announcement = chan.announcement_message.clone().unwrap();
-					let mut one_to_two_announcement: Option<msgs::ChannelUpdate> = None;
-					let mut two_to_one_announcement: Option<msgs::ChannelUpdate> = None;
-					if let Some(one_to_two) = chan.one_to_two.as_ref() {
-						one_to_two_announcement = one_to_two.last_update_message.clone();
-					}
-					if let Some(two_to_one) = chan.two_to_one.as_ref() {
-						two_to_one_announcement = two_to_one.last_update_message.clone();
-					}
-					result.push((chan_announcement, one_to_two_announcement, two_to_one_announcement));
-				} else {
-					// TODO: We may end up sending un-announced channel_updates if we are sending
-					// initial sync data while receiving announce/updates for this channel.
-				}
-			} else {
-				return result;
+		for (_, ref chan) in channels.range(starting_point..) {
+			if chan.announcement_message.is_some() {
+				let chan_announcement = chan.announcement_message.clone().unwrap();
+				let mut one_to_two_announcement: Option<msgs::ChannelUpdate> = None;
+				let mut two_to_one_announcement: Option<msgs::ChannelUpdate> = None;
+				if let Some(one_to_two) = chan.one_to_two.as_ref() {
+					one_to_two_announcement = one_to_two.last_update_message.clone();
+				}
+				if let Some(two_to_one) = chan.two_to_one.as_ref() {
+					two_to_one_announcement = two_to_one.last_update_message.clone();
+				}
+				return Some((chan_announcement, one_to_two_announcement, two_to_one_announcement));
+			} else {
+				// TODO: We may end up sending un-announced channel_updates if we are sending
+				// initial sync data while receiving announce/updates for this channel.
 			}
 		}
-		result
+		None
 	}

-	fn get_next_node_announcements(&self, starting_point: Option<&PublicKey>, batch_amount: u8) -> Vec<NodeAnnouncement> {
-		let mut result = Vec::with_capacity(batch_amount as usize);
+	fn get_next_node_announcement(&self, starting_point: Option<&PublicKey>) -> Option<NodeAnnouncement> {
 		let nodes = self.network_graph.nodes.read().unwrap();
-		let mut iter = if let Some(pubkey) = starting_point {
-				let mut iter = nodes.range(NodeId::from_pubkey(pubkey)..);
-				iter.next();
-				iter
+		let iter = if let Some(pubkey) = starting_point {
+				nodes.range((Bound::Excluded(NodeId::from_pubkey(pubkey)), Bound::Unbounded))
 			} else {
-				nodes.range::<NodeId, _>(..)
+				nodes.range(..)
 			};
-		while result.len() < batch_amount as usize {
-			if let Some((_, ref node)) = iter.next() {
-				if let Some(node_info) = node.announcement_info.as_ref() {
-					if node_info.announcement_message.is_some() {
-						result.push(node_info.announcement_message.clone().unwrap());
-					}
+		for (_, ref node) in iter {
+			if let Some(node_info) = node.announcement_info.as_ref() {
+				if let Some(msg) = node_info.announcement_message.clone() {
+					return Some(msg);
 				}
-			} else {
-				return result;
 			}
 		}
-		result
+		None
 	}

 	/// Initiates a stateless sync of routing gossip information with a peer
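The switch to an explicit `Bound` is worth a second look: the old skip-one-entry trick misbehaves when the cursor key is no longer (or never was) in the map, while an exclusive lower bound excludes only the key itself. A self-contained std sketch of the difference:

```rust
use std::collections::BTreeMap;
use std::ops::Bound;

fn main() {
	let nodes: BTreeMap<u64, &str> = [(10, "ann_a"), (20, "ann_b"), (30, "ann_c")].into_iter().collect();

	// Old approach: start at the cursor and unconditionally skip one entry.
	// If the cursor key (15) is absent, this wrongly discards a live entry.
	let mut iter = nodes.range(15..);
	iter.next(); // discards 20, which was never returned to anyone
	assert_eq!(iter.next().map(|(k, _)| *k), Some(30));

	// New approach: an exclusive lower bound excludes only the key itself.
	let mut iter = nodes.range((Bound::Excluded(15), Bound::Unbounded));
	assert_eq!(iter.next().map(|(k, _)| *k), Some(20));
}
```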
@@ -2412,8 +2399,8 @@ mod tests {
 		let node_2_privkey = &SecretKey::from_slice(&[41; 32]).unwrap();

 		// Channels were not announced yet.
-		let channels_with_announcements = gossip_sync.get_next_channel_announcements(0, 1);
-		assert_eq!(channels_with_announcements.len(), 0);
+		let channels_with_announcements = gossip_sync.get_next_channel_announcement(0);
+		assert!(channels_with_announcements.is_none());

 		let short_channel_id;
 		{
@@ -2427,17 +2414,15 @@ mod tests {
 		}

 		// Contains initial channel announcement now.
-		let channels_with_announcements = gossip_sync.get_next_channel_announcements(short_channel_id, 1);
-		assert_eq!(channels_with_announcements.len(), 1);
-		if let Some(channel_announcements) = channels_with_announcements.first() {
-			let &(_, ref update_1, ref update_2) = channel_announcements;
+		let channels_with_announcements = gossip_sync.get_next_channel_announcement(short_channel_id);
+		if let Some(channel_announcements) = channels_with_announcements {
+			let (_, ref update_1, ref update_2) = channel_announcements;
 			assert_eq!(update_1, &None);
 			assert_eq!(update_2, &None);
 		} else {
 			panic!();
 		}
-
 		{
 			// Valid channel update
 			let valid_channel_update = get_signed_channel_update(|unsigned_channel_update| {
@@ -2450,10 +2435,9 @@ mod tests {
 		}

 		// Now contains an initial announcement and an update.
-		let channels_with_announcements = gossip_sync.get_next_channel_announcements(short_channel_id, 1);
-		assert_eq!(channels_with_announcements.len(), 1);
-		if let Some(channel_announcements) = channels_with_announcements.first() {
-			let &(_, ref update_1, ref update_2) = channel_announcements;
+		let channels_with_announcements = gossip_sync.get_next_channel_announcement(short_channel_id);
+		if let Some(channel_announcements) = channels_with_announcements {
+			let (_, ref update_1, ref update_2) = channel_announcements;
 			assert_ne!(update_1, &None);
 			assert_eq!(update_2, &None);
 		} else {
@@ -2473,10 +2457,9 @@ mod tests {
 		}

 		// Test that announcements with excess data won't be returned
-		let channels_with_announcements = gossip_sync.get_next_channel_announcements(short_channel_id, 1);
-		assert_eq!(channels_with_announcements.len(), 1);
-		if let Some(channel_announcements) = channels_with_announcements.first() {
-			let &(_, ref update_1, ref update_2) = channel_announcements;
+		let channels_with_announcements = gossip_sync.get_next_channel_announcement(short_channel_id);
+		if let Some(channel_announcements) = channels_with_announcements {
+			let (_, ref update_1, ref update_2) = channel_announcements;
 			assert_eq!(update_1, &None);
 			assert_eq!(update_2, &None);
 		} else {
@@ -2484,8 +2467,8 @@ mod tests {
 		}

 		// Further starting point has no channels after it
-		let channels_with_announcements = gossip_sync.get_next_channel_announcements(short_channel_id + 1000, 1);
-		assert_eq!(channels_with_announcements.len(), 0);
+		let channels_with_announcements = gossip_sync.get_next_channel_announcement(short_channel_id + 1000);
+		assert!(channels_with_announcements.is_none());
 	}

 	#[test]
@@ -2497,8 +2480,8 @@ mod tests {
 		let node_id_1 = PublicKey::from_secret_key(&secp_ctx, node_1_privkey);

 		// No nodes yet.
-		let next_announcements = gossip_sync.get_next_node_announcements(None, 10);
-		assert_eq!(next_announcements.len(), 0);
+		let next_announcements = gossip_sync.get_next_node_announcement(None);
+		assert!(next_announcements.is_none());

 		{
 			// Announce a channel to add 2 nodes
@@ -2509,10 +2492,9 @@ mod tests {
 		}

-		// Nodes were never announced
-		let next_announcements = gossip_sync.get_next_node_announcements(None, 3);
-		assert_eq!(next_announcements.len(), 0);
+		let next_announcements = gossip_sync.get_next_node_announcement(None);
+		assert!(next_announcements.is_none());

 		{
 			let valid_announcement = get_signed_node_announcement(|_| {}, node_1_privkey, &secp_ctx);
 			match gossip_sync.handle_node_announcement(&valid_announcement) {
 				Ok(res) => assert!(res),
 				Err(_) => panic!()
 			};
 		}

-		let next_announcements = gossip_sync.get_next_node_announcements(None, 3);
-		assert_eq!(next_announcements.len(), 2);
+		let next_announcements = gossip_sync.get_next_node_announcement(None);
+		assert!(next_announcements.is_some());

 		// Skip the first node.
-		let next_announcements = gossip_sync.get_next_node_announcements(Some(&node_id_1), 2);
-		assert_eq!(next_announcements.len(), 1);
+		let next_announcements = gossip_sync.get_next_node_announcement(Some(&node_id_1));
+		assert!(next_announcements.is_some());

 		{
 			// Later announcement which should not be relayed (excess data) prevents us from sharing a node
@@ -2547,8 +2529,8 @@ mod tests {
 		}

-		let next_announcements = gossip_sync.get_next_node_announcements(Some(&node_id_1), 2);
-		assert_eq!(next_announcements.len(), 0);
+		let next_announcements = gossip_sync.get_next_node_announcement(Some(&node_id_1));
+		assert!(next_announcements.is_none());
 	}

 	#[test]
diff --git a/lightning/src/util/test_utils.rs b/lightning/src/util/test_utils.rs
index 2f80fb73a41..fe009cc16b8 100644
--- a/lightning/src/util/test_utils.rs
+++ b/lightning/src/util/test_utils.rs
@@ -45,7 +45,7 @@ use prelude::*;
 use core::time::Duration;
 use sync::{Mutex, Arc};
 use core::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
-use core::{cmp, mem};
+use core::mem;

 use bitcoin::bech32::u5;
 use chain::keysinterface::{InMemorySigner, Recipient, KeyMaterial};
@@ -445,23 +445,16 @@ impl msgs::RoutingMessageHandler for TestRoutingMessageHandler {
 		self.chan_upds_recvd.fetch_add(1, Ordering::AcqRel);
 		Err(msgs::LightningError { err: "".to_owned(), action: msgs::ErrorAction::IgnoreError })
 	}
-	fn get_next_channel_announcements(&self, starting_point: u64, batch_amount: u8) -> Vec<(msgs::ChannelAnnouncement, Option<msgs::ChannelUpdate>, Option<msgs::ChannelUpdate>)> {
-		let mut chan_anns = Vec::new();
-		const TOTAL_UPDS: u64 = 50;
-		let end: u64 = cmp::min(starting_point + batch_amount as u64, TOTAL_UPDS);
-		for i in starting_point..end {
-			let chan_upd_1 = get_dummy_channel_update(i);
-			let chan_upd_2 = get_dummy_channel_update(i);
-			let chan_ann = get_dummy_channel_announcement(i);
+	fn get_next_channel_announcement(&self, starting_point: u64) -> Option<(msgs::ChannelAnnouncement, Option<msgs::ChannelUpdate>, Option<msgs::ChannelUpdate>)> {
+		let chan_upd_1 = get_dummy_channel_update(starting_point);
+		let chan_upd_2 = get_dummy_channel_update(starting_point);
+		let chan_ann = get_dummy_channel_announcement(starting_point);

-			chan_anns.push((chan_ann, Some(chan_upd_1), Some(chan_upd_2)));
-		}
-
-		chan_anns
+		Some((chan_ann, Some(chan_upd_1), Some(chan_upd_2)))
 	}
-	fn get_next_node_announcements(&self, _starting_point: Option<&PublicKey>, _batch_amount: u8) -> Vec<msgs::NodeAnnouncement> {
-		Vec::new()
+	fn get_next_node_announcement(&self, _starting_point: Option<&PublicKey>) -> Option<msgs::NodeAnnouncement> {
+		None
 	}
 	fn peer_connected(&self, their_node_id: &PublicKey, init_msg: &msgs::Init) {
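Taken together with the trait change, an implementor outside LDK would follow the same storage pattern as gossip.rs. A minimal sketch with hypothetical stand-in types (`NodeId`, `NodeAnnouncement`, and `Graph` here are placeholders, not LDK's types); only the exclusive-bound cursor pattern mirrors the diff:

```rust
use std::collections::BTreeMap;
use std::ops::Bound;

#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
struct NodeId([u8; 33]);
#[derive(Clone)]
struct NodeAnnouncement; // stand-in for msgs::NodeAnnouncement

struct Graph {
	// None = node known from channel gossip but never announced.
	nodes: BTreeMap<NodeId, Option<NodeAnnouncement>>,
}

impl Graph {
	fn get_next_node_announcement(&self, starting_point: Option<&NodeId>) -> Option<NodeAnnouncement> {
		let start = match starting_point {
			Some(id) => Bound::Excluded(*id),
			None => Bound::Unbounded,
		};
		// Return the first announcement strictly after the cursor, skipping
		// nodes that have nothing broadcastable, as gossip.rs does above.
		self.nodes.range((start, Bound::Unbounded))
			.find_map(|(_, ann)| ann.clone())
	}
}
```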