Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[multistream-select] Reduce roundtrips in protocol negotiation. #1212

Merged
merged 16 commits into from
Aug 12, 2019
Merged
63 changes: 35 additions & 28 deletions core/src/upgrade/apply.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,12 @@
// DEALINGS IN THE SOFTWARE.

use crate::ConnectedPoint;
use crate::upgrade::{UpgradeInfo, InboundUpgrade, OutboundUpgrade, UpgradeError, ProtocolName};
use crate::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeError};
use crate::upgrade::{ProtocolName, NegotiatedComplete};
use futures::{future::Either, prelude::*};
use log::debug;
use multistream_select::{self, DialerSelectFuture, ListenerSelectFuture};
use std::mem;
use std::{iter, mem};
use tokio_io::{AsyncRead, AsyncWrite};

/// Applies an upgrade to the inbound and outbound direction of a connection or substream.
Expand All @@ -46,10 +47,10 @@ where
C: AsyncRead + AsyncWrite,
U: InboundUpgrade<C>,
{
let iter = UpgradeInfoIterWrap(up);
let iter = up.protocol_info().into_iter().map(NameWrap as fn(_) -> NameWrap<_>);
let future = multistream_select::listener_select_proto(conn, iter);
InboundUpgradeApply {
inner: InboundUpgradeApplyState::Init { future }
inner: InboundUpgradeApplyState::Init { future, upgrade: up }
}
}

Expand Down Expand Up @@ -78,10 +79,11 @@ where
enum InboundUpgradeApplyState<C, U>
where
C: AsyncRead + AsyncWrite,
U: InboundUpgrade<C>
U: InboundUpgrade<C>,
{
Init {
future: ListenerSelectFuture<C, UpgradeInfoIterWrap<U>, NameWrap<U::Info>>,
future: ListenerSelectFuture<C, NameWrap<U::Info>>,
upgrade: U,
},
Upgrade {
future: U::Future
Expand All @@ -100,16 +102,16 @@ where
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
loop {
match mem::replace(&mut self.inner, InboundUpgradeApplyState::Undefined) {
InboundUpgradeApplyState::Init { mut future } => {
let (info, connection, upgrade) = match future.poll()? {
InboundUpgradeApplyState::Init { mut future, upgrade } => {
let (info, io) = match future.poll()? {
Async::Ready(x) => x,
Async::NotReady => {
self.inner = InboundUpgradeApplyState::Init { future };
self.inner = InboundUpgradeApplyState::Init { future, upgrade };
return Ok(Async::NotReady)
}
};
self.inner = InboundUpgradeApplyState::Upgrade {
future: upgrade.0.upgrade_inbound(connection, info.0)
future: upgrade.upgrade_inbound(io, info.0)
};
}
InboundUpgradeApplyState::Upgrade { mut future } => {
Expand Down Expand Up @@ -153,6 +155,11 @@ where
future: DialerSelectFuture<C, NameWrapIter<<U::InfoIter as IntoIterator>::IntoIter>>,
upgrade: U
},
AwaitNegotiated {
io: NegotiatedComplete<C>,
upgrade: U,
protocol: U::Info
},
Upgrade {
future: U::Future
},
Expand All @@ -178,8 +185,24 @@ where
return Ok(Async::NotReady)
}
};
self.inner = OutboundUpgradeApplyState::AwaitNegotiated {
io: connection.complete(),
protocol: info.0,
upgrade
};
}
OutboundUpgradeApplyState::AwaitNegotiated { mut io, protocol, upgrade } => {
let io = match io.poll()? {
Async::NotReady => {
self.inner = OutboundUpgradeApplyState::AwaitNegotiated {
io, protocol, upgrade
};
return Ok(Async::NotReady)
}
Async::Ready(io) => io
};
self.inner = OutboundUpgradeApplyState::Upgrade {
future: upgrade.upgrade_outbound(connection, info.0)
future: upgrade.upgrade_outbound(io, protocol)
};
}
OutboundUpgradeApplyState::Upgrade { mut future } => {
Expand All @@ -205,23 +228,7 @@ where
}
}

/// Wraps around a `UpgradeInfo` and satisfies the requirement of `listener_select_proto`.
struct UpgradeInfoIterWrap<U>(U);

impl<'a, U> IntoIterator for &'a UpgradeInfoIterWrap<U>
where
U: UpgradeInfo
{
type Item = NameWrap<U::Info>;
type IntoIter = NameWrapIter<<U::InfoIter as IntoIterator>::IntoIter>;

fn into_iter(self) -> Self::IntoIter {
self.0.protocol_info().into_iter().map(NameWrap)
}
}

type NameWrapIter<I> =
std::iter::Map<I, fn(<I as Iterator>::Item) -> NameWrap<<I as Iterator>::Item>>;
type NameWrapIter<I> = iter::Map<I, fn(<I as Iterator>::Item) -> NameWrap<<I as Iterator>::Item>>;

/// Wrapper type to expose an `AsRef<[u8]>` impl for all types implementing `ProtocolName`.
#[derive(Clone)]
Expand Down
8 changes: 4 additions & 4 deletions core/src/upgrade/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use multistream_select::ProtocolChoiceError;
use multistream_select::NegotiationError;
use std::fmt;

/// Error that can happen when upgrading a connection or substream to use a protocol.
#[derive(Debug)]
pub enum UpgradeError<E> {
/// Error during the negotiation process.
Select(ProtocolChoiceError),
Select(NegotiationError),
/// Error during the post-negotiation handshake.
Apply(E),
}
Expand Down Expand Up @@ -73,8 +73,8 @@ where
}
}

impl<E> From<ProtocolChoiceError> for UpgradeError<E> {
fn from(e: ProtocolChoiceError) -> Self {
impl<E> From<NegotiationError> for UpgradeError<E> {
fn from(e: NegotiationError) -> Self {
UpgradeError::Select(e)
}
}
Expand Down
2 changes: 1 addition & 1 deletion core/src/upgrade/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ mod transfer;

use futures::future::Future;

pub use multistream_select::Negotiated;
pub use multistream_select::{Negotiated, NegotiatedComplete, NegotiationError, ProtocolError};
pub use self::{
apply::{apply, apply_inbound, apply_outbound, InboundUpgradeApply, OutboundUpgradeApply},
denied::DeniedUpgrade,
Expand Down
13 changes: 12 additions & 1 deletion core/tests/network_dial_error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

mod util;

use futures::{future, prelude::*};
use libp2p_core::identity;
use libp2p_core::multiaddr::multiaddr;
Expand Down Expand Up @@ -167,6 +169,7 @@ fn deny_incoming_connec() {

#[test]
fn dial_self() {

// Check whether dialing ourselves correctly fails.
//
// Dialing the same address we're listening should result in three events:
Expand All @@ -191,7 +194,13 @@ fn dial_self() {
.map_outbound(move |muxer| (peer_id, muxer))
.map_inbound(move |muxer| (peer_id2, muxer));
upgrade::apply(out.stream, upgrade, endpoint)
})
.and_then(|(peer, mplex), _| {
// Gracefully close the connection to allow protocol
// negotiation to complete.
util::CloseMuxer::new(mplex).map(move |mplex| (peer, mplex))
});

Network::new(transport, local_public_key.into())
};

Expand Down Expand Up @@ -243,7 +252,9 @@ fn dial_self() {
assert_eq!(*inc.listen_addr(), address);
inc.accept(TestHandler::default().into_node_handler_builder());
},
Async::Ready(ev) => unreachable!("{:?}", ev),
Async::Ready(ev) => {
panic!("Unexpected event: {:?}", ev)
}
Async::NotReady => break Ok(Async::NotReady),
}
}
Expand Down
75 changes: 58 additions & 17 deletions core/tests/network_simult.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,12 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

mod util;

use futures::{future, prelude::*};
use libp2p_core::identity;
use libp2p_core::nodes::network::{Network, NetworkEvent, IncomingError};
use libp2p_core::nodes::{Network, NetworkEvent, Peer};
use libp2p_core::nodes::network::IncomingError;
use libp2p_core::{Transport, upgrade, upgrade::OutboundUpgradeExt, upgrade::InboundUpgradeExt};
use libp2p_swarm::{
ProtocolsHandler,
Expand Down Expand Up @@ -118,6 +121,11 @@ fn raw_swarm_simultaneous_connect() {
.map_outbound(move |muxer| (peer_id, muxer))
.map_inbound(move |muxer| (peer_id2, muxer));
upgrade::apply(out.stream, upgrade, endpoint)
})
.and_then(|(peer, mplex), _| {
// Gracefully close the connection to allow protocol
// negotiation to complete.
util::CloseMuxer::new(mplex).map(move |mplex| (peer, mplex))
});
Network::new(transport, local_public_key.into_peer_id())
};
Expand All @@ -134,6 +142,11 @@ fn raw_swarm_simultaneous_connect() {
.map_outbound(move |muxer| (peer_id, muxer))
.map_inbound(move |muxer| (peer_id2, muxer));
upgrade::apply(out.stream, upgrade, endpoint)
})
.and_then(|(peer, mplex), _| {
// Gracefully close the connection to allow protocol
// negotiation to complete.
util::CloseMuxer::new(mplex).map(move |mplex| (peer, mplex))
});
Network::new(transport, local_public_key.into_peer_id())
};
Expand Down Expand Up @@ -164,14 +177,14 @@ fn raw_swarm_simultaneous_connect() {

let mut reactor = tokio::runtime::current_thread::Runtime::new().unwrap();

for _ in 0 .. 10 {
loop {
let mut swarm1_step = 0;
let mut swarm2_step = 0;

let mut swarm1_dial_start = Delay::new(Instant::now() + Duration::new(0, rand::random::<u32>() % 50_000_000));
let mut swarm2_dial_start = Delay::new(Instant::now() + Duration::new(0, rand::random::<u32>() % 50_000_000));

let future = future::poll_fn(|| -> Poll<(), io::Error> {
let future = future::poll_fn(|| -> Poll<bool, io::Error> {
loop {
let mut swarm1_not_ready = false;
let mut swarm2_not_ready = false;
Expand All @@ -183,10 +196,11 @@ fn raw_swarm_simultaneous_connect() {
match swarm1_dial_start.poll().unwrap() {
Async::Ready(_) => {
let handler = TestHandler::default().into_node_handler_builder();
swarm1.peer(swarm2.local_peer_id().clone()).into_not_connected().unwrap()
swarm1.peer(swarm2.local_peer_id().clone())
.into_not_connected()
.unwrap()
.connect(swarm2_listen_addr.clone(), handler);
swarm1_step = 1;
swarm1_not_ready = false;
},
Async::NotReady => swarm1_not_ready = true,
}
Expand All @@ -196,23 +210,31 @@ fn raw_swarm_simultaneous_connect() {
match swarm2_dial_start.poll().unwrap() {
Async::Ready(_) => {
let handler = TestHandler::default().into_node_handler_builder();
swarm2.peer(swarm1.local_peer_id().clone()).into_not_connected().unwrap()
swarm2.peer(swarm1.local_peer_id().clone())
.into_not_connected()
.unwrap()
.connect(swarm1_listen_addr.clone(), handler);
swarm2_step = 1;
swarm2_not_ready = false;
},
Async::NotReady => swarm2_not_ready = true,
}
}

if rand::random::<f32>() < 0.1 {
match swarm1.poll() {
Async::Ready(NetworkEvent::IncomingConnectionError { error: IncomingError::DeniedLowerPriority, .. }) => {
Async::Ready(NetworkEvent::IncomingConnectionError {
error: IncomingError::DeniedLowerPriority, ..
}) => {
assert_eq!(swarm1_step, 2);
swarm1_step = 3;
},
Async::Ready(NetworkEvent::Connected { conn_info, .. }) => {
assert_eq!(conn_info, *swarm2.local_peer_id());
if swarm1_step == 0 {
// The connection was established before
// swarm1 started dialing; discard the test run.
return Ok(Async::Ready(false))
}
assert_eq!(swarm1_step, 1);
swarm1_step = 2;
},
Expand All @@ -224,19 +246,26 @@ fn raw_swarm_simultaneous_connect() {
Async::Ready(NetworkEvent::IncomingConnection(inc)) => {
inc.accept(TestHandler::default().into_node_handler_builder());
},
Async::Ready(_) => unreachable!(),
Async::Ready(ev) => panic!("swarm1: unexpected event: {:?}", ev),
Async::NotReady => swarm1_not_ready = true,
}
}

if rand::random::<f32>() < 0.1 {
match swarm2.poll() {
Async::Ready(NetworkEvent::IncomingConnectionError { error: IncomingError::DeniedLowerPriority, .. }) => {
Async::Ready(NetworkEvent::IncomingConnectionError {
error: IncomingError::DeniedLowerPriority, ..
}) => {
assert_eq!(swarm2_step, 2);
swarm2_step = 3;
},
Async::Ready(NetworkEvent::Connected { conn_info, .. }) => {
assert_eq!(conn_info, *swarm1.local_peer_id());
if swarm2_step == 0 {
// The connection was established before
// swarm2 started dialing; discard the test run.
return Ok(Async::Ready(false))
}
assert_eq!(swarm2_step, 1);
swarm2_step = 2;
},
Expand All @@ -248,14 +277,14 @@ fn raw_swarm_simultaneous_connect() {
Async::Ready(NetworkEvent::IncomingConnection(inc)) => {
inc.accept(TestHandler::default().into_node_handler_builder());
},
Async::Ready(_) => unreachable!(),
Async::Ready(ev) => panic!("swarm2: unexpected event: {:?}", ev),
Async::NotReady => swarm2_not_ready = true,
}
}

// TODO: make sure that >= 5 is correct
if swarm1_step + swarm2_step >= 5 {
return Ok(Async::Ready(()));
return Ok(Async::Ready(true));
}

if swarm1_not_ready && swarm2_not_ready {
Expand All @@ -264,11 +293,23 @@ fn raw_swarm_simultaneous_connect() {
}
});

reactor.block_on(future).unwrap();

// We now disconnect them again.
swarm1.peer(swarm2.local_peer_id().clone()).into_connected().unwrap().close();
swarm2.peer(swarm1.local_peer_id().clone()).into_connected().unwrap().close();
if reactor.block_on(future).unwrap() {
// The test exercised what we wanted to exercise: a simultaneous connect.
break
} else {
// The test did not trigger a simultaneous connect; ensure the nodes
// are disconnected and re-run the test.
match swarm1.peer(swarm2.local_peer_id().clone()) {
Peer::Connected(p) => p.close(),
Peer::PendingConnect(p) => p.interrupt(),
x => panic!("Unexpected state for swarm1: {:?}", x)
}
match swarm2.peer(swarm1.local_peer_id().clone()) {
Peer::Connected(p) => p.close(),
Peer::PendingConnect(p) => p.interrupt(),
x => panic!("Unexpected state for swarm2: {:?}", x)
}
}
}
}
}
Loading