Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update root crate to use futures-0.3. #1315

Merged
merged 1 commit into from
Nov 22, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 2 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ secp256k1 = ["libp2p-core/secp256k1", "libp2p-secio/secp256k1"]

[dependencies]
bytes = "0.4"
futures = "0.1"
futures = "0.3.1"
multiaddr = { package = "parity-multiaddr", version = "0.5.1", path = "misc/multiaddr" }
multihash = { package = "parity-multihash", version = "0.1.4", path = "misc/multihash" }
lazy_static = "1.2"
Expand All @@ -34,10 +34,7 @@ libp2p-wasm-ext = { version = "0.6.0", path = "transports/wasm-ext" }
libp2p-yamux = { version = "0.13.0", path = "muxers/yamux" }
parking_lot = "0.9.0"
smallvec = "1.0"
tokio-codec = "0.1"
tokio-executor = "0.1"
tokio-io = "0.1"
wasm-timer = "0.1"
wasm-timer = "0.2.4"

[target.'cfg(not(any(target_os = "emscripten", target_os = "unknown")))'.dependencies]
libp2p-deflate = { version = "0.5.0", path = "protocols/deflate" }
Expand Down
104 changes: 43 additions & 61 deletions src/bandwidth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@
// DEALINGS IN THE SOFTWARE.

use crate::{Multiaddr, core::{Transport, transport::{ListenerEvent, TransportError}}};
use futures::{prelude::*, try_ready};
use futures::{prelude::*, io::{IoSlice, IoSliceMut}, ready};
use lazy_static::lazy_static;
use parking_lot::Mutex;
use smallvec::{smallvec, SmallVec};
use std::{cmp, io, io::Read, io::Write, sync::Arc, time::Duration};
use std::{cmp, io, pin::Pin, sync::Arc, task::{Context, Poll}, time::Duration};
use wasm_timer::Instant;

/// Wraps around a `Transport` and logs the bandwidth that goes through all the opened connections.
Expand All @@ -35,7 +35,6 @@ pub struct BandwidthLogging<TInner> {

impl<TInner> BandwidthLogging<TInner> {
/// Creates a new `BandwidthLogging` around the transport.
#[inline]
pub fn new(inner: TInner, period: Duration) -> (Self, Arc<BandwidthSinks>) {
let mut period_seconds = cmp::min(period.as_secs(), 86400) as u32;
if period.subsec_nanos() > 0 {
Expand All @@ -58,7 +57,10 @@ impl<TInner> BandwidthLogging<TInner> {

impl<TInner> Transport for BandwidthLogging<TInner>
where
TInner: Transport,
TInner: Transport + Unpin,
TInner::Dial: Unpin,
TInner::Listener: Unpin,
TInner::ListenerUpgrade: Unpin
{
type Output = BandwidthConnecLogging<TInner::Output>;
type Error = TInner::Error;
Expand Down Expand Up @@ -90,22 +92,23 @@ pub struct BandwidthListener<TInner> {

impl<TInner, TConn> Stream for BandwidthListener<TInner>
where
TInner: Stream<Item = ListenerEvent<TConn>>,
TInner: TryStream<Ok = ListenerEvent<TConn>> + Unpin
{
type Item = ListenerEvent<BandwidthFuture<TConn>>;
type Error = TInner::Error;
type Item = Result<ListenerEvent<BandwidthFuture<TConn>>, TInner::Error>;

fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
let event = match try_ready!(self.inner.poll()) {
Some(v) => v,
None => return Ok(Async::Ready(None))
};
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
let event =
if let Some(event) = ready!(self.inner.try_poll_next_unpin(cx)?) {
event
} else {
return Poll::Ready(None)
};

let event = event.map(|inner| {
BandwidthFuture { inner, sinks: self.sinks.clone() }
});

Ok(Async::Ready(Some(event)))
Poll::Ready(Some(Ok(event)))
}
}

Expand All @@ -116,18 +119,13 @@ pub struct BandwidthFuture<TInner> {
sinks: Arc<BandwidthSinks>,
}

impl<TInner> Future for BandwidthFuture<TInner>
where TInner: Future,
{
type Item = BandwidthConnecLogging<TInner::Item>;
type Error = TInner::Error;
impl<TInner: TryFuture + Unpin> Future for BandwidthFuture<TInner> {
type Output = Result<BandwidthConnecLogging<TInner::Ok>, TInner::Error>;

fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
let inner = try_ready!(self.inner.poll());
Ok(Async::Ready(BandwidthConnecLogging {
inner,
sinks: self.sinks.clone(),
}))
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let inner = ready!(self.inner.try_poll_unpin(cx)?);
let logged = BandwidthConnecLogging { inner, sinks: self.sinks.clone() };
Poll::Ready(Ok(logged))
}
}

Expand All @@ -139,13 +137,11 @@ pub struct BandwidthSinks {

impl BandwidthSinks {
/// Returns the average number of bytes that have been downloaded in the period.
#[inline]
pub fn average_download_per_sec(&self) -> u64 {
self.download.lock().get()
}

/// Returns the average number of bytes that have been uploaded in the period.
#[inline]
pub fn average_upload_per_sec(&self) -> u64 {
self.upload.lock().get()
}
Expand All @@ -157,56 +153,43 @@ pub struct BandwidthConnecLogging<TInner> {
sinks: Arc<BandwidthSinks>,
}

impl<TInner> Read for BandwidthConnecLogging<TInner>
where TInner: Read
{
#[inline]
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
let num_bytes = self.inner.read(buf)?;
impl<TInner: AsyncRead + Unpin> AsyncRead for BandwidthConnecLogging<TInner> {
fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context, buf: &mut [u8]) -> Poll<io::Result<usize>> {
let num_bytes = ready!(Pin::new(&mut self.inner).poll_read(cx, buf))?;
self.sinks.download.lock().inject(num_bytes);
Ok(num_bytes)
Poll::Ready(Ok(num_bytes))
}
}

impl<TInner> tokio_io::AsyncRead for BandwidthConnecLogging<TInner>
where TInner: tokio_io::AsyncRead
{
unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool {
self.inner.prepare_uninitialized_buffer(buf)
fn poll_read_vectored(mut self: Pin<&mut Self>, cx: &mut Context, bufs: &mut [IoSliceMut]) -> Poll<io::Result<usize>> {
let num_bytes = ready!(Pin::new(&mut self.inner).poll_read_vectored(cx, bufs))?;
self.sinks.download.lock().inject(num_bytes);
Poll::Ready(Ok(num_bytes))
}
}

fn read_buf<B: bytes::BufMut>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
self.inner.read_buf(buf)
impl<TInner: AsyncWrite + Unpin> AsyncWrite for BandwidthConnecLogging<TInner> {
fn poll_write(mut self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll<io::Result<usize>> {
let num_bytes = ready!(Pin::new(&mut self.inner).poll_write(cx, buf))?;
self.sinks.upload.lock().inject(num_bytes);
Poll::Ready(Ok(num_bytes))
}
}

impl<TInner> Write for BandwidthConnecLogging<TInner>
where TInner: Write
{
#[inline]
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
let num_bytes = self.inner.write(buf)?;
fn poll_write_vectored(mut self: Pin<&mut Self>, cx: &mut Context, bufs: &[IoSlice]) -> Poll<io::Result<usize>> {
let num_bytes = ready!(Pin::new(&mut self.inner).poll_write_vectored(cx, bufs))?;
self.sinks.upload.lock().inject(num_bytes);
Ok(num_bytes)
Poll::Ready(Ok(num_bytes))
}

#[inline]
fn flush(&mut self) -> io::Result<()> {
self.inner.flush()
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
Pin::new(&mut self.inner).poll_flush(cx)
}
}

impl<TInner> tokio_io::AsyncWrite for BandwidthConnecLogging<TInner>
where TInner: tokio_io::AsyncWrite
{
#[inline]
fn shutdown(&mut self) -> Poll<(), io::Error> {
self.inner.shutdown()
fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
Pin::new(&mut self.inner).poll_close(cx)
}
}

/// Returns the number of seconds that have elapsed between an arbitrary EPOCH and now.
#[inline]
fn current_second() -> u32 {
lazy_static! {
static ref EPOCH: Instant = Instant::now();
Expand Down Expand Up @@ -267,7 +250,6 @@ impl BandwidthSink {
self.bytes.remove(0);
self.bytes.push(0);
}

self.latest_update = current_second;
}
}
Expand Down
24 changes: 11 additions & 13 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,8 +158,6 @@ pub use futures;
pub use multiaddr;
#[doc(inline)]
pub use multihash;
pub use tokio_io;
pub use tokio_codec;

#[doc(inline)]
pub use libp2p_core as core;
Expand Down Expand Up @@ -229,7 +227,7 @@ use std::{error, io, time::Duration};
/// > **Note**: This `Transport` is not suitable for production usage, as its implementation
/// > reserves the right to support additional protocols or remove deprecated protocols.
pub fn build_development_transport(keypair: identity::Keypair)
-> impl Transport<Output = (PeerId, impl core::muxing::StreamMuxer<OutboundSubstream = impl Send, Substream = impl Send, Error = impl Into<io::Error>> + Send + Sync), Error = impl error::Error + Send, Listener = impl Send, Dial = impl Send, ListenerUpgrade = impl Send> + Clone
-> io::Result<impl Transport<Output = (PeerId, impl core::muxing::StreamMuxer<OutboundSubstream = impl Send, Substream = impl Send, Error = impl Into<io::Error>> + Send + Sync), Error = impl error::Error + Send, Listener = impl Send, Dial = impl Send, ListenerUpgrade = impl Send> + Clone>
{
build_tcp_ws_secio_mplex_yamux(keypair)
}
Expand All @@ -241,14 +239,14 @@ pub fn build_development_transport(keypair: identity::Keypair)
///
/// > **Note**: If you ever need to express the type of this `Transport`.
pub fn build_tcp_ws_secio_mplex_yamux(keypair: identity::Keypair)
-> impl Transport<Output = (PeerId, impl core::muxing::StreamMuxer<OutboundSubstream = impl Send, Substream = impl Send, Error = impl Into<io::Error>> + Send + Sync), Error = impl error::Error + Send, Listener = impl Send, Dial = impl Send, ListenerUpgrade = impl Send> + Clone
-> io::Result<impl Transport<Output = (PeerId, impl core::muxing::StreamMuxer<OutboundSubstream = impl Send, Substream = impl Send, Error = impl Into<io::Error>> + Send + Sync), Error = impl error::Error + Send, Listener = impl Send, Dial = impl Send, ListenerUpgrade = impl Send> + Clone>
{
CommonTransport::new()
Ok(CommonTransport::new()?
.upgrade(core::upgrade::Version::V1)
.authenticate(secio::SecioConfig::new(keypair))
.multiplex(core::upgrade::SelectUpgrade::new(yamux::Config::default(), mplex::MplexConfig::new()))
.map(|(peer, muxer), _| (peer, core::muxing::StreamMuxerBox::new(muxer)))
.timeout(Duration::from_secs(20))
.timeout(Duration::from_secs(20)))
}

/// Implementation of `Transport` that supports the most common protocols.
Expand Down Expand Up @@ -276,27 +274,27 @@ struct CommonTransportInner {
impl CommonTransport {
/// Initializes the `CommonTransport`.
#[cfg(not(any(target_os = "emscripten", target_os = "unknown")))]
pub fn new() -> CommonTransport {
pub fn new() -> io::Result<CommonTransport> {
let tcp = tcp::TcpConfig::new().nodelay(true);
let transport = dns::DnsConfig::new(tcp);
let transport = dns::DnsConfig::new(tcp)?;
#[cfg(feature = "libp2p-websocket")]
let transport = {
let trans_clone = transport.clone();
transport.or_transport(websocket::WsConfig::new(trans_clone))
};

CommonTransport {
Ok(CommonTransport {
inner: CommonTransportInner { inner: transport }
}
})
}

/// Initializes the `CommonTransport`.
#[cfg(any(target_os = "emscripten", target_os = "unknown"))]
pub fn new() -> CommonTransport {
pub fn new() -> io::Result<CommonTransport> {
let inner = core::transport::dummy::DummyTransport::new();
CommonTransport {
Ok(CommonTransport {
inner: CommonTransportInner { inner }
}
})
}
}

Expand Down
34 changes: 14 additions & 20 deletions src/simple.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,8 @@

use crate::core::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo, Negotiated};
use bytes::Bytes;
use futures::{future::FromErr, prelude::*};
use std::{iter, io::Error as IoError, sync::Arc};
use tokio_io::{AsyncRead, AsyncWrite};
use futures::prelude::*;
use std::{iter, sync::Arc};

/// Implementation of `ConnectionUpgrade`. Convenient to use with small protocols.
#[derive(Debug)]
Expand All @@ -35,7 +34,6 @@ pub struct SimpleProtocol<F> {

impl<F> SimpleProtocol<F> {
/// Builds a `SimpleProtocol`.
#[inline]
pub fn new<N>(info: N, upgrade: F) -> SimpleProtocol<F>
where
N: Into<Bytes>,
Expand All @@ -48,7 +46,6 @@ impl<F> SimpleProtocol<F> {
}

impl<F> Clone for SimpleProtocol<F> {
#[inline]
fn clone(&self) -> Self {
SimpleProtocol {
info: self.info.clone(),
Expand All @@ -61,42 +58,39 @@ impl<F> UpgradeInfo for SimpleProtocol<F> {
type Info = Bytes;
type InfoIter = iter::Once<Self::Info>;

#[inline]
fn protocol_info(&self) -> Self::InfoIter {
iter::once(self.info.clone())
}
}

impl<C, F, O> InboundUpgrade<C> for SimpleProtocol<F>
impl<C, F, O, A, E> InboundUpgrade<C> for SimpleProtocol<F>
where
C: AsyncRead + AsyncWrite,
F: Fn(Negotiated<C>) -> O,
O: IntoFuture<Error = IoError>
O: Future<Output = Result<A, E>> + Unpin
{
type Output = O::Item;
type Error = IoError;
type Future = FromErr<O::Future, IoError>;
type Output = A;
type Error = E;
type Future = O;

#[inline]
fn upgrade_inbound(self, socket: Negotiated<C>, _: Self::Info) -> Self::Future {
let upgrade = &self.upgrade;
upgrade(socket).into_future().from_err()
upgrade(socket)
}
}

impl<C, F, O> OutboundUpgrade<C> for SimpleProtocol<F>
impl<C, F, O, A, E> OutboundUpgrade<C> for SimpleProtocol<F>
where
C: AsyncRead + AsyncWrite,
F: Fn(Negotiated<C>) -> O,
O: IntoFuture<Error = IoError>
O: Future<Output = Result<A, E>> + Unpin
{
type Output = O::Item;
type Error = IoError;
type Future = FromErr<O::Future, IoError>;
type Output = A;
type Error = E;
type Future = O;

#[inline]
fn upgrade_outbound(self, socket: Negotiated<C>, _: Self::Info) -> Self::Future {
let upgrade = &self.upgrade;
upgrade(socket).into_future().from_err()
upgrade(socket)
}
}
Loading