Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tcp refinements #404

Merged
merged 10 commits into from
Aug 9, 2018
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
198 changes: 137 additions & 61 deletions tcp-transport/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,18 +53,17 @@ extern crate tokio_tcp;
#[cfg(test)]
extern crate tokio_current_thread;

use futures::Poll;
use futures::future::{self, Future, FutureResult};
use futures::stream::Stream;
use futures::{future, future::FutureResult, prelude::*, Async, Poll};
use multiaddr::{AddrComponent, Multiaddr, ToMultiaddr};
use std::fmt;
use std::io::{Error as IoError, Read, Write};
use std::iter;
use std::net::SocketAddr;
use std::time::Duration;
use swarm::Transport;
use tk_listen::ListenExt;
use tokio_tcp::{TcpListener, TcpStream};
use tk_listen::{ListenExt, SleepOnError};
use tokio_io::{AsyncRead, AsyncWrite};
use tokio_tcp::{ConnectFuture, Incoming, TcpListener, TcpStream};

/// Represents the configuration for a TCP/IP transport capability for libp2p.
///
Expand All @@ -87,10 +86,10 @@ impl TcpConfig {

impl Transport for TcpConfig {
type Output = TcpTransStream;
type Listener = Box<Stream<Item = Self::ListenerUpgrade, Error = IoError>>;
type Listener = TcpListenStream;
type ListenerUpgrade = FutureResult<(Self::Output, Self::MultiaddrFuture), IoError>;
type MultiaddrFuture = FutureResult<Multiaddr, IoError>;
type Dial = Box<Future<Item = (TcpTransStream, Self::MultiaddrFuture), Error = IoError>>;
type Dial = TcpDialFut;

fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> {
if let Ok(socket_addr) = multiaddr_to_socketaddr(&addr) {
Expand All @@ -110,27 +109,11 @@ impl Transport for TcpConfig {
};

debug!("Now listening on {}", new_addr);

let sleep_on_error = self.sleep_on_error;
let future = future::result(listener)
.map(move |listener| {
// Pull out a stream of sockets for incoming connections
listener.incoming()
.sleep_on_error(sleep_on_error)
.map_err(|()| unreachable!("sleep_on_error cannot err"))
.map(|sock| {
let addr = match sock.peer_addr() {
Ok(addr) => addr.to_multiaddr()
.expect("generating a multiaddr from a socket addr never fails"),
Err(err) => return future::err(err),
};

debug!("Incoming connection from {}", addr);
future::ok((TcpTransStream { inner: sock }, future::ok(addr)))
})
})
.flatten_stream();
Ok((Box::new(future), new_addr))
let inner = listener
.map_err(Some)
.map(move |l| l.incoming().sleep_on_error(sleep_on_error));
Ok((TcpListenStream { inner }, new_addr))
} else {
Err((self, addr))
}
Expand All @@ -142,15 +125,10 @@ impl Transport for TcpConfig {
// If so, we instantly refuse dialing instead of going through the kernel.
if socket_addr.port() != 0 && !socket_addr.ip().is_unspecified() {
debug!("Dialing {}", addr);
let fut = TcpStream::connect(&socket_addr)
.map(|t| {
(TcpTransStream { inner: t }, future::ok(addr))
})
.map_err(move |err| {
debug!("Error while dialing {:?} => {:?}", socket_addr, err);
err
});
Ok(Box::new(fut) as Box<_>)
Ok(TcpDialFut {
inner: TcpStream::connect(&socket_addr),
addr: Some(addr),
})
} else {
debug!("Instantly refusing dialing {}, as it is invalid", addr);
Err((self, addr))
Expand All @@ -161,57 +139,152 @@ impl Transport for TcpConfig {
}

fn nat_traversal(&self, server: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr> {
let server_protocols: Vec<_> = server.iter().collect();
let observed_protocols: Vec<_> = observed.iter().collect();
// Check that `server` only has two components and retreive them.
let mut server_protocols_iter = server.iter();
let server_proto1 = server_protocols_iter.next()?;
let server_proto2 = server_protocols_iter.next()?;
if server_protocols_iter.next().is_some() {
return None;
}

if server_protocols.len() != 2 || observed_protocols.len() != 2 {
// Check that `observed` only has two components and retreive them.
let mut observed_protocols_iter = observed.iter();
let observed_proto1 = observed_protocols_iter.next()?;
let observed_proto2 = observed_protocols_iter.next()?;
if observed_protocols_iter.next().is_some() {
return None;
}

// Check that `server` is a valid TCP/IP address.
match (&server_protocols[0], &server_protocols[1]) {
match (&server_proto1, &server_proto2) {
(&AddrComponent::IP4(_), &AddrComponent::TCP(_))
| (&AddrComponent::IP6(_), &AddrComponent::TCP(_)) => {}
_ => return None,
}

// Check that `observed` is a valid TCP/IP address.
match (&observed_protocols[0], &observed_protocols[1]) {
match (&observed_proto1, &observed_proto2) {
(&AddrComponent::IP4(_), &AddrComponent::TCP(_))
| (&AddrComponent::IP6(_), &AddrComponent::TCP(_)) => {}
_ => return None,
}

let result = iter::once(observed_protocols[0].clone())
.chain(iter::once(server_protocols[1].clone()))
let result = iter::once(observed_proto1.clone())
.chain(iter::once(server_proto2.clone()))
.collect();

Some(result)
}
}

// This type of logic should probably be moved into the multiaddr package
fn multiaddr_to_socketaddr(addr: &Multiaddr) -> Result<SocketAddr, ()> {
let protocols: Vec<_> = addr.iter().collect();
let mut iter = addr.iter();
let proto1 = iter.next().ok_or(())?;
let proto2 = iter.next().ok_or(())?;

if protocols.len() != 2 {
if iter.next().is_some() {
return Err(());
}

match (&protocols[0], &protocols[1]) {
(&AddrComponent::IP4(ref ip), &AddrComponent::TCP(port)) => {
Ok(SocketAddr::new(ip.clone().into(), port))
match (proto1, proto2) {
(AddrComponent::IP4(ip), AddrComponent::TCP(port)) => Ok(SocketAddr::new(ip.into(), port)),
(AddrComponent::IP6(ip), AddrComponent::TCP(port)) => Ok(SocketAddr::new(ip.into(), port)),
_ => Err(()),
}
}

/// Future that dials a TCP/IP address.
#[derive(Debug)]
pub struct TcpDialFut {
inner: ConnectFuture,
/// Address we're dialing. Extracted when the `Future` finishes.
addr: Option<Multiaddr>,
}

impl Future for TcpDialFut {
type Item = (TcpTransStream, FutureResult<Multiaddr, IoError>);
type Error = IoError;

fn poll(&mut self) -> Poll<(TcpTransStream, FutureResult<Multiaddr, IoError>), IoError> {
match self.inner.poll() {
Ok(Async::Ready(stream)) => {
let addr = self
.addr
.take()
.expect("TcpDialFut polled again after finished");
let out = TcpTransStream { inner: stream };
Ok(Async::Ready((out, future::ok(addr))))
}
Ok(Async::NotReady) => Ok(Async::NotReady),
Err(err) => {
let addr = self
.addr
.as_ref()
.expect("TcpDialFut polled again after finished");
debug!("Error while dialing {:?} => {:?}", addr, err);
Err(err)
}
}
(&AddrComponent::IP6(ref ip), &AddrComponent::TCP(port)) => {
Ok(SocketAddr::new(ip.clone().into(), port))
}
}

/// Stream that listens on an TCP/IP address.
pub struct TcpListenStream {
inner: Result<SleepOnError<Incoming>, Option<IoError>>,
}

impl Stream for TcpListenStream {
type Item = FutureResult<(TcpTransStream, FutureResult<Multiaddr, IoError>), IoError>;
type Error = IoError;

fn poll(
&mut self,
) -> Poll<
Option<FutureResult<(TcpTransStream, FutureResult<Multiaddr, IoError>), IoError>>,
IoError,
> {
let inner = match self.inner {
Ok(ref mut inc) => inc,
Err(ref mut err) => {
return Err(err.take().expect("poll called again after error"));
}
};

match inner.poll() {
Ok(Async::Ready(Some(sock))) => {
let addr = match sock.peer_addr() {
// TODO: remove this expect()
Ok(addr) => addr
.to_multiaddr()
.expect("generating a multiaddr from a socket addr never fails"),
Err(err) => return Ok(Async::Ready(Some(future::err(err)))),
};

debug!("Incoming connection from {}", addr);
let ret = future::ok((TcpTransStream { inner: sock }, future::ok(addr)));
Ok(Async::Ready(Some(ret)))
}
Ok(Async::Ready(None)) => Ok(Async::Ready(None)),
Ok(Async::NotReady) => Ok(Async::NotReady),
Err(()) => unreachable!("sleep_on_error never produces an error"),
}
}
}

impl fmt::Debug for TcpListenStream {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self.inner {
Ok(_) => write!(f, "TcpListenStream"),
Err(None) => write!(f, "TcpListenStream(Errored)"),
Err(Some(ref err)) => write!(f, "TcpListenStream({:?})", err),
}
_ => Err(()),
}
}

/// Wraps around a `TcpStream` and adds logging for important events.
#[derive(Debug)]
pub struct TcpTransStream {
inner: TcpStream
inner: TcpStream,
}

impl Read for TcpTransStream {
Expand All @@ -221,8 +294,7 @@ impl Read for TcpTransStream {
}
}

impl AsyncRead for TcpTransStream {
}
impl AsyncRead for TcpTransStream {}

impl Write for TcpTransStream {
#[inline]
Expand Down Expand Up @@ -283,9 +355,11 @@ mod tests {
))
);
assert_eq!(
multiaddr_to_socketaddr(&"/ip4/255.255.255.255/tcp/8080"
.parse::<Multiaddr>()
.unwrap()),
multiaddr_to_socketaddr(
&"/ip4/255.255.255.255/tcp/8080"
.parse::<Multiaddr>()
.unwrap()
),
Ok(SocketAddr::new(
IpAddr::V4(Ipv4Addr::new(255, 255, 255, 255)),
8080,
Expand All @@ -299,9 +373,11 @@ mod tests {
))
);
assert_eq!(
multiaddr_to_socketaddr(&"/ip6/ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff/tcp/8080"
.parse::<Multiaddr>()
.unwrap()),
multiaddr_to_socketaddr(
&"/ip6/ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff/tcp/8080"
.parse::<Multiaddr>()
.unwrap()
),
Ok(SocketAddr::new(
IpAddr::V6(Ipv6Addr::new(
65535, 65535, 65535, 65535, 65535, 65535, 65535, 65535,
Expand Down