Skip to content

Commit

Permalink
Address reviewers comments (#88)
Browse files Browse the repository at this point in the history
* Address reviewers comments

* fix rust doc linking error

* Add copy trait

Co-authored-by: blacktemplar <blacktemplar@a1.net>
  • Loading branch information
AgeManning and blacktemplar authored Nov 8, 2020
1 parent 7a316e8 commit d306455
Show file tree
Hide file tree
Showing 7 changed files with 42 additions and 41 deletions.
4 changes: 2 additions & 2 deletions examples/gossipsub-chat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
//! cargo run --example gossipsub-chat
//! ```
//!
//! It will print the `PeerId` and the listening address, e.g. `Listening on
//! It will print the [`PeerId`] and the listening address, e.g. `Listening on
//! "/ip4/0.0.0.0/tcp/24915"`
//!
//! In the second terminal window, start a new instance of the example with:
Expand All @@ -51,7 +51,7 @@ use env_logger::{Builder, Env};
use futures::prelude::*;
use libp2p::gossipsub::MessageId;
use libp2p::gossipsub::{
GossipsubEvent, RawGossipsubMessage, IdentTopic as Topic, MessageAuthenticity, ValidationMode,
GossipsubEvent, IdentTopic as Topic, MessageAuthenticity, RawGossipsubMessage, ValidationMode,
};
use libp2p::{gossipsub, identity, PeerId};
use std::collections::hash_map::DefaultHasher;
Expand Down
30 changes: 17 additions & 13 deletions protocols/gossipsub/src/backoff.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

//! Data structure for efficiently storing known back-off's when Pruning peers.
//! Data structure for efficiently storing known back-off's when pruning peers.
use crate::topic::TopicHash;
use libp2p_core::PeerId;
use std::collections::{
Expand All @@ -28,15 +28,18 @@ use std::collections::{
use std::time::Duration;
use wasm_timer::Instant;

#[derive(Copy, Clone)]
struct HeartbeatIndex(usize);

/// Stores backoffs in an efficient manner.
pub struct BackoffStorage {
/// Stores backoffs and the index in backoffs_by_heartbeat per peer per topic.
backoffs: HashMap<TopicHash, HashMap<PeerId, (Instant, usize)>>,
backoffs: HashMap<TopicHash, HashMap<PeerId, (Instant, HeartbeatIndex)>>,
/// Stores peer topic pairs per heartbeat (this is cyclic the current index is
/// heartbeat_index).
backoffs_by_heartbeat: Vec<HashSet<(TopicHash, PeerId)>>,
/// The index in the backoffs_by_heartbeat vector corresponding to the current heartbeat.
heartbeat_index: usize,
heartbeat_index: HeartbeatIndex,
/// The heartbeat interval duration from the config.
heartbeat_interval: Duration,
/// Backoff slack from the config.
Expand All @@ -60,7 +63,7 @@ impl BackoffStorage {
BackoffStorage {
backoffs: HashMap::new(),
backoffs_by_heartbeat: vec![HashSet::new(); max_heartbeats],
heartbeat_index: 0,
heartbeat_index: HeartbeatIndex(0),
heartbeat_interval,
backoff_slack,
}
Expand All @@ -71,17 +74,17 @@ impl BackoffStorage {
pub fn update_backoff(&mut self, topic: &TopicHash, peer: &PeerId, time: Duration) {
let instant = Instant::now() + time;
let insert_into_backoffs_by_heartbeat =
|heartbeat_index: usize,
|heartbeat_index: HeartbeatIndex,
backoffs_by_heartbeat: &mut Vec<HashSet<_>>,
heartbeat_interval,
backoff_slack| {
let pair = (topic.clone(), peer.clone());
let index = (heartbeat_index
let index = (heartbeat_index.0
+ Self::heartbeats(&time, heartbeat_interval)
+ backoff_slack as usize)
% backoffs_by_heartbeat.len();
backoffs_by_heartbeat[index].insert(pair);
index
HeartbeatIndex(index)
};
match self
.backoffs
Expand All @@ -90,10 +93,10 @@ impl BackoffStorage {
.entry(peer.clone())
{
Entry::Occupied(mut o) => {
let &(backoff, index) = o.get();
if backoff < instant {
let (backoff, index) = o.get();
if backoff < &instant {
let pair = (topic.clone(), peer.clone());
if let Some(s) = self.backoffs_by_heartbeat.get_mut(index) {
if let Some(s) = self.backoffs_by_heartbeat.get_mut(index.0) {
s.remove(&pair);
}
let index = insert_into_backoffs_by_heartbeat(
Expand Down Expand Up @@ -135,7 +138,7 @@ impl BackoffStorage {
}

fn get_backoff_time_from_backoffs(
backoffs: &HashMap<TopicHash, HashMap<PeerId, (Instant, usize)>>,
backoffs: &HashMap<TopicHash, HashMap<PeerId, (Instant, HeartbeatIndex)>>,
topic: &TopicHash,
peer: &PeerId,
) -> Option<Instant> {
Expand All @@ -148,7 +151,7 @@ impl BackoffStorage {
/// `heartbeat_interval`.
pub fn heartbeat(&mut self) {
// Clean up backoffs_by_heartbeat
if let Some(s) = self.backoffs_by_heartbeat.get_mut(self.heartbeat_index) {
if let Some(s) = self.backoffs_by_heartbeat.get_mut(self.heartbeat_index.0) {
let backoffs = &mut self.backoffs;
let slack = self.heartbeat_interval * self.backoff_slack;
let now = Instant::now();
Expand All @@ -171,6 +174,7 @@ impl BackoffStorage {
}

// Increase heartbeat index
self.heartbeat_index = (self.heartbeat_index + 1) % self.backoffs_by_heartbeat.len();
self.heartbeat_index =
HeartbeatIndex((self.heartbeat_index.0 + 1) % self.backoffs_by_heartbeat.len());
}
}
22 changes: 11 additions & 11 deletions protocols/gossipsub/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ use crate::{rpc_proto, TopicScoreParams};
use std::cmp::Ordering::Equal;
use std::fmt::Debug;

#[cfg(test)]
mod tests;

/// Determines if published messages should be signed or not.
Expand Down Expand Up @@ -113,7 +114,7 @@ impl MessageAuthenticity {
}
}

/// Event that can happen on the gossipsub behaviour.
/// Event that can be emitted by the gossipsub behaviour.
#[derive(Debug)]
pub enum GenericGossipsubEvent<T: AsRef<[u8]>> {
/// A message has been received.
Expand All @@ -133,7 +134,6 @@ pub enum GenericGossipsubEvent<T: AsRef<[u8]>> {
/// The topic it has subscribed to.
topic: TopicHash,
},

/// A remote unsubscribed from a topic.
Unsubscribed {
/// Remote that has unsubscribed.
Expand Down Expand Up @@ -219,7 +219,7 @@ pub struct GenericGossipsub<T: AsRef<[u8]>, Filter: TopicSubscriptionFilter> {

/// An LRU Time cache for storing seen messages (based on their ID). This cache prevents
/// duplicates from being propagated to the application and on the network.
duplication_cache: DuplicateCache<MessageId>,
duplicate_cache: DuplicateCache<MessageId>,

/// A map of peers to their protocol kind. This is to identify different kinds of gossipsub
/// peers.
Expand Down Expand Up @@ -256,7 +256,7 @@ pub struct GenericGossipsub<T: AsRef<[u8]>, Filter: TopicSubscriptionFilter> {
/// Heartbeat interval stream.
heartbeat: Interval,

/// number of heartbeats since the beginning of time; this allows us to amortize some resource
/// Number of heartbeats since the beginning of time; this allows us to amortize some resource
/// clean up -- eg backoff clean up.
heartbeat_ticks: u64,

Expand Down Expand Up @@ -329,7 +329,7 @@ where
events: VecDeque::new(),
control_pool: HashMap::new(),
publish_config: privacy.into(),
duplication_cache: DuplicateCache::new(config.duplicate_cache_time()),
duplicate_cache: DuplicateCache::new(config.duplicate_cache_time()),
fast_messsage_id_cache: TimeCache::new(config.duplicate_cache_time()),
topic_peers: HashMap::new(),
peer_topics: HashMap::new(),
Expand Down Expand Up @@ -519,7 +519,7 @@ where
}

// Add published message to the duplicate cache.
if !self.duplication_cache.insert(msg_id.clone()) {
if !self.duplicate_cache.insert(msg_id.clone()) {
// This message has already been seen. We don't re-publish messages that have already
// been published on the network.
warn!(
Expand Down Expand Up @@ -1028,7 +1028,7 @@ where
}

for id in ids {
if !self.duplication_cache.contains(&id) {
if !self.duplicate_cache.contains(&id) {
// have not seen this message, request it
iwant_ids.insert(id);
}
Expand Down Expand Up @@ -1475,14 +1475,14 @@ where
return;
}

// Add the message to the duplication caches and memcache.
// Add the message to the duplicate caches and memcache.
if let Some(fast_message_id) = fast_message_id {
//add id to cache
self.fast_messsage_id_cache
.entry(fast_message_id)
.or_insert_with(|| msg.message_id().clone());
}
if !self.duplication_cache.insert(msg.message_id().clone()) {
if !self.duplicate_cache.insert(msg.message_id().clone()) {
debug!(
"Message already received, ignoring. Message: {}",
msg.message_id()
Expand All @@ -1493,15 +1493,15 @@ where
return;
}
debug!(
"Put message {:?} in duplication_cache and resolve promises",
"Put message {:?} in duplicate_cache and resolve promises",
msg.message_id()
);

// Tells score that message arrived (but is maybe not fully validated yet)
// Consider message as delivered for gossip promises
if let Some((peer_score, .., gossip_promises)) = &mut self.peer_score {
peer_score.validate_message(propagation_source, &msg);
gossip_promises.deliver_message(msg.message_id());
gossip_promises.message_delivered(msg.message_id());
}

// Add the message to our memcache
Expand Down
1 change: 0 additions & 1 deletion protocols/gossipsub/src/behaviour/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

// collection of tests for the gossipsub network behaviour

#[cfg(test)]
mod tests {
use byteorder::{BigEndian, ByteOrder};
use std::thread::sleep;
Expand Down
18 changes: 8 additions & 10 deletions protocols/gossipsub/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,10 +118,8 @@ pub struct GenericGossipsubConfig<T> {

/// When set to `true`, prevents automatic forwarding of all received messages. This setting
/// allows a user to validate the messages before propagating them to their peers. If set to
/// true, the user must manually call `validate_message()` on the behaviour to forward message
/// once validated (default is `false`). Furthermore, the application may optionally call
/// `invalidate_message()` on the behaviour to remove the message from the memcache. The
/// default is false.
/// true, the user must manually call [crate::GenericGossipsub::report_message_validation_result()] on the behaviour to forward message
/// once validated (default is `false`).
validate_messages: bool,

/// Determines the level of validation used when receiving messages. See [`ValidationMode`]
Expand Down Expand Up @@ -180,9 +178,9 @@ pub struct GenericGossipsubConfig<T> {
/// get not punished for too early grafting. The default is 1.
backoff_slack: u32,

/// Whether to do flood publishing or not. If enabled newly created messages will always be
/// sent to all peers that are subscribed to the topic and have a good enough score.
/// The default is true.
/// Whether to do flood publishing or not. If enabled newly created messages authored by the
/// local node will always be sent to all peers that are subscribed to the topic and have a
/// good enough score. The default is true.
flood_publish: bool,

// If a GRAFT comes before `graft_flood_threshold` has elapsed since the last PRUNE,
Expand Down Expand Up @@ -233,11 +231,11 @@ pub struct GenericGossipsubConfig<T> {
published_message_ids_cache_time: Duration,
}

// for backwards compatibility
// For backwards compatibility
pub type GossipsubConfig = GenericGossipsubConfig<Vec<u8>>;

impl<T> GenericGossipsubConfig<T> {
//all the getters
// All the getters

/// The protocol id prefix to negotiate this protocol. The protocol id is of the form
/// `/<prefix>/<supported-versions>`. As gossipsub supports version 1.0 and 1.1, there are two
Expand Down Expand Up @@ -375,7 +373,7 @@ impl<T> GenericGossipsubConfig<T> {
.map(|fast_message_id_fn| fast_message_id_fn(message))
}

/// By default, gossipsub will reject messages that are sent to us that has the same message
/// By default, gossipsub will reject messages that are sent to us that have the same message
/// source as we have specified locally. Enabling this, allows these messages and prevents
/// penalizing the peer that sent us the message. Default is false.
pub fn allow_self_origin(&self) -> bool {
Expand Down
4 changes: 2 additions & 2 deletions protocols/gossipsub/src/gossip_promises.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@ impl GossipPromises {
}
}

pub fn deliver_message(&mut self, message_id: &MessageId) {
// Someone delivered a message, we can stop tracking all promises for it
pub fn message_delivered(&mut self, message_id: &MessageId) {
// Someone delivered a message, we can stop tracking all promises for it.
self.promises.remove(message_id);
}

Expand Down
4 changes: 2 additions & 2 deletions protocols/gossipsub/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,9 @@ pub struct GossipsubHandler {
peer_kind_sent: bool,

/// If the peer doesn't support the gossipsub protocol we do not immediately disconnect.
/// Rather, we disable the handler and prevent any ingoing or outgoing substreams from being
/// Rather, we disable the handler and prevent any incoming or outgoing substreams from being
/// established.
/// This value is set to true to indicate the peer doesn't not support gossipsub.
/// This value is set to true to indicate the peer doesn't support gossipsub.
protocol_unsupported: bool,

/// Collection of errors from attempting an upgrade.
Expand Down

0 comments on commit d306455

Please sign in to comment.