Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

protocols/gossipsub: Review #93

Merged
merged 4 commits into from
Dec 7, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion protocols/gossipsub/src/backoff.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ impl BackoffStorage {
}
}

/// Updates the backoff for a peer (if there is already a more restrictive backoff than this call
/// Updates the backoff for a peer (if there is already a more restrictive backoff then this call
/// doesn't change anything).
pub fn update_backoff(&mut self, topic: &TopicHash, peer: &PeerId, time: Duration) {
let instant = Instant::now() + time;
Expand Down
81 changes: 55 additions & 26 deletions protocols/gossipsub/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,10 +114,13 @@ impl MessageAuthenticity {
}
}

// mxinden: Wouldn't [`GossipsubEvent`] suffice? How is this /more/ generic than other structs with
// generic type parameters?
//
AgeManning marked this conversation as resolved.
Show resolved Hide resolved
/// Event that can be emitted by the gossipsub behaviour.
#[derive(Debug)]
pub enum GenericGossipsubEvent<T: AsRef<[u8]>> {
/// A message has been received.
/// A message has been received.
Message {
/// The peer that forwarded us this message.
propagation_source: PeerId,
Expand Down Expand Up @@ -199,6 +202,8 @@ impl From<MessageAuthenticity> for PublishConfig {
type GossipsubNetworkBehaviourAction<T> =
NetworkBehaviourAction<Arc<rpc_proto::Rpc>, GenericGossipsubEvent<T>>;

// mxinden: Again, why prefix `Generic` here? Do we follow this pattern anywhere else?
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All the objects that contain the raw data now have generic versions (which are only exposed for advanced use cases), we expect the general use should not require these Generic types.

//
/// Network behaviour that handles the gossipsub protocol.
///
/// NOTE: Initialisation requires a [`MessageAuthenticity`] and [`GenericGossipsubConfig`] instance. If message signing is
Expand Down Expand Up @@ -231,6 +236,9 @@ pub struct GenericGossipsub<T: AsRef<[u8]>, Filter: TopicSubscriptionFilter> {
/// A map of all connected peers to their subscribed topics.
peer_topics: HashMap<PeerId, BTreeSet<TopicHash>>,

// mxinden: What makes a peer explicit? That it always stays in the mesh and that the local node
AgeManning marked this conversation as resolved.
Show resolved Hide resolved
// gossips all messages to it? Woudl you mind extending the docs here?
//
/// A set of all explicit peers.
explicit_peers: HashSet<PeerId>,

Expand Down Expand Up @@ -274,12 +282,16 @@ pub struct GenericGossipsub<T: AsRef<[u8]>, Filter: TopicSubscriptionFilter> {
/// promises.
peer_score: Option<(PeerScore, PeerScoreThresholds, Interval, GossipPromises)>,

// mxinden: Would be nice to be consistent with the naming scheme of `count_iasked`.
AgeManning marked this conversation as resolved.
Show resolved Hide resolved
//
/// Counts the number of `IHAVE` received from each peer since the last heartbeat.
count_peer_have: HashMap<PeerId, usize>,

/// Counts the number of `IWANT` that we sent the each peer since the last heartbeat.
count_iasked: HashMap<PeerId, usize>,

// mxinden: Do I understand that this is for anonymous or random author messages only? If so,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep. A scoring parameter prevents messages being re-routed that we published. For random and annonymous author messages this is harder to determine and we need to keep track of our published messages to handle this.

There are triangle routing connections which are supposedly unlikely but still allows these kinds of messages to be sent back to us.

Will extend the comments

// can you add a comment?
/// short term cache for published messsage ids
published_message_ids: DuplicateCache<MessageId>,

Expand All @@ -289,6 +301,9 @@ pub struct GenericGossipsub<T: AsRef<[u8]>, Filter: TopicSubscriptionFilter> {
subscription_filter: Filter,
}

// mxinden: I am fine breaking backwards compatibility at such an early stage. Can you elaborate why
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As explained above, we expect most users to just use this type. More advanced users can adjust the internals.

Its more a convenience type to hide added complexities from the average user.

// this is so important?
//
// for backwards compatibility
pub type Gossipsub = GenericGossipsub<Vec<u8>, AllowAllSubscriptionFilter>;

Expand Down Expand Up @@ -405,7 +420,8 @@ where

/// Subscribe to a topic.
///
/// Returns true if the subscription worked. Returns false if we were already subscribed.
/// Returns [`Ok(true)`] if the subscription worked. Returns [`Ok(false)`] if we were already
/// subscribed.
pub fn subscribe<H: Hasher>(&mut self, topic: &Topic<H>) -> Result<bool, SubscriptionError> {
debug!("Subscribing to topic: {}", topic);
let topic_hash = topic.hash();
Expand Down Expand Up @@ -449,7 +465,7 @@ where

/// Unsubscribes from a topic.
///
/// Returns true if we were subscribed to this topic.
/// Returns [`Ok(true)`] if we were subscribed to this topic.
pub fn unsubscribe<H: Hasher>(&mut self, topic: &Topic<H>) -> Result<bool, PublishError> {
debug!("Unsubscribing from topic: {}", topic);
let topic_hash = topic.hash();
Expand Down Expand Up @@ -516,6 +532,10 @@ where
// possible to have a message that exceeds the RPC limit and is not caught here. A
// warning log will be emitted in this case.
return Err(PublishError::MessageTooLarge);

// mxinden: In case the message is still too large, how about emitting an event
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As a GossipsubEvent or simply adding a log?

The way we are currently using this is by calling the subscribe() or unsubscribe() functions and checking for failures at that level. The application then logs these warnings.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As a GossipsubEvent or simply adding a log?

Yes, passing it back as a GossipsubEvent. Again, I haven't found the time to fully think this through, but from a first grasp this seems intuitive.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess this comes down to how we expect the application to handle this. I would have thought it more intuitive that the gossipsub.subscribe() function itself returns a Result and the application knows immediately there is an error and handles it then, rather than handling errors during the poll.

This saves the application from having some intermediate state between sending a subscription and knowing whether it was successful or not. I guess if we throw a GossipsubEvent on failure, we might want to throw the success case also?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am a bit confused why you are referring to subscribe() and unsubscribe() here. How does this comment on publish() relate to the subscription or unsubscription paths?

// It is possible to have a message that exceeds the RPC limit and is not caught here. A warning log will be emitted in this case.

Do I understand correctly that this comment refers to the poll function in handler.rs, more concretely the handling of GossipsubHandlerError::MaxTransmissionSize when sending a message?

                Some(OutboundSubstreamState::PendingSend(mut substream, message)) => {
                    match Sink::poll_ready(Pin::new(&mut substream), cx) {
                        Poll::Ready(Ok(())) => {
                            match Sink::start_send(Pin::new(&mut substream), message) {
                                Ok(()) => {
                                    self.outbound_substream =
                                        Some(OutboundSubstreamState::PendingFlush(substream))
                                }
                                Err(GossipsubHandlerError::MaxTransmissionSize) => {
                                    error!("Message exceeded the maximum transmission size and was not sent.");
                                    self.outbound_substream =
                                        Some(OutboundSubstreamState::WaitingOutput(substream));
                                }
                                Err(e) => {
                                    error!("Error sending message: {}", e);
                                    return Poll::Ready(ProtocolsHandlerEvent::Close(e));
                                }
                            }
                        }
                        Poll::Ready(Err(e)) => {
                            error!("Outbound substream error while sending output: {:?}", e);
                            return Poll::Ready(ProtocolsHandlerEvent::Close(e));
                        }
                        Poll::Pending => {
                            self.outbound_substream =
                                Some(OutboundSubstreamState::PendingSend(substream, message));
                            break;
                        }
                    }
                }

My initial idea was to encapsulate the original oversized message in the GossipsubHandlerError::MaxTransmissionSize error, bubbling it up to the user. Thinking some more about it, this is likely confusing as (a) sending a oversized message to multiple peers would result in multiple such events for a single publish call and (b) as you elaborated in the comment on publish the limit can both be reached through messages and topics. With that in mind I would leave it as is with the error log as well as the early length check in publish.

// It is possible to have a message that exceeds the RPC limit and is not caught here.

How can the message to be send still in itself exceed the max_transmit_size even though, for the purpose of checking, it is being encoded as a protobuf message here to check the length? Or are you referring to something different here with message?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@AgeManning pinging you here, not because it is urgent, but due to the comment being marked as outdated, and thus maybe getting lost.

// returning the message instead of silently dropping it? Haven't thought this fully
// through yet.
}

// Add published message to the duplicate cache.
Expand Down Expand Up @@ -631,21 +651,22 @@ where
Ok(msg_id)
}

/// This function should be called when `config.validate_messages()` is `true` after the
/// message got validated by the caller. Messages are stored in the
/// ['Memcache'] and validation is expected to be fast enough that the messages should still
/// exist in the cache. There are three possible validation outcomes and the outcome is given
/// in acceptance.
/// This function should be called when `config.validate_messages()` is `true` after the message
/// got validated by the caller. Messages are stored in the ['Memcache'] and validation is
/// expected to be fast enough that the messages should still exist in the cache. There are
/// three possible validation outcomes and the outcome is given in acceptance.
///
/// If acceptance = Accept the message will get propagated to the network. The
/// `propagation_source` parameter indicates who the message was received by and will not
/// be forwarded back to that peer.
/// If acceptance = [`MessageAcceptance::Accept`] the message will get propagated to the
/// network. The `propagation_source` parameter indicates who the message was received by and
/// will not be forwarded back to that peer.
///
/// If acceptance = Reject the message will be deleted from the memcache and the P₄ penalty
/// will be applied to the `propagation_source`.
/// If acceptance = [`MessageAcceptance::Reject`] the message will be deleted from the memcache
/// and the P₄ penalty will be applied to the `propagation_source`.
///
/// If acceptance = Ignore the message will be deleted from the memcache but no P₄ penalty
/// will be applied.
// mxinden: I must be missing something. Where is this penalty applied?
mxinden marked this conversation as resolved.
Show resolved Hide resolved
//
/// If acceptance = [`MessageAcceptance::Ignore`] the message will be deleted from the memcache
/// but no P₄ penalty will be applied.
///
/// This function will return true if the message was found in the cache and false if was not
/// in the cache anymore.
Expand Down Expand Up @@ -712,7 +733,7 @@ where
}
}

/// Removes a blacklisted peer if it has previously been blacklisted.
/// Removes a peer from the blacklist if it has previously been blacklisted.
pub fn remove_blacklisted_peer(&mut self, peer_id: &PeerId) {
if self.blacklisted_peers.remove(peer_id) {
debug!("Peer has been removed from the blacklist: {}", peer_id);
Expand Down Expand Up @@ -1327,7 +1348,7 @@ where
if below_threshold {
debug!(
"PRUNE: ignoring PX from peer {:?} with insufficient score \
[score ={} topic = {}]",
[score ={} topic = {}]",
peer_id, score, topic_hash
);
continue;
Expand All @@ -1351,7 +1372,9 @@ where
fn px_connect(&mut self, mut px: Vec<PeerInfo>) {
let n = self.config.prune_peers();
// Ignore peerInfo with no ID
//TODO: Once signed records are spec'd: Can we use peerInfo without any IDs if they have a signed peer record?
//
//TODO: Once signed records are spec'd: Can we use peerInfo without any IDs if they have a
// signed peer record?
px = px.into_iter().filter(|p| p.peer_id.is_some()).collect();
if px.len() > n {
// only use at most prune_peers many random peers
Expand All @@ -1361,8 +1384,8 @@ where
}

for p in px {
// TODO: Once signed records are spec'd: extract signed peer record if given and handle it, see
// https://github.com/libp2p/specs/pull/217
// TODO: Once signed records are spec'd: extract signed peer record if given and handle
// it, see https://github.com/libp2p/specs/pull/217
if let Some(peer_id) = p.peer_id {
// mark as px peer
self.px_peers.insert(peer_id.clone());
Expand Down Expand Up @@ -1453,7 +1476,8 @@ where
true
}

/// Handles a newly received GossipsubMessage.
/// Handles a newly received [`RawGossipsubMessage`].
///
/// Forwards the message to all peers in the mesh.
fn handle_received_message(&mut self, msg: RawGossipsubMessage, propagation_source: &PeerId) {
let fast_message_id = self.config.fast_message_id(&msg);
Expand All @@ -1477,7 +1501,7 @@ where

// Add the message to the duplicate caches and memcache.
if let Some(fast_message_id) = fast_message_id {
//add id to cache
// add id to cache
self.fast_messsage_id_cache
.entry(fast_message_id)
.or_insert_with(|| msg.message_id().clone());
Expand All @@ -1497,8 +1521,8 @@ where
msg.message_id()
);

// Tells score that message arrived (but is maybe not fully validated yet)
// Consider message as delivered for gossip promises
// Tells score that message arrived (but is maybe not fully validated yet).
// Consider message as delivered for gossip promises.
if let Some((peer_score, .., gossip_promises)) = &mut self.peer_score {
peer_score.validate_message(propagation_source, &msg);
gossip_promises.message_delivered(msg.message_id());
Expand Down Expand Up @@ -1770,7 +1794,7 @@ where
if score(p) < 0.0 {
debug!(
"HEARTBEAT: Prune peer {:?} with negative score [score = {}, topic = \
{}]",
{}]",
p,
score(p),
topic_hash
Expand Down Expand Up @@ -2240,6 +2264,7 @@ where
}

/// Helper function which forwards a message to mesh\[topic\] peers.
///
/// Returns true if at least one peer was messaged.
fn forward_msg(
&mut self,
Expand Down Expand Up @@ -2303,7 +2328,7 @@ where
}
}

/// Constructs a `GenericGossipsubMessage` performing message signing if required.
/// Constructs a [`GenericGossipsubMessage`] performing message signing if required.
pub(crate) fn build_message(
&self,
topic: TopicHash,
Expand Down Expand Up @@ -2629,6 +2654,7 @@ where
}
}

// mxinden: You can call `ConnectedPoint::get_remote_address` directly.
AgeManning marked this conversation as resolved.
Show resolved Hide resolved
fn get_remote_addr(endpoint: &ConnectedPoint) -> &Multiaddr {
match endpoint {
ConnectedPoint::Dialer { address } => address,
Expand Down Expand Up @@ -2804,6 +2830,9 @@ where

// Add the IP to the peer scoring system
if let Some((peer_score, ..)) = &mut self.peer_score {
// mxinden: The connection might run via a relay (see relay circuit spec). In that case
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah. I think this is a point of contention in the 1.1 specs.

There is a scoring configuration parameter that allows you to adjust the number of peers allowed under a single IP before the scoring kicks in. Also you can easily turn this off with the scoring parameters. See the ip_colocation_factor parameters in peer_score/params.rs.

Ultimately its an optional scoring param that a user can decide to use or not (and suggested by the specs) and I guess depending on the use of this, is up to the user to include or not.

// all nodes using the same relay would have the same IP address. Is scoring based on
// the IP address a good idea in such scenario?
if let Some(ip) = get_ip_addr(get_remote_addr(endpoint)) {
peer_score.add_ip(&peer_id, ip);
} else {
Expand Down
7 changes: 4 additions & 3 deletions protocols/gossipsub/src/behaviour/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

// collection of tests for the gossipsub network behaviour

// mxinden: Impressive test coverage!

mod tests {
use byteorder::{BigEndian, ByteOrder};
use std::thread::sleep;
Expand Down Expand Up @@ -1648,8 +1650,8 @@ mod tests {
);
}

#[test]
// Tests the mesh maintenance addition
#[test]
fn test_mesh_addition() {
let config = GossipsubConfig::default();

Expand Down Expand Up @@ -1682,8 +1684,8 @@ mod tests {
assert_eq!(gs.mesh.get(&topics[0]).unwrap().len(), config.mesh_n());
}

#[test]
// Tests the mesh maintenance subtraction
#[test]
fn test_mesh_subtraction() {
let config = GossipsubConfig::default();

Expand Down Expand Up @@ -4890,7 +4892,6 @@ mod tests {
address as *mut Pointers
}};
}

#[derive(Clone, Default)]
struct MessageData(pub Vec<u8>);

Expand Down
Loading