[multistream-select] Reduce roundtrips in protocol negotiation. (#1212)

* Remove tokio-codec dependency from multistream-select.

In preparation for the eventual switch from tokio to std futures.

Includes some initial refactoring in preparation for further work
in the context of #659.

* Reduce default buffer sizes.

* Allow more than one frame to be buffered for sending.

* Doc tweaks.

* Remove superfluous (duplicated) Message types.

* Reduce roundtrips in multistream-select negotiation.

1. Enable 0-RTT: If the dialer only supports a single protocol, it can send
   protocol data (e.g. the actual application request) together with
   the multistream-select header and protocol proposal. Similarly,
   if the listener supports a proposed protocol, it can send protocol
   data (e.g. the actual application response) together with the
   multistream-select header and protocol confirmation.

2. In general, the dialer "settles on" an expected protocol as soon
   as it runs out of alternatives. Furthermore, neither the dialer nor
   the listener immediately flushes the final protocol confirmation,
   allowing it to be sent together with application protocol data, as
   illustrated by the sketch following this list. Attempts to read from
   the negotiated I/O stream implicitly flush any pending data.

3. A clean / graceful shutdown of an I/O stream always completes protocol
   negotiation.
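
The following is a minimal, hypothetical dialer-side sketch of this lazy
negotiation (futures 0.1 style). It is not part of this commit; the protocol
name, request bytes and function name are placeholders, and the
dialer_select_proto / write_all signatures are assumed to match this version
of multistream-select and tokio-io:

    use futures::prelude::*;
    use multistream_select::{dialer_select_proto, Negotiated};
    use std::iter;
    use tokio_io::{io::write_all, AsyncRead, AsyncWrite};

    // Sketch only: "/myproto/1.0.0" and the request bytes are placeholders.
    fn send_request<C>(conn: C) -> impl Future<Item = Negotiated<C>, Error = ()>
    where
        C: AsyncRead + AsyncWrite,
    {
        dialer_select_proto(conn, iter::once(b"/myproto/1.0.0"))
            .map_err(|_| ())
            .and_then(|(_proto, io)| {
                // The multistream-select header and protocol proposal are
                // still buffered here; the request goes out together with
                // them on the next flush (or implicitly when the response
                // is read).
                write_all(io, b"request")
                    .map(|(io, _)| io)
                    .map_err(|_| ())
            })
    }

The listener side is symmetric: if it supports the proposed protocol, its
response can likewise be sent together with the buffered header and protocol
confirmation.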

The public API of multistream-select changed slightly, requiring both
AsyncRead and AsyncWrite bounds for async reading and writing due to
the implicit buffering and "lazy" negotiation. The error types have
also changed, but they were not previously fully exported.

Includes some general refactoring, simplifications and additional
tests, e.g. for an edge case relating to a possible ambiguity when
parsing multistream-select protocol messages.

* Further missing commentary.

* Remove unused test dependency.

* Adjust commentary.

* Cleanup NegotiatedComplete::poll()

* Fix deflate protocol tests.

* Stabilise network_simult test.

The test implicitly relied on "slow" connection establishment
in order to have a sufficient probability of passing.
With the removal of roundtrips in multistream-select, it is now
more likely that the connection is already established within the
up-to-50ms delay between swarm1 and swarm2 dialing, causing the
expectation of step == 1 to fail when a Connected event is received,
since the step may then still be 0.

This commit aims to avoid these spurious errors by detecting runs
during which a connection is established "too quickly" and
repeating the test run in that case.

It still seems theoretically possible that, if connections are always
established "too quickly", the test runs forever. However, given that
the delta between swarm1 and swarm2 dialing is 0-50ms and that the
TCP transport is used, that seems probabilistically unlikely.
Nevertheless, the purpose of the artificial dialing delay between
swarm1 and swarm2 should be re-evaluated and the maximum delay
possibly reduced further.

* Complete negotiation between upgrades in libp2p-core.

While multistream-select, as a standalone library with an API at the
granularity of a single negotiation, supports lazy negotiation (and
in particular 0-RTT negotiation), in the context of libp2p-core,
where any number of negotiations are composed generically as
"upgrades", it is necessary to wait for each protocol negotiation
to complete before the next upgrade is applied.
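
Illustratively, this corresponds to driving a possibly lazy negotiation to
completion via Negotiated::complete() before the stream is handed to the
next upgrade. A rough, hypothetical sketch (futures 0.1 style; the protocol
name and function name are placeholders, and NegotiatedComplete is assumed
to yield Negotiated<C> with error NegotiationError):

    use futures::prelude::*;
    use multistream_select::{dialer_select_proto, Negotiated, NegotiationError};
    use std::iter;
    use tokio_io::{AsyncRead, AsyncWrite};

    // Sketch only: wait for the negotiation to fully complete so that the
    // stream can safely be passed on to a subsequent upgrade.
    fn negotiate_fully<C>(conn: C) -> impl Future<Item = Negotiated<C>, Error = NegotiationError>
    where
        C: AsyncRead + AsyncWrite,
    {
        dialer_select_proto(conn, iter::once(b"/myproto/1.0.0"))
            .and_then(|(_proto, io)| io.complete())
    }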

* Clarify docs. Simplify listener upgrades.

Since reading from a Negotiated I/O stream implicitly flushes any pending
negotiation data, there is no pitfall involved in not waiting for completion.
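
For illustration, a hypothetical listener-side sketch (futures 0.1 style)
where the first read on the Negotiated stream implicitly flushes the pending
confirmation; the protocol name, fixed request length and function name are
placeholders:

    use futures::prelude::*;
    use multistream_select::listener_select_proto;
    use std::iter;
    use tokio_io::{io::read_exact, AsyncRead, AsyncWrite};

    // Sketch only: negotiate a single supported protocol, then read a
    // fixed-size request; no explicit completion step is needed because
    // the read flushes the buffered protocol confirmation first.
    fn recv_request<C>(conn: C) -> impl Future<Item = Vec<u8>, Error = ()>
    where
        C: AsyncRead + AsyncWrite,
    {
        listener_select_proto(conn, iter::once(b"/myproto/1.0.0"))
            .map_err(|_| ())
            .and_then(|(_proto, io)| {
                read_exact(io, vec![0u8; 16])
                    .map(|(_io, buf)| buf)
                    .map_err(|_| ())
            })
    }
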
romanb authored Aug 12, 2019
1 parent 5696b3e commit 589d280
Showing 23 changed files with 1,595 additions and 1,332 deletions.
63 changes: 35 additions & 28 deletions core/src/upgrade/apply.rs
@@ -19,11 +19,12 @@
// DEALINGS IN THE SOFTWARE.

use crate::ConnectedPoint;
use crate::upgrade::{UpgradeInfo, InboundUpgrade, OutboundUpgrade, UpgradeError, ProtocolName};
use crate::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeError};
use crate::upgrade::{ProtocolName, NegotiatedComplete};
use futures::{future::Either, prelude::*};
use log::debug;
use multistream_select::{self, DialerSelectFuture, ListenerSelectFuture};
use std::mem;
use std::{iter, mem};
use tokio_io::{AsyncRead, AsyncWrite};

/// Applies an upgrade to the inbound and outbound direction of a connection or substream.
@@ -46,10 +47,10 @@ where
C: AsyncRead + AsyncWrite,
U: InboundUpgrade<C>,
{
let iter = UpgradeInfoIterWrap(up);
let iter = up.protocol_info().into_iter().map(NameWrap as fn(_) -> NameWrap<_>);
let future = multistream_select::listener_select_proto(conn, iter);
InboundUpgradeApply {
inner: InboundUpgradeApplyState::Init { future }
inner: InboundUpgradeApplyState::Init { future, upgrade: up }
}
}

@@ -78,10 +79,11 @@
enum InboundUpgradeApplyState<C, U>
where
C: AsyncRead + AsyncWrite,
U: InboundUpgrade<C>
U: InboundUpgrade<C>,
{
Init {
future: ListenerSelectFuture<C, UpgradeInfoIterWrap<U>, NameWrap<U::Info>>,
future: ListenerSelectFuture<C, NameWrap<U::Info>>,
upgrade: U,
},
Upgrade {
future: U::Future
@@ -100,16 +102,16 @@ where
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
loop {
match mem::replace(&mut self.inner, InboundUpgradeApplyState::Undefined) {
InboundUpgradeApplyState::Init { mut future } => {
let (info, connection, upgrade) = match future.poll()? {
InboundUpgradeApplyState::Init { mut future, upgrade } => {
let (info, io) = match future.poll()? {
Async::Ready(x) => x,
Async::NotReady => {
self.inner = InboundUpgradeApplyState::Init { future };
self.inner = InboundUpgradeApplyState::Init { future, upgrade };
return Ok(Async::NotReady)
}
};
self.inner = InboundUpgradeApplyState::Upgrade {
future: upgrade.0.upgrade_inbound(connection, info.0)
future: upgrade.upgrade_inbound(io, info.0)
};
}
InboundUpgradeApplyState::Upgrade { mut future } => {
@@ -153,6 +155,11 @@ where
future: DialerSelectFuture<C, NameWrapIter<<U::InfoIter as IntoIterator>::IntoIter>>,
upgrade: U
},
AwaitNegotiated {
io: NegotiatedComplete<C>,
upgrade: U,
protocol: U::Info
},
Upgrade {
future: U::Future
},
@@ -178,8 +185,24 @@ where
return Ok(Async::NotReady)
}
};
self.inner = OutboundUpgradeApplyState::AwaitNegotiated {
io: connection.complete(),
protocol: info.0,
upgrade
};
}
OutboundUpgradeApplyState::AwaitNegotiated { mut io, protocol, upgrade } => {
let io = match io.poll()? {
Async::NotReady => {
self.inner = OutboundUpgradeApplyState::AwaitNegotiated {
io, protocol, upgrade
};
return Ok(Async::NotReady)
}
Async::Ready(io) => io
};
self.inner = OutboundUpgradeApplyState::Upgrade {
future: upgrade.upgrade_outbound(connection, info.0)
future: upgrade.upgrade_outbound(io, protocol)
};
}
OutboundUpgradeApplyState::Upgrade { mut future } => {
@@ -205,23 +228,7 @@
}
}

/// Wraps around a `UpgradeInfo` and satisfies the requirement of `listener_select_proto`.
struct UpgradeInfoIterWrap<U>(U);

impl<'a, U> IntoIterator for &'a UpgradeInfoIterWrap<U>
where
U: UpgradeInfo
{
type Item = NameWrap<U::Info>;
type IntoIter = NameWrapIter<<U::InfoIter as IntoIterator>::IntoIter>;

fn into_iter(self) -> Self::IntoIter {
self.0.protocol_info().into_iter().map(NameWrap)
}
}

type NameWrapIter<I> =
std::iter::Map<I, fn(<I as Iterator>::Item) -> NameWrap<<I as Iterator>::Item>>;
type NameWrapIter<I> = iter::Map<I, fn(<I as Iterator>::Item) -> NameWrap<<I as Iterator>::Item>>;

/// Wrapper type to expose an `AsRef<[u8]>` impl for all types implementing `ProtocolName`.
#[derive(Clone)]
8 changes: 4 additions & 4 deletions core/src/upgrade/error.rs
@@ -18,14 +18,14 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use multistream_select::ProtocolChoiceError;
use multistream_select::NegotiationError;
use std::fmt;

/// Error that can happen when upgrading a connection or substream to use a protocol.
#[derive(Debug)]
pub enum UpgradeError<E> {
/// Error during the negotiation process.
Select(ProtocolChoiceError),
Select(NegotiationError),
/// Error during the post-negotiation handshake.
Apply(E),
}
@@ -73,8 +73,8 @@ where
}
}

impl<E> From<ProtocolChoiceError> for UpgradeError<E> {
fn from(e: ProtocolChoiceError) -> Self {
impl<E> From<NegotiationError> for UpgradeError<E> {
fn from(e: NegotiationError) -> Self {
UpgradeError::Select(e)
}
}
2 changes: 1 addition & 1 deletion core/src/upgrade/mod.rs
@@ -68,7 +68,7 @@ mod transfer;

use futures::future::Future;

pub use multistream_select::Negotiated;
pub use multistream_select::{Negotiated, NegotiatedComplete, NegotiationError, ProtocolError};
pub use self::{
apply::{apply, apply_inbound, apply_outbound, InboundUpgradeApply, OutboundUpgradeApply},
denied::DeniedUpgrade,
13 changes: 12 additions & 1 deletion core/tests/network_dial_error.rs
@@ -18,6 +18,8 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

mod util;

use futures::{future, prelude::*};
use libp2p_core::identity;
use libp2p_core::multiaddr::multiaddr;
@@ -167,6 +169,7 @@ fn deny_incoming_connec() {

#[test]
fn dial_self() {

// Check whether dialing ourselves correctly fails.
//
// Dialing the same address we're listening should result in three events:
@@ -191,7 +194,13 @@ fn dial_self() {
.map_outbound(move |muxer| (peer_id, muxer))
.map_inbound(move |muxer| (peer_id2, muxer));
upgrade::apply(out.stream, upgrade, endpoint)
})
.and_then(|(peer, mplex), _| {
// Gracefully close the connection to allow protocol
// negotiation to complete.
util::CloseMuxer::new(mplex).map(move |mplex| (peer, mplex))
});

Network::new(transport, local_public_key.into())
};

@@ -243,7 +252,9 @@ fn dial_self() {
assert_eq!(*inc.listen_addr(), address);
inc.accept(TestHandler::default().into_node_handler_builder());
},
Async::Ready(ev) => unreachable!("{:?}", ev),
Async::Ready(ev) => {
panic!("Unexpected event: {:?}", ev)
}
Async::NotReady => break Ok(Async::NotReady),
}
}
75 changes: 58 additions & 17 deletions core/tests/network_simult.rs
@@ -18,9 +18,12 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

mod util;

use futures::{future, prelude::*};
use libp2p_core::identity;
use libp2p_core::nodes::network::{Network, NetworkEvent, IncomingError};
use libp2p_core::nodes::{Network, NetworkEvent, Peer};
use libp2p_core::nodes::network::IncomingError;
use libp2p_core::{Transport, upgrade, upgrade::OutboundUpgradeExt, upgrade::InboundUpgradeExt};
use libp2p_swarm::{
ProtocolsHandler,
@@ -118,6 +121,11 @@ fn raw_swarm_simultaneous_connect() {
.map_outbound(move |muxer| (peer_id, muxer))
.map_inbound(move |muxer| (peer_id2, muxer));
upgrade::apply(out.stream, upgrade, endpoint)
})
.and_then(|(peer, mplex), _| {
// Gracefully close the connection to allow protocol
// negotiation to complete.
util::CloseMuxer::new(mplex).map(move |mplex| (peer, mplex))
});
Network::new(transport, local_public_key.into_peer_id())
};
@@ -134,6 +142,11 @@ fn raw_swarm_simultaneous_connect() {
.map_outbound(move |muxer| (peer_id, muxer))
.map_inbound(move |muxer| (peer_id2, muxer));
upgrade::apply(out.stream, upgrade, endpoint)
})
.and_then(|(peer, mplex), _| {
// Gracefully close the connection to allow protocol
// negotiation to complete.
util::CloseMuxer::new(mplex).map(move |mplex| (peer, mplex))
});
Network::new(transport, local_public_key.into_peer_id())
};
@@ -164,14 +177,14 @@

let mut reactor = tokio::runtime::current_thread::Runtime::new().unwrap();

for _ in 0 .. 10 {
loop {
let mut swarm1_step = 0;
let mut swarm2_step = 0;

let mut swarm1_dial_start = Delay::new(Instant::now() + Duration::new(0, rand::random::<u32>() % 50_000_000));
let mut swarm2_dial_start = Delay::new(Instant::now() + Duration::new(0, rand::random::<u32>() % 50_000_000));

let future = future::poll_fn(|| -> Poll<(), io::Error> {
let future = future::poll_fn(|| -> Poll<bool, io::Error> {
loop {
let mut swarm1_not_ready = false;
let mut swarm2_not_ready = false;
@@ -183,10 +196,11 @@ fn raw_swarm_simultaneous_connect() {
match swarm1_dial_start.poll().unwrap() {
Async::Ready(_) => {
let handler = TestHandler::default().into_node_handler_builder();
swarm1.peer(swarm2.local_peer_id().clone()).into_not_connected().unwrap()
swarm1.peer(swarm2.local_peer_id().clone())
.into_not_connected()
.unwrap()
.connect(swarm2_listen_addr.clone(), handler);
swarm1_step = 1;
swarm1_not_ready = false;
},
Async::NotReady => swarm1_not_ready = true,
}
@@ -196,23 +210,31 @@ fn raw_swarm_simultaneous_connect() {
match swarm2_dial_start.poll().unwrap() {
Async::Ready(_) => {
let handler = TestHandler::default().into_node_handler_builder();
swarm2.peer(swarm1.local_peer_id().clone()).into_not_connected().unwrap()
swarm2.peer(swarm1.local_peer_id().clone())
.into_not_connected()
.unwrap()
.connect(swarm1_listen_addr.clone(), handler);
swarm2_step = 1;
swarm2_not_ready = false;
},
Async::NotReady => swarm2_not_ready = true,
}
}

if rand::random::<f32>() < 0.1 {
match swarm1.poll() {
Async::Ready(NetworkEvent::IncomingConnectionError { error: IncomingError::DeniedLowerPriority, .. }) => {
Async::Ready(NetworkEvent::IncomingConnectionError {
error: IncomingError::DeniedLowerPriority, ..
}) => {
assert_eq!(swarm1_step, 2);
swarm1_step = 3;
},
Async::Ready(NetworkEvent::Connected { conn_info, .. }) => {
assert_eq!(conn_info, *swarm2.local_peer_id());
if swarm1_step == 0 {
// The connection was established before
// swarm1 started dialing; discard the test run.
return Ok(Async::Ready(false))
}
assert_eq!(swarm1_step, 1);
swarm1_step = 2;
},
@@ -224,19 +246,26 @@ fn raw_swarm_simultaneous_connect() {
Async::Ready(NetworkEvent::IncomingConnection(inc)) => {
inc.accept(TestHandler::default().into_node_handler_builder());
},
Async::Ready(_) => unreachable!(),
Async::Ready(ev) => panic!("swarm1: unexpected event: {:?}", ev),
Async::NotReady => swarm1_not_ready = true,
}
}

if rand::random::<f32>() < 0.1 {
match swarm2.poll() {
Async::Ready(NetworkEvent::IncomingConnectionError { error: IncomingError::DeniedLowerPriority, .. }) => {
Async::Ready(NetworkEvent::IncomingConnectionError {
error: IncomingError::DeniedLowerPriority, ..
}) => {
assert_eq!(swarm2_step, 2);
swarm2_step = 3;
},
Async::Ready(NetworkEvent::Connected { conn_info, .. }) => {
assert_eq!(conn_info, *swarm1.local_peer_id());
if swarm2_step == 0 {
// The connection was established before
// swarm2 started dialing; discard the test run.
return Ok(Async::Ready(false))
}
assert_eq!(swarm2_step, 1);
swarm2_step = 2;
},
@@ -248,14 +277,14 @@ fn raw_swarm_simultaneous_connect() {
Async::Ready(NetworkEvent::IncomingConnection(inc)) => {
inc.accept(TestHandler::default().into_node_handler_builder());
},
Async::Ready(_) => unreachable!(),
Async::Ready(ev) => panic!("swarm2: unexpected event: {:?}", ev),
Async::NotReady => swarm2_not_ready = true,
}
}

// TODO: make sure that >= 5 is correct
if swarm1_step + swarm2_step >= 5 {
return Ok(Async::Ready(()));
return Ok(Async::Ready(true));
}

if swarm1_not_ready && swarm2_not_ready {
@@ -264,11 +293,23 @@ fn raw_swarm_simultaneous_connect() {
}
});

reactor.block_on(future).unwrap();

// We now disconnect them again.
swarm1.peer(swarm2.local_peer_id().clone()).into_connected().unwrap().close();
swarm2.peer(swarm1.local_peer_id().clone()).into_connected().unwrap().close();
if reactor.block_on(future).unwrap() {
// The test exercised what we wanted to exercise: a simultaneous connect.
break
} else {
// The test did not trigger a simultaneous connect; ensure the nodes
// are disconnected and re-run the test.
match swarm1.peer(swarm2.local_peer_id().clone()) {
Peer::Connected(p) => p.close(),
Peer::PendingConnect(p) => p.interrupt(),
x => panic!("Unexpected state for swarm1: {:?}", x)
}
match swarm2.peer(swarm1.local_peer_id().clone()) {
Peer::Connected(p) => p.close(),
Peer::PendingConnect(p) => p.interrupt(),
x => panic!("Unexpected state for swarm2: {:?}", x)
}
}
}
}
}