Skip to content

Commit

Permalink
protocols/gossipsub: Review (#93)
Browse files Browse the repository at this point in the history
* protocols/gossipsub: Review

* Addressing most of the review comments

* Address reviewers comments

* Appease CI

Co-authored-by: blacktemplar <blacktemplar@a1.net>
Co-authored-by: Age Manning <Age@AgeManning.com>
  • Loading branch information
3 people authored Dec 7, 2020
1 parent 5157e70 commit 82be02d
Show file tree
Hide file tree
Showing 19 changed files with 394 additions and 318 deletions.
146 changes: 96 additions & 50 deletions core/src/transport/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,20 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use crate::{Transport, transport::{TransportError, ListenerEvent}};
use crate::{
transport::{ListenerEvent, TransportError},
Transport,
};
use fnv::FnvHashMap;
use futures::{future::{self, Ready}, prelude::*, channel::mpsc, task::Context, task::Poll};
use futures::{
channel::mpsc,
future::{self, Ready},
prelude::*,
task::Context,
task::Poll,
};
use lazy_static::lazy_static;
use multiaddr::{Protocol, Multiaddr};
use multiaddr::{Multiaddr, Protocol};
use parking_lot::Mutex;
use rw_stream_sink::RwStreamSink;
use std::{collections::hash_map::Entry, error, fmt, io, num::NonZeroU64, pin::Pin};
Expand Down Expand Up @@ -66,7 +75,7 @@ impl Hub {
let (tx, rx) = mpsc::channel(2);
match hub.entry(port) {
Entry::Occupied(_) => return None,
Entry::Vacant(e) => e.insert(tx)
Entry::Vacant(e) => e.insert(tx),
};

Some((rx, port))
Expand Down Expand Up @@ -103,7 +112,8 @@ impl DialFuture {
fn new(port: NonZeroU64) -> Option<Self> {
let sender = HUB.get(&port)?.clone();

let (_dial_port_channel, dial_port) = HUB.register_port(0)
let (_dial_port_channel, dial_port) = HUB
.register_port(0)
.expect("there to be some random unoccupied port.");

let (a_tx, a_rx) = mpsc::channel(4096);
Expand All @@ -129,23 +139,26 @@ impl Future for DialFuture {
type Output = Result<Channel<Vec<u8>>, MemoryTransportError>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {

match self.sender.poll_ready(cx) {
Poll::Pending => return Poll::Pending,
Poll::Ready(Ok(())) => {},
Poll::Ready(Ok(())) => {}
Poll::Ready(Err(_)) => return Poll::Ready(Err(MemoryTransportError::Unreachable)),
}

let channel_to_send = self.channel_to_send.take()
let channel_to_send = self
.channel_to_send
.take()
.expect("Future should not be polled again once complete");
let dial_port = self.dial_port;
match self.sender.start_send((channel_to_send, dial_port)) {
Err(_) => return Poll::Ready(Err(MemoryTransportError::Unreachable)),
Ok(()) => {}
}

Poll::Ready(Ok(self.channel_to_return.take()
.expect("Future should not be polled again once complete")))
Poll::Ready(Ok(self
.channel_to_return
.take()
.expect("Future should not be polled again once complete")))
}
}

Expand All @@ -172,7 +185,7 @@ impl Transport for MemoryTransport {
port,
addr: Protocol::Memory(port.get()).into(),
receiver: rx,
tell_listen_addr: true
tell_listen_addr: true,
};

Ok(listener)
Expand Down Expand Up @@ -222,16 +235,19 @@ pub struct Listener {
/// Receives incoming connections.
receiver: ChannelReceiver,
/// Generate `ListenerEvent::NewAddress` to inform about our listen address.
tell_listen_addr: bool
tell_listen_addr: bool,
}

impl Stream for Listener {
type Item = Result<ListenerEvent<Ready<Result<Channel<Vec<u8>>, MemoryTransportError>>, MemoryTransportError>, MemoryTransportError>;
type Item = Result<
ListenerEvent<Ready<Result<Channel<Vec<u8>>, MemoryTransportError>>, MemoryTransportError>,
MemoryTransportError,
>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
if self.tell_listen_addr {
self.tell_listen_addr = false;
return Poll::Ready(Some(Ok(ListenerEvent::NewAddress(self.addr.clone()))))
return Poll::Ready(Some(Ok(ListenerEvent::NewAddress(self.addr.clone()))));
}

let (channel, dial_port) = match Stream::poll_next(Pin::new(&mut self.receiver), cx) {
Expand All @@ -243,7 +259,7 @@ impl Stream for Listener {
let event = ListenerEvent::Upgrade {
upgrade: future::ready(Ok(channel)),
local_addr: self.addr.clone(),
remote_addr: Protocol::Memory(dial_port.get()).into()
remote_addr: Protocol::Memory(dial_port.get()).into(),
};

Poll::Ready(Some(Ok(event)))
Expand Down Expand Up @@ -295,8 +311,7 @@ pub struct Chan<T = Vec<u8>> {
dial_port: Option<NonZeroU64>,
}

impl<T> Unpin for Chan<T> {
}
impl<T> Unpin for Chan<T> {}

impl<T> Stream for Chan<T> {
type Item = Result<T, io::Error>;
Expand All @@ -314,12 +329,15 @@ impl<T> Sink<T> for Chan<T> {
type Error = io::Error;

fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.outgoing.poll_ready(cx)
self.outgoing
.poll_ready(cx)
.map(|v| v.map_err(|_| io::ErrorKind::BrokenPipe.into()))
}

fn start_send(mut self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> {
self.outgoing.start_send(item).map_err(|_| io::ErrorKind::BrokenPipe.into())
self.outgoing
.start_send(item)
.map_err(|_| io::ErrorKind::BrokenPipe.into())
}

fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Expand Down Expand Up @@ -355,30 +373,59 @@ mod tests {
assert_eq!(parse_memory_addr(&"/memory/5".parse().unwrap()), Ok(5));
assert_eq!(parse_memory_addr(&"/tcp/150".parse().unwrap()), Err(()));
assert_eq!(parse_memory_addr(&"/memory/0".parse().unwrap()), Ok(0));
assert_eq!(parse_memory_addr(&"/memory/5/tcp/150".parse().unwrap()), Err(()));
assert_eq!(parse_memory_addr(&"/tcp/150/memory/5".parse().unwrap()), Err(()));
assert_eq!(parse_memory_addr(&"/memory/1234567890".parse().unwrap()), Ok(1_234_567_890));
assert_eq!(
parse_memory_addr(&"/memory/5/tcp/150".parse().unwrap()),
Err(())
);
assert_eq!(
parse_memory_addr(&"/tcp/150/memory/5".parse().unwrap()),
Err(())
);
assert_eq!(
parse_memory_addr(&"/memory/1234567890".parse().unwrap()),
Ok(1_234_567_890)
);
}

#[test]
fn listening_twice() {
let transport = MemoryTransport::default();
assert!(transport.listen_on("/memory/1639174018481".parse().unwrap()).is_ok());
assert!(transport.listen_on("/memory/1639174018481".parse().unwrap()).is_ok());
let _listener = transport.listen_on("/memory/1639174018481".parse().unwrap()).unwrap();
assert!(transport.listen_on("/memory/1639174018481".parse().unwrap()).is_err());
assert!(transport.listen_on("/memory/1639174018481".parse().unwrap()).is_err());
assert!(transport
.listen_on("/memory/1639174018481".parse().unwrap())
.is_ok());
assert!(transport
.listen_on("/memory/1639174018481".parse().unwrap())
.is_ok());
let _listener = transport
.listen_on("/memory/1639174018481".parse().unwrap())
.unwrap();
assert!(transport
.listen_on("/memory/1639174018481".parse().unwrap())
.is_err());
assert!(transport
.listen_on("/memory/1639174018481".parse().unwrap())
.is_err());
drop(_listener);
assert!(transport.listen_on("/memory/1639174018481".parse().unwrap()).is_ok());
assert!(transport.listen_on("/memory/1639174018481".parse().unwrap()).is_ok());
assert!(transport
.listen_on("/memory/1639174018481".parse().unwrap())
.is_ok());
assert!(transport
.listen_on("/memory/1639174018481".parse().unwrap())
.is_ok());
}

#[test]
fn port_not_in_use() {
let transport = MemoryTransport::default();
assert!(transport.dial("/memory/810172461024613".parse().unwrap()).is_err());
let _listener = transport.listen_on("/memory/810172461024613".parse().unwrap()).unwrap();
assert!(transport.dial("/memory/810172461024613".parse().unwrap()).is_ok());
assert!(transport
.dial("/memory/810172461024613".parse().unwrap())
.is_err());
let _listener = transport
.listen_on("/memory/810172461024613".parse().unwrap())
.unwrap();
assert!(transport
.dial("/memory/810172461024613".parse().unwrap())
.is_ok());
}

#[test]
Expand All @@ -396,9 +443,11 @@ mod tests {
let listener = async move {
let listener = t1.listen_on(t1_addr.clone()).unwrap();

let upgrade = listener.filter_map(|ev| futures::future::ready(
ListenerEvent::into_upgrade(ev.unwrap())
)).next().await.unwrap();
let upgrade = listener
.filter_map(|ev| futures::future::ready(ListenerEvent::into_upgrade(ev.unwrap())))
.next()
.await
.unwrap();

let mut socket = upgrade.0.await.unwrap();

Expand All @@ -423,16 +472,14 @@ mod tests {

#[test]
fn dialer_address_unequal_to_listener_address() {
let listener_addr: Multiaddr = Protocol::Memory(
rand::random::<u64>().saturating_add(1),
).into();
let listener_addr: Multiaddr =
Protocol::Memory(rand::random::<u64>().saturating_add(1)).into();
let listener_addr_cloned = listener_addr.clone();

let listener_transport = MemoryTransport::default();

let listener = async move {
let mut listener = listener_transport.listen_on(listener_addr.clone())
.unwrap();
let mut listener = listener_transport.listen_on(listener_addr.clone()).unwrap();
while let Some(ev) = listener.next().await {
if let ListenerEvent::Upgrade { remote_addr, .. } = ev.unwrap() {
assert!(
Expand All @@ -445,7 +492,8 @@ mod tests {
};

let dialer = async move {
MemoryTransport::default().dial(listener_addr_cloned)
MemoryTransport::default()
.dial(listener_addr_cloned)
.unwrap()
.await
.unwrap();
Expand All @@ -459,21 +507,18 @@ mod tests {
let (terminate, should_terminate) = futures::channel::oneshot::channel();
let (terminated, is_terminated) = futures::channel::oneshot::channel();

let listener_addr: Multiaddr = Protocol::Memory(
rand::random::<u64>().saturating_add(1),
).into();
let listener_addr: Multiaddr =
Protocol::Memory(rand::random::<u64>().saturating_add(1)).into();
let listener_addr_cloned = listener_addr.clone();

let listener_transport = MemoryTransport::default();

let listener = async move {
let mut listener = listener_transport.listen_on(listener_addr.clone())
.unwrap();
let mut listener = listener_transport.listen_on(listener_addr.clone()).unwrap();
while let Some(ev) = listener.next().await {
if let ListenerEvent::Upgrade { remote_addr, .. } = ev.unwrap() {
let dialer_port = NonZeroU64::new(
parse_memory_addr(&remote_addr).unwrap(),
).unwrap();
let dialer_port =
NonZeroU64::new(parse_memory_addr(&remote_addr).unwrap()).unwrap();

assert!(
HUB.get(&dialer_port).is_some(),
Expand All @@ -494,7 +539,8 @@ mod tests {
};

let dialer = async move {
let _chan = MemoryTransport::default().dial(listener_addr_cloned)
let _chan = MemoryTransport::default()
.dial(listener_addr_cloned)
.unwrap()
.await
.unwrap();
Expand Down
2 changes: 1 addition & 1 deletion examples/gossipsub-chat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ fn main() -> Result<(), Box<dyn Error>> {
};

// Set a custom gossipsub
let gossipsub_config = gossipsub::GossipsubConfigBuilder::new()
let gossipsub_config = gossipsub::GossipsubConfigBuilder::default()
.heartbeat_interval(Duration::from_secs(10)) // This is set to aid debugging by not cluttering the log space
.validation_mode(ValidationMode::Strict) // This sets the kind of message validation. The default is Strict (enforce message signing)
.message_id_fn(message_id_fn) // content-address messages. No two messages of the
Expand Down
2 changes: 1 addition & 1 deletion examples/ipfs-private.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ fn main() -> Result<(), Box<dyn Error>> {

// Create a Swarm to manage peers and events
let mut swarm = {
let gossipsub_config = GossipsubConfigBuilder::new()
let gossipsub_config = GossipsubConfigBuilder::default()
.max_transmit_size(262144)
.build()
.expect("valid config");
Expand Down
2 changes: 2 additions & 0 deletions protocols/gossipsub/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ async-std = "1.6.3"
env_logger = "0.8.1"
libp2p-plaintext = { path = "../plaintext" }
libp2p-yamux = { path = "../../muxers/yamux" }
libp2p-mplex = { path = "../../muxers/mplex" }
libp2p-noise = { path = "../../protocols/noise" }
quickcheck = "0.9.2"
hex = "0.4.2"
derive_builder = "0.9.0"
Expand Down
2 changes: 1 addition & 1 deletion protocols/gossipsub/src/backoff.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ impl BackoffStorage {
}
}

/// Updates the backoff for a peer (if there is already a more restrictive backoff than this call
/// Updates the backoff for a peer (if there is already a more restrictive backoff then this call
/// doesn't change anything).
pub fn update_backoff(&mut self, topic: &TopicHash, peer: &PeerId, time: Duration) {
let instant = Instant::now() + time;
Expand Down
Loading

0 comments on commit 82be02d

Please sign in to comment.