Skip to content

Commit

Permalink
Add a custom derive for NetworkBehaviour (#619)
Browse files Browse the repository at this point in the history
* Add ProtocolsHandlerSelect

* Add a custom derive for NetworkBehaviour

* Remove 2018 edition

* More work

* Update the tests and work

* Allow ignored fields

* More fixes

* Give access to everything in the poll method
  • Loading branch information
tomaka authored Nov 12, 2018
1 parent 7268a5a commit 623728b
Show file tree
Hide file tree
Showing 7 changed files with 700 additions and 9 deletions.
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ libp2p-ping = { path = "./protocols/ping" }
libp2p-ratelimit = { path = "./transports/ratelimit" }
libp2p-relay = { path = "./transports/relay" }
libp2p-core = { path = "./core" }
libp2p-core-derive = { path = "./misc/core-derive" }
libp2p-secio = { path = "./protocols/secio", default-features = false }
libp2p-transport-timeout = { path = "./transports/timeout" }
libp2p-uds = { path = "./transports/uds" }
Expand Down Expand Up @@ -51,6 +52,7 @@ tokio-stdin = "0.1"
[workspace]
members = [
"core",
"misc/core-derive",
"misc/multiaddr",
"misc/multihash",
"misc/multistream-select",
Expand Down
140 changes: 140 additions & 0 deletions core/src/nodes/protocols_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use either::EitherOutput;
use futures::prelude::*;
use nodes::handled_node::{NodeHandler, NodeHandlerEndpoint, NodeHandlerEvent};
use std::{io, marker::PhantomData, time::Duration};
Expand Down Expand Up @@ -156,6 +157,19 @@ pub trait ProtocolsHandler {
MapOutEvent { inner: self, map }
}

/// Builds an implementation of `ProtocolsHandler` that handles both this protocol and the
/// other one together.
#[inline]
fn select<TProto2>(self, other: TProto2) -> ProtocolsHandlerSelect<Self, TProto2>
where
Self: Sized,
{
ProtocolsHandlerSelect {
proto1: self,
proto2: other,
}
}

/// Creates a builder that will allow creating a `NodeHandler` that handles this protocol
/// exclusively.
#[inline]
Expand Down Expand Up @@ -680,3 +694,129 @@ where
Ok(Async::NotReady)
}
}

/// Implementation of `ProtocolsHandler` that combines two protocols into one.
#[derive(Debug, Clone)]
pub struct ProtocolsHandlerSelect<TProto1, TProto2> {
proto1: TProto1,
proto2: TProto2,
}

impl<TSubstream, TProto1, TProto2, TProto1Out, TProto2Out>
ProtocolsHandler for ProtocolsHandlerSelect<TProto1, TProto2>
where TProto1: ProtocolsHandler<Substream = TSubstream>,
TProto2: ProtocolsHandler<Substream = TSubstream>,
TSubstream: AsyncRead + AsyncWrite,
TProto1::Protocol: ConnectionUpgrade<TSubstream, Output = TProto1Out>,
TProto2::Protocol: ConnectionUpgrade<TSubstream, Output = TProto2Out>,
{
type InEvent = EitherOutput<TProto1::InEvent, TProto2::InEvent>;
type OutEvent = EitherOutput<TProto1::OutEvent, TProto2::OutEvent>;
type Substream = TSubstream;
type Protocol = upgrade::OrUpgrade<upgrade::toggleable::Toggleable<upgrade::map::Map<TProto1::Protocol, fn(TProto1Out) -> EitherOutput<TProto1Out, TProto2Out>>>, upgrade::toggleable::Toggleable<upgrade::map::Map<TProto2::Protocol, fn(TProto2Out) -> EitherOutput<TProto1Out, TProto2Out>>>>;
type OutboundOpenInfo = EitherOutput<TProto1::OutboundOpenInfo, TProto2::OutboundOpenInfo>;

#[inline]
fn listen_protocol(&self) -> Self::Protocol {
let proto1 = upgrade::toggleable(upgrade::map::<_, fn(_) -> _>(self.proto1.listen_protocol(), EitherOutput::First));
let proto2 = upgrade::toggleable(upgrade::map::<_, fn(_) -> _>(self.proto2.listen_protocol(), EitherOutput::Second));
upgrade::or(proto1, proto2)
}

fn inject_fully_negotiated(&mut self, protocol: <Self::Protocol as ConnectionUpgrade<TSubstream>>::Output, endpoint: NodeHandlerEndpoint<Self::OutboundOpenInfo>) {
match (protocol, endpoint) {
(EitherOutput::First(protocol), NodeHandlerEndpoint::Dialer(EitherOutput::First(info))) => {
self.proto1.inject_fully_negotiated(protocol, NodeHandlerEndpoint::Dialer(info));
},
(EitherOutput::Second(protocol), NodeHandlerEndpoint::Dialer(EitherOutput::Second(info))) => {
self.proto2.inject_fully_negotiated(protocol, NodeHandlerEndpoint::Dialer(info));
},
(EitherOutput::First(_), NodeHandlerEndpoint::Dialer(EitherOutput::Second(_))) => {
panic!("wrong API usage: the protocol doesn't match the upgrade info")
},
(EitherOutput::Second(_), NodeHandlerEndpoint::Dialer(EitherOutput::First(_))) => {
panic!("wrong API usage: the protocol doesn't match the upgrade info")
},
(EitherOutput::First(protocol), NodeHandlerEndpoint::Listener) => {
self.proto1.inject_fully_negotiated(protocol, NodeHandlerEndpoint::Listener);
},
(EitherOutput::Second(protocol), NodeHandlerEndpoint::Listener) => {
self.proto2.inject_fully_negotiated(protocol, NodeHandlerEndpoint::Listener);
},
}
}

#[inline]
fn inject_event(&mut self, event: Self::InEvent) {
match event {
EitherOutput::First(event) => self.proto1.inject_event(event),
EitherOutput::Second(event) => self.proto2.inject_event(event),
}
}

#[inline]
fn inject_inbound_closed(&mut self) {
self.proto1.inject_inbound_closed();
self.proto2.inject_inbound_closed();
}

#[inline]
fn inject_dial_upgrade_error(&mut self, info: Self::OutboundOpenInfo, error: io::Error) {
match info {
EitherOutput::First(info) => self.proto1.inject_dial_upgrade_error(info, error),
EitherOutput::Second(info) => self.proto2.inject_dial_upgrade_error(info, error),
}
}

#[inline]
fn shutdown(&mut self) {
self.proto1.shutdown();
self.proto2.shutdown();
}

fn poll(&mut self) -> Poll<Option<ProtocolsHandlerEvent<Self::Protocol, Self::OutboundOpenInfo, Self::OutEvent>>, io::Error> {
match self.proto1.poll()? {
Async::Ready(Some(ProtocolsHandlerEvent::Custom(event))) => {
return Ok(Async::Ready(Some(ProtocolsHandlerEvent::Custom(EitherOutput::First(event)))));
},
Async::Ready(Some(ProtocolsHandlerEvent::OutboundSubstreamRequest { upgrade, info})) => {
let upgrade = {
let proto1 = upgrade::toggleable(upgrade::map::<_, fn(_) -> _>(upgrade, EitherOutput::First));
let mut proto2 = upgrade::toggleable(upgrade::map::<_, fn(_) -> _>(self.proto2.listen_protocol(), EitherOutput::Second));
proto2.disable();
upgrade::or(proto1, proto2)
};

return Ok(Async::Ready(Some(ProtocolsHandlerEvent::OutboundSubstreamRequest {
upgrade,
info: EitherOutput::First(info),
})));
},
Async::Ready(None) => return Ok(Async::Ready(None)),
Async::NotReady => ()
};

match self.proto2.poll()? {
Async::Ready(Some(ProtocolsHandlerEvent::Custom(event))) => {
return Ok(Async::Ready(Some(ProtocolsHandlerEvent::Custom(EitherOutput::Second(event)))));
},
Async::Ready(Some(ProtocolsHandlerEvent::OutboundSubstreamRequest { upgrade, info })) => {
let upgrade = {
let mut proto1 = upgrade::toggleable(upgrade::map::<_, fn(_) -> _>(self.proto1.listen_protocol(), EitherOutput::First));
proto1.disable();
let proto2 = upgrade::toggleable(upgrade::map::<_, fn(_) -> _>(upgrade, EitherOutput::Second));
upgrade::or(proto1, proto2)
};

return Ok(Async::Ready(Some(ProtocolsHandlerEvent::OutboundSubstreamRequest {
upgrade,
info: EitherOutput::Second(info),
})));
},
Async::Ready(None) => return Ok(Async::Ready(None)),
Async::NotReady => ()
};

Ok(Async::NotReady)
}
}
34 changes: 25 additions & 9 deletions core/src/upgrade/map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use std::io::Error as IoError;
use futures::prelude::*;
use tokio_io::{AsyncRead, AsyncWrite};
use upgrade::{ConnectionUpgrade, Endpoint};
Expand All @@ -39,9 +38,8 @@ pub struct Map<U, F> {
impl<C, U, F, O> ConnectionUpgrade<C> for Map<U, F>
where
U: ConnectionUpgrade<C>,
U::Future: Send + 'static, // TODO: 'static :(
C: AsyncRead + AsyncWrite,
F: FnOnce(U::Output) -> O + Send + 'static, // TODO: 'static :(
F: FnOnce(U::Output) -> O,
{
type NamesIter = U::NamesIter;
type UpgradeIdentifier = U::UpgradeIdentifier;
Expand All @@ -51,18 +49,36 @@ where
}

type Output = O;
type Future = Box<Future<Item = O, Error = IoError> + Send>;
type Future = MapFuture<U::Future, F>;

fn upgrade(
self,
socket: C,
id: Self::UpgradeIdentifier,
ty: Endpoint,
) -> Self::Future {
let map = self.map;
let fut = self.upgrade
.upgrade(socket, id, ty)
.map(map);
Box::new(fut) as Box<_>
MapFuture {
inner: self.upgrade.upgrade(socket, id, ty),
map: Some(self.map),
}
}
}

pub struct MapFuture<TInnerFut, TMap> {
inner: TInnerFut,
map: Option<TMap>,
}

impl<TInnerFut, TIn, TMap, TOut> Future for MapFuture<TInnerFut, TMap>
where TInnerFut: Future<Item = TIn>,
TMap: FnOnce(TIn) -> TOut,
{
type Item = TOut;
type Error = TInnerFut::Error;

fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
let item = try_ready!(self.inner.poll());
let map = self.map.take().expect("Future has already finished");
Ok(Async::Ready(map(item)))
}
}
15 changes: 15 additions & 0 deletions misc/core-derive/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
[package]
name = "libp2p-core-derive"
version = "0.1.0"
authors = ["Parity Technologies <admin@parity.io>"]
license = "MIT"

[lib]
proc-macro = true

[dependencies]
syn = { version = "0.15", default-features = false, features = ["derive", "parsing", "printing", "proc-macro"] }
quote = "0.6"

[dev-dependencies]
libp2p = { path = "../.." }
Loading

0 comments on commit 623728b

Please sign in to comment.