Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

implement adaptive gossip dissemination + complete config debug output #40

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 30 additions & 10 deletions protocols/gossipsub/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use std::cmp::max;
use std::collections::hash_map::Entry;
use std::iter::FromIterator;
use std::time::Duration;
Expand Down Expand Up @@ -1278,13 +1279,18 @@ impl Gossipsub {
return;
}

// dynamic number of peers to gossip based on `gossip_factor` with minimum `gossip_lazy`
let n_map = |m| {
max(
self.config.gossip_lazy,
(self.config.gossip_factor * m as f64) as usize,
)
};
// get gossip_lazy random peers
let to_msg_peers = Self::get_random_peers(
&self.topic_peers,
&topic_hash,
self.config.gossip_lazy,
|peer| !peers.contains(peer) && !self.explicit_peers.contains(peer),
);
let to_msg_peers =
Self::get_random_peers_dynamic(&self.topic_peers, &topic_hash, n_map, |peer| {
!peers.contains(peer) && !self.explicit_peers.contains(peer)
});

debug!("Gossiping IHAVE to {} peers.", to_msg_peers.len());

Expand Down Expand Up @@ -1496,12 +1502,14 @@ impl Gossipsub {
}
}

/// Helper function to get a set of `n` random gossipsub peers for a `topic_hash`
/// filtered by the function `f`.
fn get_random_peers(
/// Helper function to get a subset of random gossipsub peers for a `topic_hash`
/// filtered by the function `f`. The number of peers to get equals the output of `n_map`
/// that gets as input the number of filtered peers.
fn get_random_peers_dynamic(
topic_peers: &HashMap<TopicHash, BTreeSet<PeerId>>,
topic_hash: &TopicHash,
n: usize,
// maps the number of total peers to the number of selected peers
n_map: impl Fn(usize) -> usize,
mut f: impl FnMut(&PeerId) -> bool,
) -> BTreeSet<PeerId> {
let mut gossip_peers = match topic_peers.get(topic_hash) {
Expand All @@ -1511,6 +1519,7 @@ impl Gossipsub {
};

// if we have less than needed, return them
let n = n_map(gossip_peers.len());
if gossip_peers.len() <= n {
debug!("RANDOM PEERS: Got {:?} peers", gossip_peers.len());
return gossip_peers.into_iter().collect();
Expand All @@ -1525,6 +1534,17 @@ impl Gossipsub {
gossip_peers.into_iter().take(n).collect()
}

/// Helper function to get a set of `n` random gossipsub peers for a `topic_hash`
/// filtered by the function `f`.
fn get_random_peers(
topic_peers: &HashMap<TopicHash, BTreeSet<PeerId>>,
topic_hash: &TopicHash,
n: usize,
f: impl FnMut(&PeerId) -> bool,
) -> BTreeSet<PeerId> {
Self::get_random_peers_dynamic(topic_peers, topic_hash, |_| n, f)
}

// adds a control action to control_pool
fn control_pool_add(
control_pool: &mut HashMap<PeerId, Vec<GossipsubControlAction>>,
Expand Down
78 changes: 78 additions & 0 deletions protocols/gossipsub/src/behaviour/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1634,4 +1634,82 @@ mod tests {
"Message cache should contain published message"
);
}

#[test]
fn test_gossip_to_at_least_gossip_lazy_peers() {
let config = GossipsubConfig::default();

//add more peers than in mesh to test gossipping
//by default only mesh_n_low peers will get added to mesh
let (mut gs, _, topic_hashes) = build_and_inject_nodes(
config.mesh_n_low + config.gossip_lazy + 1,
vec!["topic".into()],
true,
);

//receive message
let message = GossipsubMessage {
source: Some(PeerId::random()),
data: vec![],
sequence_number: Some(0),
topics: vec![topic_hashes[0].clone()],
signature: None,
key: None,
validated: true,
};
gs.handle_received_message(message.clone(), &PeerId::random());

//emit gossip
gs.emit_gossip();

//check that exactly config.gossip_lazy many gossip messages were sent.
let msg_id = (gs.config.message_id_fn)(&message);
assert_eq!(
count_control_msgs(&gs, |peer, action| match action {
GossipsubControlAction::IHave {
topic_hash,
message_ids,
} => topic_hash == &topic_hashes[0] && message_ids.iter().any(|id| id == &msg_id),
_ => false,
}),
config.gossip_lazy
);
}

#[test]
fn test_gossip_to_at_most_gossip_factor_peers() {
let config = GossipsubConfig::default();

//add a lot of peers
let m = config.mesh_n_low + config.gossip_lazy * (2.0 / config.gossip_factor) as usize;
let (mut gs, _, topic_hashes) = build_and_inject_nodes(m, vec!["topic".into()], true);

//receive message
let message = GossipsubMessage {
source: Some(PeerId::random()),
data: vec![],
sequence_number: Some(0),
topics: vec![topic_hashes[0].clone()],
signature: None,
key: None,
validated: true,
};
gs.handle_received_message(message.clone(), &PeerId::random());

//emit gossip
gs.emit_gossip();

//check that exactly config.gossip_lazy many gossip messages were sent.
let msg_id = (gs.config.message_id_fn)(&message);
assert_eq!(
count_control_msgs(&gs, |peer, action| match action {
GossipsubControlAction::IHave {
topic_hash,
message_ids,
} => topic_hash == &topic_hashes[0] && message_ids.iter().any(|id| id == &msg_id),
_ => false,
}),
((m - config.mesh_n_low) as f64 * config.gossip_factor) as usize
);
}
}
27 changes: 25 additions & 2 deletions protocols/gossipsub/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,15 @@ pub struct GossipsubConfig {
/// is 12).
pub mesh_n_high: usize,

/// Number of peers to emit gossip to during a heartbeat (D_lazy in the spec, default is 6).
/// Minimum number of peers to emit gossip to during a heartbeat (D_lazy in the spec,
/// default is 6).
pub gossip_lazy: usize,

/// Affects how many peers we will emit gossip to at each heartbeat.
/// We will send gossip to `gossip_factor * (total number of non-mesh peers)`, or
/// `gossip_lazy`, whichever is greater. The default is 0.25.
pub gossip_factor: f64,

/// Initial delay in each heartbeat (default is 5 seconds).
pub heartbeat_initial_delay: Duration,

Expand Down Expand Up @@ -162,6 +168,7 @@ impl Default for GossipsubConfig {
mesh_n_low: 5,
mesh_n_high: 12,
gossip_lazy: 6, // default to mesh_n
gossip_factor: 0.25,
heartbeat_initial_delay: Duration::from_secs(5),
heartbeat_interval: Duration::from_secs(1),
fanout_ttl: Duration::from_secs(60),
Expand Down Expand Up @@ -270,12 +277,21 @@ impl GossipsubConfigBuilder {
self
}

/// Number of peers to emit gossip to during a heartbeat (D_lazy in the spec, default is 6).
/// Minimum number of peers to emit gossip to during a heartbeat (D_lazy in the spec,
/// default is 6).
pub fn gossip_lazy(&mut self, gossip_lazy: usize) -> &mut Self {
self.config.gossip_lazy = gossip_lazy;
self
}

/// Affects how many peers we will emit gossip to at each heartbeat.
/// We will send gossip to `gossip_factor * (total number of non-mesh peers)`, or
/// `gossip_lazy`, whichever is greater. The default is 0.25.
pub fn gossip_factor(&mut self, gossip_factor: f64) -> &mut Self {
self.config.gossip_factor = gossip_factor;
self
}

/// Initial delay in each heartbeat (default is 5 seconds).
pub fn heartbeat_initial_delay(&mut self, heartbeat_initial_delay: Duration) -> &mut Self {
self.config.heartbeat_initial_delay = heartbeat_initial_delay;
Expand Down Expand Up @@ -408,12 +424,19 @@ impl std::fmt::Debug for GossipsubConfig {
let _ = builder.field("mesh_n_low", &self.mesh_n_low);
let _ = builder.field("mesh_n_high", &self.mesh_n_high);
let _ = builder.field("gossip_lazy", &self.gossip_lazy);
let _ = builder.field("gossip_factor", &self.gossip_factor);
let _ = builder.field("heartbeat_initial_delay", &self.heartbeat_initial_delay);
let _ = builder.field("heartbeat_interval", &self.heartbeat_interval);
let _ = builder.field("fanout_ttl", &self.fanout_ttl);
let _ = builder.field("max_transmit_size", &self.max_transmit_size);
let _ = builder.field("duplicate_cache_time", &self.duplicate_cache_time);
let _ = builder.field("validate_messages", &self.validate_messages);
let _ = builder.field("validation_mode", &self.validation_mode);
let _ = builder.field("do_px", &self.do_px);
let _ = builder.field("prune_peers", &self.prune_peers);
let _ = builder.field("prune_backoff", &self.prune_backoff);
let _ = builder.field("backoff_slack", &self.backoff_slack);
let _ = builder.field("flood_publish", &self.flood_publish);
builder.finish()
}
}
Expand Down