Skip to content

Commit

Permalink
fix(gossipsub): Attempt to publish to at least mesh_n peers (#5578)
Browse files Browse the repository at this point in the history
## Description

With flood published disabled we've noticed that it can be the case that
we have connected peers on topics but these peers are not in our mesh
(perhaps due to their own mesh requirements). Currently, we fail to
publish the message if there are no peers in our mesh.

This PR adjusts this logic to always attempt to publish to at least
mesh_n peers. If we have peers that are subscribed to a topic, we will
now attempt to publish messages to them (provided they have the required
score).

This PR also simplies the peer and respective topics by moving the topic
list each peer has subscribed to `PeerConnections` and removing both
`peer_topics` and `topic_peers` from the main `Behaviour`.
Per commit review is suggested.

---------

Co-authored-by: Darius Clark <dariusc93@users.noreply.github.com>
Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
  • Loading branch information
3 people authored Sep 3, 2024
1 parent f0589c8 commit 93169cc
Show file tree
Hide file tree
Showing 8 changed files with 242 additions and 288 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ libp2p-core = { version = "0.42.0", path = "core" }
libp2p-dcutr = { version = "0.12.0", path = "protocols/dcutr" }
libp2p-dns = { version = "0.42.0", path = "transports/dns" }
libp2p-floodsub = { version = "0.45.0", path = "protocols/floodsub" }
libp2p-gossipsub = { version = "0.47.0", path = "protocols/gossipsub" }
libp2p-gossipsub = { version = "0.47.1", path = "protocols/gossipsub" }
libp2p-identify = { version = "0.45.0", path = "protocols/identify" }
libp2p-identity = { version = "0.2.9" }
libp2p-kad = { version = "0.47.0", path = "protocols/kad" }
Expand Down
5 changes: 5 additions & 0 deletions protocols/gossipsub/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
## 0.47.1

- Attempt to publish to at least mesh_n peers when flood publish is disabled.
See [PR 5578](https://github.com/libp2p/rust-libp2p/pull/5578).

## 0.47.0

<!-- Update to libp2p-swarm v0.45.0 -->
Expand Down
2 changes: 1 addition & 1 deletion protocols/gossipsub/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ name = "libp2p-gossipsub"
edition = "2021"
rust-version = { workspace = true }
description = "Gossipsub protocol for libp2p"
version = "0.47.0"
version = "0.47.1"
authors = ["Age Manning <Age@AgeManning.com>"]
license = "MIT"
repository = "https://github.com/libp2p/rust-libp2p"
Expand Down
399 changes: 167 additions & 232 deletions protocols/gossipsub/src/behaviour.rs

Large diffs are not rendered by default.

101 changes: 54 additions & 47 deletions protocols/gossipsub/src/behaviour/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -440,7 +440,9 @@ fn test_unsubscribe() {

for topic_hash in &topic_hashes {
assert!(
gs.topic_peers.contains_key(topic_hash),
gs.connected_peers
.values()
.any(|p| p.topics.contains(topic_hash)),
"Topic_peers contain a topic entry"
);
assert!(
Expand Down Expand Up @@ -629,8 +631,11 @@ fn test_publish_without_flood_publishing() {

// all peers should be subscribed to the topic
assert_eq!(
gs.topic_peers.get(&topic_hashes[0]).map(|p| p.len()),
Some(20),
gs.connected_peers
.values()
.filter(|p| p.topics.contains(&topic_hashes[0]))
.count(),
20,
"Peers should be subscribed to the topic"
);

Expand Down Expand Up @@ -669,8 +674,8 @@ fn test_publish_without_flood_publishing() {
let config: Config = Config::default();
assert_eq!(
publishes.len(),
config.mesh_n_low(),
"Should send a publish message to all known peers"
config.mesh_n(),
"Should send a publish message to at least mesh_n peers"
);

assert!(
Expand Down Expand Up @@ -809,9 +814,9 @@ fn test_inject_connected() {

// should add the new peers to `peer_topics` with an empty vec as a gossipsub node
for peer in peers {
let known_topics = gs.peer_topics.get(&peer).unwrap();
let peer = gs.connected_peers.get(&peer).unwrap();
assert!(
known_topics == &topic_hashes.iter().cloned().collect(),
peer.topics == topic_hashes.iter().cloned().collect(),
"The topics for each node should all topics"
);
}
Expand Down Expand Up @@ -860,24 +865,39 @@ fn test_handle_received_subscriptions() {

// verify the result

let peer_topics = gs.peer_topics.get(&peers[0]).unwrap().clone();
let peer = gs.connected_peers.get(&peers[0]).unwrap();
assert!(
peer_topics == topic_hashes.iter().take(3).cloned().collect(),
peer.topics
== topic_hashes
.iter()
.take(3)
.cloned()
.collect::<BTreeSet<_>>(),
"First peer should be subscribed to three topics"
);
let peer_topics = gs.peer_topics.get(&peers[1]).unwrap().clone();
let peer1 = gs.connected_peers.get(&peers[1]).unwrap();
assert!(
peer_topics == topic_hashes.iter().take(3).cloned().collect(),
peer1.topics
== topic_hashes
.iter()
.take(3)
.cloned()
.collect::<BTreeSet<_>>(),
"Second peer should be subscribed to three topics"
);

assert!(
!gs.peer_topics.contains_key(&unknown_peer),
!gs.connected_peers.contains_key(&unknown_peer),
"Unknown peer should not have been added"
);

for topic_hash in topic_hashes[..3].iter() {
let topic_peers = gs.topic_peers.get(topic_hash).unwrap().clone();
let topic_peers = gs
.connected_peers
.iter()
.filter(|(_, p)| p.topics.contains(topic_hash))
.map(|(peer_id, _)| *peer_id)
.collect::<BTreeSet<PeerId>>();
assert!(
topic_peers == peers[..2].iter().cloned().collect(),
"Two peers should be added to the first three topics"
Expand All @@ -894,13 +914,21 @@ fn test_handle_received_subscriptions() {
&peers[0],
);

let peer_topics = gs.peer_topics.get(&peers[0]).unwrap().clone();
assert!(
peer_topics == topic_hashes[1..3].iter().cloned().collect(),
let peer = gs.connected_peers.get(&peers[0]).unwrap().clone();
assert_eq!(
peer.topics,
topic_hashes[1..3].iter().cloned().collect::<BTreeSet<_>>(),
"Peer should be subscribed to two topics"
);

let topic_peers = gs.topic_peers.get(&topic_hashes[0]).unwrap().clone(); // only gossipsub at the moment
// only gossipsub at the moment
let topic_peers = gs
.connected_peers
.iter()
.filter(|(_, p)| p.topics.contains(&topic_hashes[0]))
.map(|(peer_id, _)| *peer_id)
.collect::<BTreeSet<PeerId>>();

assert!(
topic_peers == peers[1..2].iter().cloned().collect(),
"Only the second peers should be in the first topic"
Expand All @@ -924,9 +952,8 @@ fn test_get_random_peers() {
for _ in 0..20 {
peers.push(PeerId::random())
}

gs.topic_peers
.insert(topic_hash.clone(), peers.iter().cloned().collect());
let mut topics = BTreeSet::new();
topics.insert(topic_hash.clone());

gs.connected_peers = peers
.iter()
Expand All @@ -936,52 +963,32 @@ fn test_get_random_peers() {
PeerConnections {
kind: PeerKind::Gossipsubv1_1,
connections: vec![ConnectionId::new_unchecked(0)],
topics: topics.clone(),
},
)
})
.collect();

let random_peers =
get_random_peers(&gs.topic_peers, &gs.connected_peers, &topic_hash, 5, |_| {
true
});
let random_peers = get_random_peers(&gs.connected_peers, &topic_hash, 5, |_| true);
assert_eq!(random_peers.len(), 5, "Expected 5 peers to be returned");
let random_peers = get_random_peers(
&gs.topic_peers,
&gs.connected_peers,
&topic_hash,
30,
|_| true,
);
let random_peers = get_random_peers(&gs.connected_peers, &topic_hash, 30, |_| true);
assert!(random_peers.len() == 20, "Expected 20 peers to be returned");
assert!(
random_peers == peers.iter().cloned().collect(),
"Expected no shuffling"
);
let random_peers = get_random_peers(
&gs.topic_peers,
&gs.connected_peers,
&topic_hash,
20,
|_| true,
);
let random_peers = get_random_peers(&gs.connected_peers, &topic_hash, 20, |_| true);
assert!(random_peers.len() == 20, "Expected 20 peers to be returned");
assert!(
random_peers == peers.iter().cloned().collect(),
"Expected no shuffling"
);
let random_peers =
get_random_peers(&gs.topic_peers, &gs.connected_peers, &topic_hash, 0, |_| {
true
});
let random_peers = get_random_peers(&gs.connected_peers, &topic_hash, 0, |_| true);
assert!(random_peers.is_empty(), "Expected 0 peers to be returned");
// test the filter
let random_peers =
get_random_peers(&gs.topic_peers, &gs.connected_peers, &topic_hash, 5, |_| {
false
});
let random_peers = get_random_peers(&gs.connected_peers, &topic_hash, 5, |_| false);
assert!(random_peers.is_empty(), "Expected 0 peers to be returned");
let random_peers = get_random_peers(&gs.topic_peers, &gs.connected_peers, &topic_hash, 10, {
let random_peers = get_random_peers(&gs.connected_peers, &topic_hash, 10, {
|peer| peers.contains(peer)
});
assert!(random_peers.len() == 10, "Expected 10 peers to be returned");
Expand Down
15 changes: 10 additions & 5 deletions protocols/gossipsub/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -355,12 +355,17 @@ impl Metrics {
}
}

/// Register how many peers do we known are subscribed to this topic.
pub(crate) fn set_topic_peers(&mut self, topic: &TopicHash, count: usize) {
/// Increase the number of peers that are subscribed to this topic.
pub(crate) fn inc_topic_peers(&mut self, topic: &TopicHash) {
if self.register_topic(topic).is_ok() {
self.topic_peers_count
.get_or_create(topic)
.set(count as i64);
self.topic_peers_count.get_or_create(topic).inc();
}
}

/// Decrease the number of peers that are subscribed to this topic.
pub(crate) fn dec_topic_peers(&mut self, topic: &TopicHash) {
if self.register_topic(topic).is_ok() {
self.topic_peers_count.get_or_create(topic).dec();
}
}

Expand Down
4 changes: 3 additions & 1 deletion protocols/gossipsub/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ use libp2p_identity::PeerId;
use libp2p_swarm::ConnectionId;
use prometheus_client::encoding::EncodeLabelValue;
use quick_protobuf::MessageWrite;
use std::fmt;
use std::fmt::Debug;
use std::{collections::BTreeSet, fmt};

use crate::rpc_proto::proto;
#[cfg(feature = "serde")]
Expand Down Expand Up @@ -77,6 +77,8 @@ pub(crate) struct PeerConnections {
pub(crate) kind: PeerKind,
/// Its current connections.
pub(crate) connections: Vec<ConnectionId>,
/// Subscribed topics.
pub(crate) topics: BTreeSet<TopicHash>,
}

/// Describes the types of peers that can exist in the gossipsub context.
Expand Down

0 comments on commit 93169cc

Please sign in to comment.