Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

protocols/gossipsub: Review #93

Merged
merged 4 commits into from
Dec 7, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
146 changes: 96 additions & 50 deletions core/src/transport/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,20 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use crate::{Transport, transport::{TransportError, ListenerEvent}};
use crate::{
transport::{ListenerEvent, TransportError},
Transport,
};
use fnv::FnvHashMap;
use futures::{future::{self, Ready}, prelude::*, channel::mpsc, task::Context, task::Poll};
use futures::{
channel::mpsc,
future::{self, Ready},
prelude::*,
task::Context,
task::Poll,
};
use lazy_static::lazy_static;
use multiaddr::{Protocol, Multiaddr};
use multiaddr::{Multiaddr, Protocol};
use parking_lot::Mutex;
use rw_stream_sink::RwStreamSink;
use std::{collections::hash_map::Entry, error, fmt, io, num::NonZeroU64, pin::Pin};
Expand Down Expand Up @@ -66,7 +75,7 @@ impl Hub {
let (tx, rx) = mpsc::channel(2);
match hub.entry(port) {
Entry::Occupied(_) => return None,
Entry::Vacant(e) => e.insert(tx)
Entry::Vacant(e) => e.insert(tx),
};

Some((rx, port))
Expand Down Expand Up @@ -103,7 +112,8 @@ impl DialFuture {
fn new(port: NonZeroU64) -> Option<Self> {
let sender = HUB.get(&port)?.clone();

let (_dial_port_channel, dial_port) = HUB.register_port(0)
let (_dial_port_channel, dial_port) = HUB
.register_port(0)
.expect("there to be some random unoccupied port.");

let (a_tx, a_rx) = mpsc::channel(4096);
Expand All @@ -129,23 +139,26 @@ impl Future for DialFuture {
type Output = Result<Channel<Vec<u8>>, MemoryTransportError>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {

match self.sender.poll_ready(cx) {
Poll::Pending => return Poll::Pending,
Poll::Ready(Ok(())) => {},
Poll::Ready(Ok(())) => {}
Poll::Ready(Err(_)) => return Poll::Ready(Err(MemoryTransportError::Unreachable)),
}

let channel_to_send = self.channel_to_send.take()
let channel_to_send = self
.channel_to_send
.take()
.expect("Future should not be polled again once complete");
let dial_port = self.dial_port;
match self.sender.start_send((channel_to_send, dial_port)) {
Err(_) => return Poll::Ready(Err(MemoryTransportError::Unreachable)),
Ok(()) => {}
}

Poll::Ready(Ok(self.channel_to_return.take()
.expect("Future should not be polled again once complete")))
Poll::Ready(Ok(self
.channel_to_return
.take()
.expect("Future should not be polled again once complete")))
}
}

Expand All @@ -172,7 +185,7 @@ impl Transport for MemoryTransport {
port,
addr: Protocol::Memory(port.get()).into(),
receiver: rx,
tell_listen_addr: true
tell_listen_addr: true,
};

Ok(listener)
Expand Down Expand Up @@ -222,16 +235,19 @@ pub struct Listener {
/// Receives incoming connections.
receiver: ChannelReceiver,
/// Generate `ListenerEvent::NewAddress` to inform about our listen address.
tell_listen_addr: bool
tell_listen_addr: bool,
}

impl Stream for Listener {
type Item = Result<ListenerEvent<Ready<Result<Channel<Vec<u8>>, MemoryTransportError>>, MemoryTransportError>, MemoryTransportError>;
type Item = Result<
ListenerEvent<Ready<Result<Channel<Vec<u8>>, MemoryTransportError>>, MemoryTransportError>,
MemoryTransportError,
>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
if self.tell_listen_addr {
self.tell_listen_addr = false;
return Poll::Ready(Some(Ok(ListenerEvent::NewAddress(self.addr.clone()))))
return Poll::Ready(Some(Ok(ListenerEvent::NewAddress(self.addr.clone()))));
}

let (channel, dial_port) = match Stream::poll_next(Pin::new(&mut self.receiver), cx) {
Expand All @@ -243,7 +259,7 @@ impl Stream for Listener {
let event = ListenerEvent::Upgrade {
upgrade: future::ready(Ok(channel)),
local_addr: self.addr.clone(),
remote_addr: Protocol::Memory(dial_port.get()).into()
remote_addr: Protocol::Memory(dial_port.get()).into(),
};

Poll::Ready(Some(Ok(event)))
Expand Down Expand Up @@ -295,8 +311,7 @@ pub struct Chan<T = Vec<u8>> {
dial_port: Option<NonZeroU64>,
}

impl<T> Unpin for Chan<T> {
}
impl<T> Unpin for Chan<T> {}

impl<T> Stream for Chan<T> {
type Item = Result<T, io::Error>;
Expand All @@ -314,12 +329,15 @@ impl<T> Sink<T> for Chan<T> {
type Error = io::Error;

fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.outgoing.poll_ready(cx)
self.outgoing
.poll_ready(cx)
.map(|v| v.map_err(|_| io::ErrorKind::BrokenPipe.into()))
}

fn start_send(mut self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> {
self.outgoing.start_send(item).map_err(|_| io::ErrorKind::BrokenPipe.into())
self.outgoing
.start_send(item)
.map_err(|_| io::ErrorKind::BrokenPipe.into())
}

fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Expand Down Expand Up @@ -355,30 +373,59 @@ mod tests {
assert_eq!(parse_memory_addr(&"/memory/5".parse().unwrap()), Ok(5));
assert_eq!(parse_memory_addr(&"/tcp/150".parse().unwrap()), Err(()));
assert_eq!(parse_memory_addr(&"/memory/0".parse().unwrap()), Ok(0));
assert_eq!(parse_memory_addr(&"/memory/5/tcp/150".parse().unwrap()), Err(()));
assert_eq!(parse_memory_addr(&"/tcp/150/memory/5".parse().unwrap()), Err(()));
assert_eq!(parse_memory_addr(&"/memory/1234567890".parse().unwrap()), Ok(1_234_567_890));
assert_eq!(
parse_memory_addr(&"/memory/5/tcp/150".parse().unwrap()),
Err(())
);
assert_eq!(
parse_memory_addr(&"/tcp/150/memory/5".parse().unwrap()),
Err(())
);
assert_eq!(
parse_memory_addr(&"/memory/1234567890".parse().unwrap()),
Ok(1_234_567_890)
);
}

#[test]
fn listening_twice() {
let transport = MemoryTransport::default();
assert!(transport.listen_on("/memory/1639174018481".parse().unwrap()).is_ok());
assert!(transport.listen_on("/memory/1639174018481".parse().unwrap()).is_ok());
let _listener = transport.listen_on("/memory/1639174018481".parse().unwrap()).unwrap();
assert!(transport.listen_on("/memory/1639174018481".parse().unwrap()).is_err());
assert!(transport.listen_on("/memory/1639174018481".parse().unwrap()).is_err());
assert!(transport
.listen_on("/memory/1639174018481".parse().unwrap())
.is_ok());
assert!(transport
.listen_on("/memory/1639174018481".parse().unwrap())
.is_ok());
let _listener = transport
.listen_on("/memory/1639174018481".parse().unwrap())
.unwrap();
assert!(transport
.listen_on("/memory/1639174018481".parse().unwrap())
.is_err());
assert!(transport
.listen_on("/memory/1639174018481".parse().unwrap())
.is_err());
drop(_listener);
assert!(transport.listen_on("/memory/1639174018481".parse().unwrap()).is_ok());
assert!(transport.listen_on("/memory/1639174018481".parse().unwrap()).is_ok());
assert!(transport
.listen_on("/memory/1639174018481".parse().unwrap())
.is_ok());
assert!(transport
.listen_on("/memory/1639174018481".parse().unwrap())
.is_ok());
}

#[test]
fn port_not_in_use() {
let transport = MemoryTransport::default();
assert!(transport.dial("/memory/810172461024613".parse().unwrap()).is_err());
let _listener = transport.listen_on("/memory/810172461024613".parse().unwrap()).unwrap();
assert!(transport.dial("/memory/810172461024613".parse().unwrap()).is_ok());
assert!(transport
.dial("/memory/810172461024613".parse().unwrap())
.is_err());
let _listener = transport
.listen_on("/memory/810172461024613".parse().unwrap())
.unwrap();
assert!(transport
.dial("/memory/810172461024613".parse().unwrap())
.is_ok());
}

#[test]
Expand All @@ -396,9 +443,11 @@ mod tests {
let listener = async move {
let listener = t1.listen_on(t1_addr.clone()).unwrap();

let upgrade = listener.filter_map(|ev| futures::future::ready(
ListenerEvent::into_upgrade(ev.unwrap())
)).next().await.unwrap();
let upgrade = listener
.filter_map(|ev| futures::future::ready(ListenerEvent::into_upgrade(ev.unwrap())))
.next()
.await
.unwrap();

let mut socket = upgrade.0.await.unwrap();

Expand All @@ -423,16 +472,14 @@ mod tests {

#[test]
fn dialer_address_unequal_to_listener_address() {
let listener_addr: Multiaddr = Protocol::Memory(
rand::random::<u64>().saturating_add(1),
).into();
let listener_addr: Multiaddr =
Protocol::Memory(rand::random::<u64>().saturating_add(1)).into();
let listener_addr_cloned = listener_addr.clone();

let listener_transport = MemoryTransport::default();

let listener = async move {
let mut listener = listener_transport.listen_on(listener_addr.clone())
.unwrap();
let mut listener = listener_transport.listen_on(listener_addr.clone()).unwrap();
while let Some(ev) = listener.next().await {
if let ListenerEvent::Upgrade { remote_addr, .. } = ev.unwrap() {
assert!(
Expand All @@ -445,7 +492,8 @@ mod tests {
};

let dialer = async move {
MemoryTransport::default().dial(listener_addr_cloned)
MemoryTransport::default()
.dial(listener_addr_cloned)
.unwrap()
.await
.unwrap();
Expand All @@ -459,21 +507,18 @@ mod tests {
let (terminate, should_terminate) = futures::channel::oneshot::channel();
let (terminated, is_terminated) = futures::channel::oneshot::channel();

let listener_addr: Multiaddr = Protocol::Memory(
rand::random::<u64>().saturating_add(1),
).into();
let listener_addr: Multiaddr =
Protocol::Memory(rand::random::<u64>().saturating_add(1)).into();
let listener_addr_cloned = listener_addr.clone();

let listener_transport = MemoryTransport::default();

let listener = async move {
let mut listener = listener_transport.listen_on(listener_addr.clone())
.unwrap();
let mut listener = listener_transport.listen_on(listener_addr.clone()).unwrap();
while let Some(ev) = listener.next().await {
if let ListenerEvent::Upgrade { remote_addr, .. } = ev.unwrap() {
let dialer_port = NonZeroU64::new(
parse_memory_addr(&remote_addr).unwrap(),
).unwrap();
let dialer_port =
NonZeroU64::new(parse_memory_addr(&remote_addr).unwrap()).unwrap();

assert!(
HUB.get(&dialer_port).is_some(),
Expand All @@ -494,7 +539,8 @@ mod tests {
};

let dialer = async move {
let _chan = MemoryTransport::default().dial(listener_addr_cloned)
let _chan = MemoryTransport::default()
.dial(listener_addr_cloned)
.unwrap()
.await
.unwrap();
Expand Down
2 changes: 1 addition & 1 deletion examples/gossipsub-chat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ fn main() -> Result<(), Box<dyn Error>> {
};

// Set a custom gossipsub
let gossipsub_config = gossipsub::GossipsubConfigBuilder::new()
let gossipsub_config = gossipsub::GossipsubConfigBuilder::default()
.heartbeat_interval(Duration::from_secs(10)) // This is set to aid debugging by not cluttering the log space
.validation_mode(ValidationMode::Strict) // This sets the kind of message validation. The default is Strict (enforce message signing)
.message_id_fn(message_id_fn) // content-address messages. No two messages of the
Expand Down
2 changes: 1 addition & 1 deletion examples/ipfs-private.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ fn main() -> Result<(), Box<dyn Error>> {

// Create a Swarm to manage peers and events
let mut swarm = {
let gossipsub_config = GossipsubConfigBuilder::new()
let gossipsub_config = GossipsubConfigBuilder::default()
.max_transmit_size(262144)
.build()
.expect("valid config");
Expand Down
2 changes: 2 additions & 0 deletions protocols/gossipsub/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ async-std = "1.6.3"
env_logger = "0.8.1"
libp2p-plaintext = { path = "../plaintext" }
libp2p-yamux = { path = "../../muxers/yamux" }
libp2p-mplex = { path = "../../muxers/mplex" }
libp2p-noise = { path = "../../protocols/noise" }
quickcheck = "0.9.2"
hex = "0.4.2"
derive_builder = "0.9.0"
Expand Down
2 changes: 1 addition & 1 deletion protocols/gossipsub/src/backoff.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ impl BackoffStorage {
}
}

/// Updates the backoff for a peer (if there is already a more restrictive backoff than this call
/// Updates the backoff for a peer (if there is already a more restrictive backoff then this call
/// doesn't change anything).
pub fn update_backoff(&mut self, topic: &TopicHash, peer: &PeerId, time: Duration) {
let instant = Instant::now() + time;
Expand Down
Loading