diff --git a/protocols/gossipsub/src/behaviour.rs b/protocols/gossipsub/src/behaviour.rs index 4593dec912a..1e9e43f5872 100644 --- a/protocols/gossipsub/src/behaviour.rs +++ b/protocols/gossipsub/src/behaviour.rs @@ -253,9 +253,6 @@ pub struct Behaviour { /// Events that need to be yielded to the outside when polling. events: VecDeque>, - /// Pools non-urgent control messages between heartbeats. - control_pool: HashMap>, - /// Information used for publishing messages. publish_config: PublishConfig, @@ -323,10 +320,6 @@ pub struct Behaviour { /// Counts the number of `IWANT` that we sent the each peer since the last heartbeat. count_sent_iwant: HashMap, - /// Keeps track of IWANT messages that we are awaiting to send. - /// This is used to prevent sending duplicate IWANT messages for the same message. - pending_iwant_msgs: HashSet, - /// Short term cache for published message ids. This is used for penalizing peers sending /// our own messages back if the messages are anonymous or use a random author. published_message_ids: DuplicateCache, @@ -451,7 +444,6 @@ where Ok(Behaviour { metrics: metrics.map(|(registry, cfg)| Metrics::new(registry, cfg)), events: VecDeque::new(), - control_pool: HashMap::new(), publish_config: privacy.into(), duplicate_cache: DuplicateCache::new(config.duplicate_cache_time()), topic_peers: HashMap::new(), @@ -1290,7 +1282,6 @@ where iwant_ids_vec.truncate(iask); *iasked += iask; - if let Some((_, _, _, gossip_promises)) = &mut self.peer_score { gossip_promises.add_promise( *peer_id, @@ -2438,9 +2429,6 @@ where self.send_graft_prune(to_graft, to_prune, no_px); } - // piggyback pooled control messages - self.flush_control_pool(); - // shift the memcache self.mcache.shift(); @@ -2779,33 +2767,6 @@ where } } - // adds a control action to control_pool - fn control_pool_add( - control_pool: &mut HashMap>, - peer: PeerId, - control: RpcOut, - ) { - control_pool.entry(peer).or_default().push(control); - } - - /// Takes each control action mapping and turns it into a message - fn flush_control_pool(&mut self) { - for (peer, controls) in self.control_pool.drain().collect::>() { - for msg in controls { - let sender = self - .handler_send_queues - .get_mut(&peer) - .expect("Peerid should exist"); - - match msg { - RpcOut::IHave(ihave) => sender.ihave(ihave), - _ => unreachable!(), - } - } - } - - } - fn on_connection_established( &mut self, ConnectionEstablished { @@ -3446,7 +3407,6 @@ impl fmt::Debug for Behaviour( - gs: &Behaviour, +fn count_control_msgs( queues: &HashMap, mut filter: impl FnMut(&PeerId, &RpcOut) -> bool, ) -> usize { - gs.control_pool + queues .iter() - .map(|(peer_id, actions)| actions.iter().filter(|m| filter(peer_id, m)).count()) - .sum::() - + queues - .iter() - .fold(0, |mut collected_messages, (peer_id, c)| { - while !c.priority.is_empty() || !c.non_priority.is_empty() { - if let Ok(rpc) = c.priority.try_recv() { - if filter(peer_id, &rpc) { - collected_messages += 1; - } + .fold(0, |mut collected_messages, (peer_id, c)| { + while !c.priority.is_empty() || !c.non_priority.is_empty() { + if let Ok(rpc) = c.priority.try_recv() { + if filter(peer_id, &rpc) { + collected_messages += 1; } - if let Ok(rpc) = c.non_priority.try_recv() { - if filter(peer_id, &rpc) { - collected_messages += 1; - } + } + if let Ok(rpc) = c.non_priority.try_recv() { + if filter(peer_id, &rpc) { + collected_messages += 1; } } - collected_messages - }) + } + collected_messages + }) } fn flush_events( gs: &mut Behaviour, receiver_queues: &HashMap, ) { - gs.control_pool.clear(); gs.events.clear(); for c in receiver_queues.values() { while !c.priority.is_empty() || !c.non_priority.is_empty() { @@ -1478,7 +1472,7 @@ fn test_handle_graft_explicit_peer() { //check prunes assert!( - count_control_msgs(&gs, &queues, |peer_id, m| peer_id == peer + count_control_msgs(&queues, |peer_id, m| peer_id == peer && match m { RpcOut::Prune(Prune { topic_hash, .. }) => topic_hash == &topic_hashes[0] || topic_hash == &topic_hashes[1], @@ -1507,7 +1501,7 @@ fn explicit_peers_not_added_to_mesh_on_receiving_subscription() { //assert that graft gets created to non-explicit peer assert!( - count_control_msgs(&gs, &queues, |peer_id, m| peer_id == &peers[1] + count_control_msgs(&queues, |peer_id, m| peer_id == &peers[1] && matches!(m, RpcOut::Graft { .. })) >= 1, "No graft message got created to non-explicit peer" @@ -1515,7 +1509,7 @@ fn explicit_peers_not_added_to_mesh_on_receiving_subscription() { //assert that no graft gets created to explicit peer assert_eq!( - count_control_msgs(&gs, &queues, |peer_id, m| peer_id == &peers[0] + count_control_msgs(&queues, |peer_id, m| peer_id == &peers[0] && matches!(m, RpcOut::Graft { .. })), 0, "A graft message got created to an explicit peer" @@ -1539,7 +1533,7 @@ fn do_not_graft_explicit_peer() { //assert that no graft gets created to explicit peer assert_eq!( - count_control_msgs(&gs, &queues, |peer_id, m| peer_id == &others[0] + count_control_msgs(&queues, |peer_id, m| peer_id == &others[0] && matches!(m, RpcOut::Graft { .. })), 0, "A graft message got created to an explicit peer" @@ -1613,7 +1607,7 @@ fn explicit_peers_not_added_to_mesh_on_subscribe() { //assert that graft gets created to non-explicit peer assert!( - count_control_msgs(&gs, &queues, |peer_id, m| peer_id == &peers[1] + count_control_msgs(&queues, |peer_id, m| peer_id == &peers[1] && matches!(m, RpcOut::Graft { .. })) > 0, "No graft message got created to non-explicit peer" @@ -1621,7 +1615,7 @@ fn explicit_peers_not_added_to_mesh_on_subscribe() { //assert that no graft gets created to explicit peer assert_eq!( - count_control_msgs(&gs, &queues, |peer_id, m| peer_id == &peers[0] + count_control_msgs(&queues, |peer_id, m| peer_id == &peers[0] && matches!(m, RpcOut::Graft { .. })), 0, "A graft message got created to an explicit peer" @@ -1662,7 +1656,7 @@ fn explicit_peers_not_added_to_mesh_from_fanout_on_subscribe() { //assert that graft gets created to non-explicit peer assert!( - count_control_msgs(&gs, &queues, |peer_id, m| peer_id == &peers[1] + count_control_msgs(&queues, |peer_id, m| peer_id == &peers[1] && matches!(m, RpcOut::Graft { .. })) >= 1, "No graft message got created to non-explicit peer" @@ -1670,7 +1664,7 @@ fn explicit_peers_not_added_to_mesh_from_fanout_on_subscribe() { //assert that no graft gets created to explicit peer assert_eq!( - count_control_msgs(&gs, &queues, |peer_id, m| peer_id == &peers[0] + count_control_msgs(&queues, |peer_id, m| peer_id == &peers[0] && matches!(m, RpcOut::Graft { .. })), 0, "A graft message got created to an explicit peer" @@ -1857,7 +1851,7 @@ fn test_send_px_and_backoff_in_prune() { //check prune message assert_eq!( - count_control_msgs(&gs, &queues, |peer_id, m| peer_id == &peers[0] + count_control_msgs(&queues, |peer_id, m| peer_id == &peers[0] && match m { RpcOut::Prune(Prune { topic_hash, @@ -1905,7 +1899,7 @@ fn test_prune_backoffed_peer_on_graft() { //check prune message assert_eq!( - count_control_msgs(&gs, &queues, |peer_id, m| peer_id == &peers[0] + count_control_msgs(&queues, |peer_id, m| peer_id == &peers[0] && match m { RpcOut::Prune(Prune { topic_hash, @@ -1955,7 +1949,7 @@ fn test_do_not_graft_within_backoff_period() { //Check that no graft got created (we have backoff_slack = 1 therefore one more heartbeat // is needed). assert_eq!( - count_control_msgs(&gs, &queues, |_, m| matches!(m, RpcOut::Graft { .. })), + count_control_msgs(&queues, |_, m| matches!(m, RpcOut::Graft { .. })), 0, "Graft message created too early within backoff period" ); @@ -1966,7 +1960,7 @@ fn test_do_not_graft_within_backoff_period() { //check that graft got created assert!( - count_control_msgs(&gs, &queues, |_, m| matches!(m, RpcOut::Graft { .. })) > 0, + count_control_msgs(&queues, |_, m| matches!(m, RpcOut::Graft { .. })) > 0, "No graft message was created after backoff period" ); } @@ -2004,7 +1998,7 @@ fn test_do_not_graft_within_default_backoff_period_after_receiving_prune_without //Check that no graft got created (we have backoff_slack = 1 therefore one more heartbeat // is needed). assert_eq!( - count_control_msgs(&gs, &queues, |_, m| matches!(m, RpcOut::Graft { .. })), + count_control_msgs(&queues, |_, m| matches!(m, RpcOut::Graft { .. })), 0, "Graft message created too early within backoff period" ); @@ -2015,7 +2009,7 @@ fn test_do_not_graft_within_default_backoff_period_after_receiving_prune_without //check that graft got created assert!( - count_control_msgs(&gs, &queues, |_, m| matches!(m, RpcOut::Graft { .. })) > 0, + count_control_msgs(&queues, |_, m| matches!(m, RpcOut::Graft { .. })) > 0, "No graft message was created after backoff period" ); } @@ -2044,7 +2038,7 @@ fn test_unsubscribe_backoff() { let _ = gs.unsubscribe(&Topic::new(topic)); assert_eq!( - count_control_msgs(&gs, &queues, |_, m| match m { + count_control_msgs(&queues, |_, m| match m { RpcOut::Prune(Prune { backoff, .. }) => backoff == &Some(1), _ => false, }), @@ -2069,7 +2063,7 @@ fn test_unsubscribe_backoff() { // Check that no graft got created (we have backoff_slack = 1 therefore one more heartbeat // is needed). assert_eq!( - count_control_msgs(&gs, &queues, |_, m| matches!(m, RpcOut::Graft { .. })), + count_control_msgs(&queues, |_, m| matches!(m, RpcOut::Graft { .. })), 0, "Graft message created too early within backoff period" ); @@ -2080,7 +2074,7 @@ fn test_unsubscribe_backoff() { // check that graft got created assert!( - count_control_msgs(&gs, &queues, |_, m| matches!(m, RpcOut::Graft { .. })) > 0, + count_control_msgs(&queues, |_, m| matches!(m, RpcOut::Graft { .. })) > 0, "No graft message was created after backoff period" ); } @@ -2173,7 +2167,7 @@ fn test_gossip_to_at_least_gossip_lazy_peers() { //check that exactly config.gossip_lazy() many gossip messages were sent. assert_eq!( - count_control_msgs(&gs, &queues, |_, action| match action { + count_control_msgs(&queues, |_, action| match action { RpcOut::IHave(IHave { topic_hash, message_ids, @@ -2217,7 +2211,7 @@ fn test_gossip_to_at_most_gossip_factor_peers() { let msg_id = gs.config.message_id(message); //check that exactly config.gossip_lazy() many gossip messages were sent. assert_eq!( - count_control_msgs(&gs, &queues, |_, action| match action { + count_control_msgs(&queues, |_, action| match action { RpcOut::IHave(IHave { topic_hash, message_ids, @@ -2376,7 +2370,7 @@ fn test_prune_negative_scored_peers() { //check prune message assert_eq!( - count_control_msgs(&gs, &queues, |peer_id, m| peer_id == &peers[0] + count_control_msgs(&queues, |peer_id, m| peer_id == &peers[0] && match m { RpcOut::Prune(Prune { topic_hash, @@ -2510,7 +2504,7 @@ fn test_only_send_nonnegative_scoring_peers_in_px() { // Check that px in prune message only contains third peer assert_eq!( - count_control_msgs(&gs, &queues, |peer_id, m| peer_id == &peers[1] + count_control_msgs(&queues, |peer_id, m| peer_id == &peers[1] && match m { RpcOut::Prune(Prune { topic_hash, @@ -2585,7 +2579,7 @@ fn test_do_not_gossip_to_peers_below_gossip_threshold() { // Check that exactly one gossip messages got sent and it got sent to p2 assert_eq!( - count_control_msgs(&gs, &receivers, |peer, action| match action { + count_control_msgs(&receivers, |peer, action| match action { RpcOut::IHave(IHave { topic_hash, message_ids, @@ -2751,7 +2745,7 @@ fn test_ihave_msg_from_peer_below_gossip_threshold_gets_ignored() { // check that we sent exactly one IWANT request to p2 assert_eq!( - count_control_msgs(&gs, &queues, |peer, c| match c { + count_control_msgs(&queues, |peer, c| match c { RpcOut::IWant(IWant { message_ids }) => if message_ids.iter().any(|m| m == &msg_id) { assert_eq!(peer, &p2); @@ -4345,7 +4339,7 @@ fn test_ignore_graft_from_unknown_topic() { //assert that no prune got created assert_eq!( - count_control_msgs(&gs, &queues, |_, a| matches!(a, RpcOut::Prune { .. })), + count_control_msgs(&queues, |_, a| matches!(a, RpcOut::Prune { .. })), 0, "we should not prune after graft in unknown topic" ); @@ -4444,7 +4438,7 @@ fn test_ignore_too_many_ihaves() { //we send iwant only for the first 10 messages assert_eq!( - count_control_msgs(&gs, &receivers, |p, action| p == &peer + count_control_msgs(&receivers, |p, action| p == &peer && matches!(action, RpcOut::IWant(IWant { message_ids }) if message_ids.len() == 1 && first_ten.contains(&message_ids[0]))), 10, "exactly the first ten ihaves should be processed and one iwant for each created" @@ -4468,7 +4462,7 @@ fn test_ignore_too_many_ihaves() { //we sent iwant for all 10 messages assert_eq!( - count_control_msgs(&gs, &receivers, |p, action| p == &peer + count_control_msgs(&receivers, |p, action| p == &peer && matches!(action, RpcOut::IWant(IWant { message_ids }) if message_ids.len() == 1)), 10, "all 20 should get sent" @@ -4518,7 +4512,7 @@ fn test_ignore_too_many_messages_in_ihave() { //we send iwant only for the first 10 messages let mut sum = 0; assert_eq!( - count_control_msgs(&gs, &queues, |p, rpc| match rpc { + count_control_msgs(&queues, |p, rpc| match rpc { RpcOut::IWant(IWant { message_ids }) => { p == &peer && { assert!(first_twelve.is_superset(&message_ids.iter().collect())); @@ -4544,7 +4538,7 @@ fn test_ignore_too_many_messages_in_ihave() { //we sent 10 iwant messages ids via a IWANT rpc. let mut sum = 0; assert_eq!( - count_control_msgs(&gs, &queues, |p, rpc| { + count_control_msgs(&queues, |p, rpc| { match rpc { RpcOut::IWant(IWant { message_ids }) => { p == &peer && { @@ -4603,7 +4597,7 @@ fn test_limit_number_of_message_ids_inside_ihave() { let mut ihaves2 = HashSet::new(); assert_eq!( - count_control_msgs(&gs, &receivers, |p, action| match action { + count_control_msgs(&receivers, |p, action| match action { RpcOut::IHave(IHave { message_ids, .. }) => { if p == &p1 { ihaves1 = message_ids.iter().cloned().collect(); @@ -4938,7 +4932,7 @@ fn test_dont_send_px_to_old_gossipsub_peers() { //check that prune does not contain px assert_eq!( - count_control_msgs(&gs, &queues, |_, m| match m { + count_control_msgs(&queues, |_, m| match m { RpcOut::Prune(Prune { peers: px, .. }) => !px.is_empty(), _ => false, }), @@ -4976,7 +4970,7 @@ fn test_dont_send_floodsub_peers_in_px() { //check that px in prune message is empty assert_eq!( - count_control_msgs(&gs, &queues, |_, m| match m { + count_control_msgs(&queues, |_, m| match m { RpcOut::Prune(Prune { peers: px, .. }) => !px.is_empty(), _ => false, }),