Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Configurable multistream-select protocol. V1Lazy variant. #1245

Merged
merged 1 commit into from
Sep 23, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions core/src/transport/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,12 +209,12 @@ pub trait Transport {
}

/// Begins a series of protocol upgrades via an [`upgrade::Builder`].
fn upgrade(self) -> upgrade::Builder<Self>
fn upgrade(self, version: upgrade::Version) -> upgrade::Builder<Self>
where
Self: Sized,
Self::Error: 'static
{
upgrade::Builder::new(self)
upgrade::Builder::new(self, version)
}
}

Expand Down
21 changes: 13 additions & 8 deletions core/src/transport/upgrade.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

//! Configuration of transport protocol upgrades.

pub use crate::upgrade::Version;

use crate::{
ConnectedPoint,
ConnectionInfo,
Expand Down Expand Up @@ -68,7 +70,8 @@ use tokio_io::{AsyncRead, AsyncWrite};
///
/// [`Network`]: crate::nodes::Network
pub struct Builder<T> {
inner: T
inner: T,
version: upgrade::Version,
}

impl<T> Builder<T>
Expand All @@ -77,8 +80,8 @@ where
T::Error: 'static,
{
/// Creates a `Builder` over the given (base) `Transport`.
pub fn new(transport: T) -> Builder<T> {
Builder { inner: transport }
pub fn new(inner: T, version: upgrade::Version) -> Builder<T> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not so much of a fan of this. I think there should be a default version that the user can override.
However I see that the builder would have to be rewritten to account for this, so it's fine.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a default version - Version even implements Default. Would you prefer having two different variants of Transport::upgrade, one without a Version parameter and one with such a parameter, whereby the former delegates to Builder::new(...) with the default version? If so, any name suggestions for the upgrade variant taking a version? upgrade_with? upgrade_using? upgrade_with_protocol? As you see my reason for not doing so was that I struggled with one of the hardest problems in computer science: naming things.

Builder { inner, version }
}

/// Upgrades the transport to perform authentication of the remote.
Expand All @@ -105,11 +108,12 @@ where
U: OutboundUpgrade<C, Output = (I, D), Error = E> + Clone,
E: Error + 'static,
{
let version = self.version;
Builder::new(self.inner.and_then(move |conn, endpoint| {
Authenticate {
inner: upgrade::apply(conn, upgrade, endpoint)
inner: upgrade::apply(conn, upgrade, endpoint, version)
}
}))
}), version)
}

/// Applies an arbitrary upgrade on an authenticated, non-multiplexed
Expand All @@ -133,7 +137,7 @@ where
U: OutboundUpgrade<C, Output = D, Error = E> + Clone,
E: Error + 'static,
{
Builder::new(Upgrade::new(self.inner, upgrade))
Builder::new(Upgrade::new(self.inner, upgrade), self.version)
}

/// Upgrades the transport with a (sub)stream multiplexer.
Expand All @@ -158,8 +162,9 @@ where
U: OutboundUpgrade<C, Output = M, Error = E> + Clone,
E: Error + 'static,
{
let version = self.version;
self.inner.and_then(move |(i, c), endpoint| {
let upgrade = upgrade::apply(c, upgrade, endpoint);
let upgrade = upgrade::apply(c, upgrade, endpoint, version);
Multiplex { info: Some(i), upgrade }
})
}
Expand Down Expand Up @@ -332,7 +337,7 @@ where
future::Either::A(ref mut up) => {
let (i, c) = try_ready!(self.future.poll().map_err(TransportUpgradeError::Transport));
let u = up.take().expect("DialUpgradeFuture is constructed with Either::A(Some).");
future::Either::B((Some(i), apply_outbound(c, u)))
future::Either::B((Some(i), apply_outbound(c, u, upgrade::Version::V1)))
}
future::Either::B((ref mut i, ref mut up)) => {
let d = try_ready!(up.poll().map_err(TransportUpgradeError::Upgrade));
Expand Down
35 changes: 8 additions & 27 deletions core/src/upgrade/apply.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,17 @@

use crate::ConnectedPoint;
use crate::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeError};
use crate::upgrade::{ProtocolName, NegotiatedComplete};
use crate::upgrade::ProtocolName;
use futures::{future::Either, prelude::*};
use log::debug;
use multistream_select::{self, DialerSelectFuture, ListenerSelectFuture};
use std::{iter, mem};
use tokio_io::{AsyncRead, AsyncWrite};

pub use multistream_select::Version;

/// Applies an upgrade to the inbound and outbound direction of a connection or substream.
pub fn apply<C, U>(conn: C, up: U, cp: ConnectedPoint)
pub fn apply<C, U>(conn: C, up: U, cp: ConnectedPoint, v: Version)
-> Either<InboundUpgradeApply<C, U>, OutboundUpgradeApply<C, U>>
where
C: AsyncRead + AsyncWrite,
Expand All @@ -37,7 +39,7 @@ where
if cp.is_listener() {
Either::A(apply_inbound(conn, up))
} else {
Either::B(apply_outbound(conn, up))
Either::B(apply_outbound(conn, up, v))
}
}

Expand All @@ -55,13 +57,13 @@ where
}

/// Tries to perform an upgrade on an outbound connection or substream.
pub fn apply_outbound<C, U>(conn: C, up: U) -> OutboundUpgradeApply<C, U>
pub fn apply_outbound<C, U>(conn: C, up: U, v: Version) -> OutboundUpgradeApply<C, U>
where
C: AsyncRead + AsyncWrite,
U: OutboundUpgrade<C>
{
let iter = up.protocol_info().into_iter().map(NameWrap as fn(_) -> NameWrap<_>);
let future = multistream_select::dialer_select_proto(conn, iter);
let future = multistream_select::dialer_select_proto(conn, iter, v);
OutboundUpgradeApply {
inner: OutboundUpgradeApplyState::Init { future, upgrade: up }
}
Expand Down Expand Up @@ -155,11 +157,6 @@ where
future: DialerSelectFuture<C, NameWrapIter<<U::InfoIter as IntoIterator>::IntoIter>>,
upgrade: U
},
AwaitNegotiated {
io: NegotiatedComplete<C>,
upgrade: U,
protocol: U::Info
},
Upgrade {
future: U::Future
},
Expand All @@ -185,24 +182,8 @@ where
return Ok(Async::NotReady)
}
};
self.inner = OutboundUpgradeApplyState::AwaitNegotiated {
io: connection.complete(),
protocol: info.0,
upgrade
};
}
OutboundUpgradeApplyState::AwaitNegotiated { mut io, protocol, upgrade } => {
let io = match io.poll()? {
Async::NotReady => {
self.inner = OutboundUpgradeApplyState::AwaitNegotiated {
io, protocol, upgrade
};
return Ok(Async::NotReady)
}
Async::Ready(io) => io
};
self.inner = OutboundUpgradeApplyState::Upgrade {
future: upgrade.upgrade_outbound(io, protocol)
future: upgrade.upgrade_outbound(connection, info.0)
};
}
OutboundUpgradeApplyState::Upgrade { mut future } => {
Expand Down
2 changes: 1 addition & 1 deletion core/src/upgrade/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ mod transfer;

use futures::future::Future;

pub use multistream_select::{Negotiated, NegotiatedComplete, NegotiationError, ProtocolError};
pub use multistream_select::{Version, Negotiated, NegotiatedComplete, NegotiationError, ProtocolError};
pub use self::{
apply::{apply, apply_inbound, apply_outbound, InboundUpgradeApply, OutboundUpgradeApply},
denied::DeniedUpgrade,
Expand Down
10 changes: 5 additions & 5 deletions core/tests/network_dial_error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ fn deny_incoming_connec() {
let local_key = identity::Keypair::generate_ed25519();
let local_public_key = local_key.public();
let transport = libp2p_tcp::TcpConfig::new()
.upgrade()
.upgrade(upgrade::Version::V1)
.authenticate(libp2p_secio::SecioConfig::new(local_key))
.multiplex(libp2p_mplex::MplexConfig::new());
Network::new(transport, local_public_key.into())
Expand All @@ -105,7 +105,7 @@ fn deny_incoming_connec() {
let local_key = identity::Keypair::generate_ed25519();
let local_public_key = local_key.public();
let transport = libp2p_tcp::TcpConfig::new()
.upgrade()
.upgrade(upgrade::Version::V1)
.authenticate(libp2p_secio::SecioConfig::new(local_key))
.multiplex(libp2p_mplex::MplexConfig::new());
Network::new(transport, local_public_key.into())
Expand Down Expand Up @@ -170,7 +170,7 @@ fn dial_self() {
let local_key = identity::Keypair::generate_ed25519();
let local_public_key = local_key.public();
let transport = libp2p_tcp::TcpConfig::new()
.upgrade()
.upgrade(upgrade::Version::V1)
.authenticate(libp2p_secio::SecioConfig::new(local_key))
.multiplex(libp2p_mplex::MplexConfig::new())
.and_then(|(peer, mplex), _| {
Expand Down Expand Up @@ -249,7 +249,7 @@ fn dial_self_by_id() {
let local_key = identity::Keypair::generate_ed25519();
let local_public_key = local_key.public();
let transport = libp2p_tcp::TcpConfig::new()
.upgrade()
.upgrade(upgrade::Version::V1)
.authenticate(libp2p_secio::SecioConfig::new(local_key))
.multiplex(libp2p_mplex::MplexConfig::new());
Network::new(transport, local_public_key.into())
Expand All @@ -267,7 +267,7 @@ fn multiple_addresses_err() {
let local_key = identity::Keypair::generate_ed25519();
let local_public_key = local_key.public();
let transport = libp2p_tcp::TcpConfig::new()
.upgrade()
.upgrade(upgrade::Version::V1)
.authenticate(libp2p_secio::SecioConfig::new(local_key))
.multiplex(libp2p_mplex::MplexConfig::new());
Network::new(transport, local_public_key.into())
Expand Down
4 changes: 2 additions & 2 deletions core/tests/network_simult.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ fn raw_swarm_simultaneous_connect() {
let local_key = identity::Keypair::generate_ed25519();
let local_public_key = local_key.public();
let transport = libp2p_tcp::TcpConfig::new()
.upgrade()
.upgrade(upgrade::Version::V1Lazy)
.authenticate(libp2p_secio::SecioConfig::new(local_key))
.multiplex(libp2p_mplex::MplexConfig::new())
.and_then(|(peer, mplex), _| {
Expand All @@ -125,7 +125,7 @@ fn raw_swarm_simultaneous_connect() {
let local_key = identity::Keypair::generate_ed25519();
let local_public_key = local_key.public();
let transport = libp2p_tcp::TcpConfig::new()
.upgrade()
.upgrade(upgrade::Version::V1Lazy)
.authenticate(libp2p_secio::SecioConfig::new(local_key))
.multiplex(libp2p_mplex::MplexConfig::new())
.and_then(|(peer, mplex), _| {
Expand Down
6 changes: 3 additions & 3 deletions core/tests/transport_upgrade.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use futures::future::Future;
use futures::stream::Stream;
use libp2p_core::identity;
use libp2p_core::transport::{Transport, MemoryTransport, ListenerEvent};
use libp2p_core::upgrade::{UpgradeInfo, Negotiated, InboundUpgrade, OutboundUpgrade};
use libp2p_core::upgrade::{self, UpgradeInfo, Negotiated, InboundUpgrade, OutboundUpgrade};
use libp2p_mplex::MplexConfig;
use libp2p_secio::SecioConfig;
use multiaddr::Multiaddr;
Expand Down Expand Up @@ -78,7 +78,7 @@ fn upgrade_pipeline() {
let listener_keys = identity::Keypair::generate_ed25519();
let listener_id = listener_keys.public().into_peer_id();
let listener_transport = MemoryTransport::default()
.upgrade()
.upgrade(upgrade::Version::V1)
.authenticate(SecioConfig::new(listener_keys))
.apply(HelloUpgrade {})
.apply(HelloUpgrade {})
Expand All @@ -93,7 +93,7 @@ fn upgrade_pipeline() {
let dialer_keys = identity::Keypair::generate_ed25519();
let dialer_id = dialer_keys.public().into_peer_id();
let dialer_transport = MemoryTransport::default()
.upgrade()
.upgrade(upgrade::Version::V1)
.authenticate(SecioConfig::new(dialer_keys))
.apply(HelloUpgrade {})
.apply(HelloUpgrade {})
Expand Down
Loading