Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/master' into dp/chore/test-cor…
Browse files Browse the repository at this point in the history
…e-handled_node_tasks

* upstream/master:
  Tweaks, spelling and grammar (libp2p#629)
  Add a badge with a link to deps.rs (libp2p#630)
  Rewrite floodsub to use the ProtocolsHandler (libp2p#603)
  Add an IdentifyListen behaviour (libp2p#626)
  Add a custom derive for NetworkBehaviour (libp2p#619)
  Set the maximum size of Mplex messages to 1Mb (libp2p#622)
  Use expect rather than unwrap (libp2p#625)
  Make libp2p-websocket optional (libp2p#624)
  Add From<IpAddr> for Multiaddr (libp2p#623)
  Add implementations of NetworkBehaviour for ping (libp2p#618)
  Add a PeriodicIdentifyBehaviour (libp2p#617)
  Use upstream rust-secp256k1 (libp2p#616)
  Use yamux and aio-limited from crates.io (libp2p#621)
  • Loading branch information
dvdplm committed Nov 14, 2018
2 parents 99f4620 + 9b47dd5 commit 5d3698b
Show file tree
Hide file tree
Showing 30 changed files with 2,080 additions and 688 deletions.
6 changes: 4 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ authors = ["Parity Technologies <admin@parity.io>"]
license = "MIT"

[features]
default = ["secio-rsa", "secio-secp256k1"]
default = ["secio-rsa", "secio-secp256k1", "libp2p-websocket"]
secio-rsa = ["libp2p-secio/rsa"]
secio-secp256k1 = ["libp2p-secio/secp256k1"]

Expand All @@ -23,10 +23,11 @@ libp2p-ping = { path = "./protocols/ping" }
libp2p-ratelimit = { path = "./transports/ratelimit" }
libp2p-relay = { path = "./transports/relay" }
libp2p-core = { path = "./core" }
libp2p-core-derive = { path = "./misc/core-derive" }
libp2p-secio = { path = "./protocols/secio", default-features = false }
libp2p-transport-timeout = { path = "./transports/timeout" }
libp2p-uds = { path = "./transports/uds" }
libp2p-websocket = { path = "./transports/websocket" }
libp2p-websocket = { path = "./transports/websocket", optional = true }
libp2p-yamux = { path = "./muxers/yamux" }
tokio-codec = "0.1"
tokio-executor = "0.1"
Expand All @@ -51,6 +52,7 @@ tokio-stdin = "0.1"
[workspace]
members = [
"core",
"misc/core-derive",
"misc/multiaddr",
"misc/multihash",
"misc/multistream-select",
Expand Down
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
# Central repository for work on libp2p

[![dependency status](https://deps.rs/repo/github/libp2p/rust-libp2p/status.svg)](https://deps.rs/repo/github/libp2p/rust-libp2p)

This repository is the central place for Rust development of the [libp2p](https://libp2p.io) spec.

**This readme will be more fleshed out the closer the project gets to completion.
Expand Down
168 changes: 154 additions & 14 deletions core/src/nodes/protocols_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use either::EitherOutput;
use futures::prelude::*;
use nodes::handled_node::{NodeHandler, NodeHandlerEndpoint, NodeHandlerEvent};
use std::{io, marker::PhantomData, time::Duration};
Expand All @@ -29,28 +30,28 @@ use {ConnectionUpgrade, Endpoint};

/// Handler for a set of protocols for a specific connection with a remote.
///
/// This trait should be implemented on struct that hold the state for a specific protocol
/// This trait should be implemented on a struct that holds the state for a specific protocol
/// behaviour with a specific remote.
///
/// # Handling a protocol
///
/// Protocols with the remote can be opened in two different ways:
/// Communication with a remote over a set of protocols opened in two different ways:
///
/// - Dialing, which is a voluntary process. In order to do so, make `poll()` return an
/// `OutboundSubstreamRequest` variant containing the connection upgrade to use.
/// `OutboundSubstreamRequest` variant containing the connection upgrade to use to start using a protocol.
/// - Listening, which is used to determine which protocols are supported when the remote wants
/// to open a substream. The `listen_protocol()` method should return the upgrades supported when
/// listening.
///
/// The upgrade when dialing and the upgrade when listening have to be of the same type, but you
/// are free to return for example an `OrUpgrade` enum, or an enum of yours, containing the upgrade
/// are free to return for example an `OrUpgrade` enum, or an enum of your own, containing the upgrade
/// you want depending on the situation.
///
/// # Shutting down
///
/// Implementors of this trait should keep in mind that the connection can be closed at any time.
/// When a connection is closed (either by us or by the remote) `shutdown()` is called and the
/// handler continues to be processed until it produces `None`. Then only the handler is destroyed.
/// handler continues to be processed until it produces `None`. Only then the handler is destroyed.
///
/// This makes it possible for the handler to finish delivering events even after knowing that it
/// is shutting down.
Expand All @@ -64,7 +65,7 @@ use {ConnectionUpgrade, Endpoint};
/// This trait is very similar to the `NodeHandler` trait. The fundamental differences are:
///
/// - The `NodeHandler` trait gives you more control and is therefore more difficult to implement.
/// - The `NodeHandler` trait is designed to have exclusive ownership of the connection with a
/// - The `NodeHandler` trait is designed to have exclusive ownership of the connection to a
/// node, while the `ProtocolsHandler` trait is designed to handle only a specific set of
/// protocols. Two or more implementations of `ProtocolsHandler` can be combined into one that
/// supports all the protocols together, which is not possible with `NodeHandler`.
Expand Down Expand Up @@ -108,11 +109,11 @@ pub trait ProtocolsHandler {
/// Indicates to the handler that upgrading a substream to the given protocol has failed.
fn inject_dial_upgrade_error(&mut self, info: Self::OutboundOpenInfo, error: io::Error);

/// Indicates the handler that the inbound part of the muxer has been closed, and that
/// therefore no more inbound substream will be produced.
/// Indicates to the handler that the inbound part of the muxer has been closed, and that
/// therefore no more inbound substreams will be produced.
fn inject_inbound_closed(&mut self);

/// Indicates the node that it should shut down. After that, it is expected that `poll()`
/// Indicates to the node that it should shut down. After that, it is expected that `poll()`
/// returns `Ready(None)` as soon as possible.
///
/// This method allows an implementation to perform a graceful shutdown of the substreams, and
Expand Down Expand Up @@ -156,6 +157,19 @@ pub trait ProtocolsHandler {
MapOutEvent { inner: self, map }
}

/// Builds an implementation of `ProtocolsHandler` that handles both this protocol and the
/// other one together.
#[inline]
fn select<TProto2>(self, other: TProto2) -> ProtocolsHandlerSelect<Self, TProto2>
where
Self: Sized,
{
ProtocolsHandlerSelect {
proto1: self,
proto2: other,
}
}

/// Creates a builder that will allow creating a `NodeHandler` that handles this protocol
/// exclusively.
#[inline]
Expand Down Expand Up @@ -185,11 +199,11 @@ pub trait ProtocolsHandler {
/// Event produced by a handler.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum ProtocolsHandlerEvent<TConnectionUpgrade, TOutboundOpenInfo, TCustom> {
/// Require a new outbound substream to be opened with the remote.
/// Request a new outbound substream to be opened with the remote.
OutboundSubstreamRequest {
/// The upgrade to apply on the substream.
upgrade: TConnectionUpgrade,
/// User-defind information, passed back when the substream is open.
/// User-defined information, passed back when the substream is open.
info: TOutboundOpenInfo,
},

Expand All @@ -201,7 +215,7 @@ pub enum ProtocolsHandlerEvent<TConnectionUpgrade, TOutboundOpenInfo, TCustom> {
impl<TConnectionUpgrade, TOutboundOpenInfo, TCustom>
ProtocolsHandlerEvent<TConnectionUpgrade, TOutboundOpenInfo, TCustom>
{
/// If this is `OutboundSubstreamRequest`, maps the content to something else.
/// If this is an `OutboundSubstreamRequest`, maps the `info` member from a `TOutboundOpenInfo` to something else.
#[inline]
pub fn map_outbound_open_info<F, I>(
self,
Expand All @@ -221,7 +235,7 @@ impl<TConnectionUpgrade, TOutboundOpenInfo, TCustom>
}
}

/// If this is `OutboundSubstreamRequest`, maps the protocol to another.
/// If this is an `OutboundSubstreamRequest`, maps the protocol (`TConnectionUpgrade`) to something else.
#[inline]
pub fn map_protocol<F, I>(
self,
Expand All @@ -241,7 +255,7 @@ impl<TConnectionUpgrade, TOutboundOpenInfo, TCustom>
}
}

/// If this is `Custom`, maps the content to something else.
/// If this is a `Custom` event, maps the content to something else.
#[inline]
pub fn map_custom<F, I>(
self,
Expand Down Expand Up @@ -680,3 +694,129 @@ where
Ok(Async::NotReady)
}
}

/// Implementation of `ProtocolsHandler` that combines two protocols into one.
#[derive(Debug, Clone)]
pub struct ProtocolsHandlerSelect<TProto1, TProto2> {
proto1: TProto1,
proto2: TProto2,
}

impl<TSubstream, TProto1, TProto2, TProto1Out, TProto2Out>
ProtocolsHandler for ProtocolsHandlerSelect<TProto1, TProto2>
where TProto1: ProtocolsHandler<Substream = TSubstream>,
TProto2: ProtocolsHandler<Substream = TSubstream>,
TSubstream: AsyncRead + AsyncWrite,
TProto1::Protocol: ConnectionUpgrade<TSubstream, Output = TProto1Out>,
TProto2::Protocol: ConnectionUpgrade<TSubstream, Output = TProto2Out>,
{
type InEvent = EitherOutput<TProto1::InEvent, TProto2::InEvent>;
type OutEvent = EitherOutput<TProto1::OutEvent, TProto2::OutEvent>;
type Substream = TSubstream;
type Protocol = upgrade::OrUpgrade<upgrade::toggleable::Toggleable<upgrade::map::Map<TProto1::Protocol, fn(TProto1Out) -> EitherOutput<TProto1Out, TProto2Out>>>, upgrade::toggleable::Toggleable<upgrade::map::Map<TProto2::Protocol, fn(TProto2Out) -> EitherOutput<TProto1Out, TProto2Out>>>>;
type OutboundOpenInfo = EitherOutput<TProto1::OutboundOpenInfo, TProto2::OutboundOpenInfo>;

#[inline]
fn listen_protocol(&self) -> Self::Protocol {
let proto1 = upgrade::toggleable(upgrade::map::<_, fn(_) -> _>(self.proto1.listen_protocol(), EitherOutput::First));
let proto2 = upgrade::toggleable(upgrade::map::<_, fn(_) -> _>(self.proto2.listen_protocol(), EitherOutput::Second));
upgrade::or(proto1, proto2)
}

fn inject_fully_negotiated(&mut self, protocol: <Self::Protocol as ConnectionUpgrade<TSubstream>>::Output, endpoint: NodeHandlerEndpoint<Self::OutboundOpenInfo>) {
match (protocol, endpoint) {
(EitherOutput::First(protocol), NodeHandlerEndpoint::Dialer(EitherOutput::First(info))) => {
self.proto1.inject_fully_negotiated(protocol, NodeHandlerEndpoint::Dialer(info));
},
(EitherOutput::Second(protocol), NodeHandlerEndpoint::Dialer(EitherOutput::Second(info))) => {
self.proto2.inject_fully_negotiated(protocol, NodeHandlerEndpoint::Dialer(info));
},
(EitherOutput::First(_), NodeHandlerEndpoint::Dialer(EitherOutput::Second(_))) => {
panic!("wrong API usage: the protocol doesn't match the upgrade info")
},
(EitherOutput::Second(_), NodeHandlerEndpoint::Dialer(EitherOutput::First(_))) => {
panic!("wrong API usage: the protocol doesn't match the upgrade info")
},
(EitherOutput::First(protocol), NodeHandlerEndpoint::Listener) => {
self.proto1.inject_fully_negotiated(protocol, NodeHandlerEndpoint::Listener);
},
(EitherOutput::Second(protocol), NodeHandlerEndpoint::Listener) => {
self.proto2.inject_fully_negotiated(protocol, NodeHandlerEndpoint::Listener);
},
}
}

#[inline]
fn inject_event(&mut self, event: Self::InEvent) {
match event {
EitherOutput::First(event) => self.proto1.inject_event(event),
EitherOutput::Second(event) => self.proto2.inject_event(event),
}
}

#[inline]
fn inject_inbound_closed(&mut self) {
self.proto1.inject_inbound_closed();
self.proto2.inject_inbound_closed();
}

#[inline]
fn inject_dial_upgrade_error(&mut self, info: Self::OutboundOpenInfo, error: io::Error) {
match info {
EitherOutput::First(info) => self.proto1.inject_dial_upgrade_error(info, error),
EitherOutput::Second(info) => self.proto2.inject_dial_upgrade_error(info, error),
}
}

#[inline]
fn shutdown(&mut self) {
self.proto1.shutdown();
self.proto2.shutdown();
}

fn poll(&mut self) -> Poll<Option<ProtocolsHandlerEvent<Self::Protocol, Self::OutboundOpenInfo, Self::OutEvent>>, io::Error> {
match self.proto1.poll()? {
Async::Ready(Some(ProtocolsHandlerEvent::Custom(event))) => {
return Ok(Async::Ready(Some(ProtocolsHandlerEvent::Custom(EitherOutput::First(event)))));
},
Async::Ready(Some(ProtocolsHandlerEvent::OutboundSubstreamRequest { upgrade, info})) => {
let upgrade = {
let proto1 = upgrade::toggleable(upgrade::map::<_, fn(_) -> _>(upgrade, EitherOutput::First));
let mut proto2 = upgrade::toggleable(upgrade::map::<_, fn(_) -> _>(self.proto2.listen_protocol(), EitherOutput::Second));
proto2.disable();
upgrade::or(proto1, proto2)
};

return Ok(Async::Ready(Some(ProtocolsHandlerEvent::OutboundSubstreamRequest {
upgrade,
info: EitherOutput::First(info),
})));
},
Async::Ready(None) => return Ok(Async::Ready(None)),
Async::NotReady => ()
};

match self.proto2.poll()? {
Async::Ready(Some(ProtocolsHandlerEvent::Custom(event))) => {
return Ok(Async::Ready(Some(ProtocolsHandlerEvent::Custom(EitherOutput::Second(event)))));
},
Async::Ready(Some(ProtocolsHandlerEvent::OutboundSubstreamRequest { upgrade, info })) => {
let upgrade = {
let mut proto1 = upgrade::toggleable(upgrade::map::<_, fn(_) -> _>(self.proto1.listen_protocol(), EitherOutput::First));
proto1.disable();
let proto2 = upgrade::toggleable(upgrade::map::<_, fn(_) -> _>(upgrade, EitherOutput::Second));
upgrade::or(proto1, proto2)
};

return Ok(Async::Ready(Some(ProtocolsHandlerEvent::OutboundSubstreamRequest {
upgrade,
info: EitherOutput::Second(info),
})));
},
Async::Ready(None) => return Ok(Async::Ready(None)),
Async::NotReady => ()
};

Ok(Async::NotReady)
}
}
34 changes: 25 additions & 9 deletions core/src/upgrade/map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use std::io::Error as IoError;
use futures::prelude::*;
use tokio_io::{AsyncRead, AsyncWrite};
use upgrade::{ConnectionUpgrade, Endpoint};
Expand All @@ -39,9 +38,8 @@ pub struct Map<U, F> {
impl<C, U, F, O> ConnectionUpgrade<C> for Map<U, F>
where
U: ConnectionUpgrade<C>,
U::Future: Send + 'static, // TODO: 'static :(
C: AsyncRead + AsyncWrite,
F: FnOnce(U::Output) -> O + Send + 'static, // TODO: 'static :(
F: FnOnce(U::Output) -> O,
{
type NamesIter = U::NamesIter;
type UpgradeIdentifier = U::UpgradeIdentifier;
Expand All @@ -51,18 +49,36 @@ where
}

type Output = O;
type Future = Box<Future<Item = O, Error = IoError> + Send>;
type Future = MapFuture<U::Future, F>;

fn upgrade(
self,
socket: C,
id: Self::UpgradeIdentifier,
ty: Endpoint,
) -> Self::Future {
let map = self.map;
let fut = self.upgrade
.upgrade(socket, id, ty)
.map(map);
Box::new(fut) as Box<_>
MapFuture {
inner: self.upgrade.upgrade(socket, id, ty),
map: Some(self.map),
}
}
}

pub struct MapFuture<TInnerFut, TMap> {
inner: TInnerFut,
map: Option<TMap>,
}

impl<TInnerFut, TIn, TMap, TOut> Future for MapFuture<TInnerFut, TMap>
where TInnerFut: Future<Item = TIn>,
TMap: FnOnce(TIn) -> TOut,
{
type Item = TOut;
type Error = TInnerFut::Error;

fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
let item = try_ready!(self.inner.poll());
let map = self.map.take().expect("Future has already finished");
Ok(Async::Ready(map(item)))
}
}
15 changes: 15 additions & 0 deletions misc/core-derive/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
[package]
name = "libp2p-core-derive"
version = "0.1.0"
authors = ["Parity Technologies <admin@parity.io>"]
license = "MIT"

[lib]
proc-macro = true

[dependencies]
syn = { version = "0.15", default-features = false, features = ["derive", "parsing", "printing", "proc-macro"] }
quote = "0.6"

[dev-dependencies]
libp2p = { path = "../.." }
Loading

0 comments on commit 5d3698b

Please sign in to comment.