Skip to content

Commit

Permalink
misc/multistream-select/: Remove parallel dialing optimization (#2934)
Browse files Browse the repository at this point in the history
This is to avoid the usage of the now optional `ls` command, and
stay compatible with go-multistream.

Closes #2925
  • Loading branch information
dignifiedquire authored Sep 29, 2022
1 parent 749ff00 commit c71115d
Show file tree
Hide file tree
Showing 6 changed files with 33 additions and 298 deletions.
2 changes: 1 addition & 1 deletion core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ libsecp256k1 = { version = "0.7.0", optional = true }
log = "0.4"
multiaddr = { version = "0.14.0" }
multihash = { version = "0.16", default-features = false, features = ["std", "multihash-impl", "identity", "sha2"] }
multistream-select = { version = "0.11", path = "../misc/multistream-select" }
multistream-select = { version = "0.12", path = "../misc/multistream-select" }
p256 = { version = "0.11.1", default-features = false, features = ["ecdsa"], optional = true }
parking_lot = "0.12.0"
pin-project = "1.0.0"
Expand Down
6 changes: 6 additions & 0 deletions misc/multistream-select/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
# 0.12.0 [unreleased]

- Remove parallel dialing optimization, to avoid requiring the use of the `ls` command. See [PR 2934].

[PR 2934]: https://github.com/libp2p/rust-libp2p/pull/2934

# 0.11.0 [2022-01-27]

- Migrate to Rust edition 2021 (see [PR 2339]).
Expand Down
2 changes: 1 addition & 1 deletion misc/multistream-select/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ name = "multistream-select"
edition = "2021"
rust-version = "1.56.1"
description = "Multistream-select negotiation protocol for libp2p"
version = "0.11.0"
version = "0.12.0"
authors = ["Parity Technologies <admin@parity.io>"]
license = "MIT"
repository = "https://github.com/libp2p/rust-libp2p"
Expand Down
249 changes: 24 additions & 225 deletions misc/multistream-select/src/dialer_select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
use crate::protocol::{HeaderLine, Message, MessageIO, Protocol, ProtocolError};
use crate::{Negotiated, NegotiationError, Version};

use futures::{future::Either, prelude::*};
use futures::prelude::*;
use std::{
convert::TryFrom as _,
iter, mem,
Expand All @@ -39,12 +39,6 @@ use std::{
/// returned `Future` resolves with the name of the negotiated protocol and
/// a [`Negotiated`] I/O stream.
///
/// The chosen message flow for protocol negotiation depends on the numbers of
/// supported protocols given. That is, this function delegates to serial or
/// parallel variant based on the number of protocols given. The number of
/// protocols is determined through the `size_hint` of the given iterator and
/// thus an inaccurate size estimate may result in a suboptimal choice.
///
/// Within the scope of this library, a dialer always commits to a specific
/// multistream-select [`Version`], whereas a listener always supports
/// all versions supported by this library. Frictionless multistream-select
Expand All @@ -55,100 +49,40 @@ pub fn dialer_select_proto<R, I>(
protocols: I,
version: Version,
) -> DialerSelectFuture<R, I::IntoIter>
where
R: AsyncRead + AsyncWrite,
I: IntoIterator,
I::Item: AsRef<[u8]>,
{
let iter = protocols.into_iter();
// We choose between the "serial" and "parallel" strategies based on the number of protocols.
if iter.size_hint().1.map(|n| n <= 3).unwrap_or(false) {
Either::Left(dialer_select_proto_serial(inner, iter, version))
} else {
Either::Right(dialer_select_proto_parallel(inner, iter, version))
}
}

/// Future, returned by `dialer_select_proto`, which selects a protocol and dialer
/// either trying protocols in-order, or by requesting all protocols supported
/// by the remote upfront, from which the first protocol found in the dialer's
/// list of protocols is selected.
pub type DialerSelectFuture<R, I> = Either<DialerSelectSeq<R, I>, DialerSelectPar<R, I>>;

/// Returns a `Future` that negotiates a protocol on the given I/O stream.
///
/// Just like [`dialer_select_proto`] but always using an iterative message flow,
/// trying the given list of supported protocols one-by-one.
///
/// This strategy is preferable if the dialer only supports a few protocols.
pub(crate) fn dialer_select_proto_serial<R, I>(
inner: R,
protocols: I,
version: Version,
) -> DialerSelectSeq<R, I::IntoIter>
where
R: AsyncRead + AsyncWrite,
I: IntoIterator,
I::Item: AsRef<[u8]>,
{
let protocols = protocols.into_iter().peekable();
DialerSelectSeq {
DialerSelectFuture {
version,
protocols,
state: SeqState::SendHeader {
state: State::SendHeader {
io: MessageIO::new(inner),
},
}
}

/// Returns a `Future` that negotiates a protocol on the given I/O stream.
///
/// Just like [`dialer_select_proto`] but always using a message flow that first
/// requests all supported protocols from the remote, selecting the first
/// protocol from the given list of supported protocols that is supported
/// by the remote.
///
/// This strategy may be beneficial if the dialer supports many protocols
/// and it is unclear whether the remote supports one of the first few.
pub(crate) fn dialer_select_proto_parallel<R, I>(
inner: R,
protocols: I,
version: Version,
) -> DialerSelectPar<R, I::IntoIter>
where
R: AsyncRead + AsyncWrite,
I: IntoIterator,
I::Item: AsRef<[u8]>,
{
let protocols = protocols.into_iter();
DialerSelectPar {
version,
protocols,
state: ParState::SendHeader {
io: MessageIO::new(inner),
},
}
}

/// A `Future` returned by [`dialer_select_proto_serial`] which negotiates
/// A `Future` returned by [`dialer_select_proto`] which negotiates
/// a protocol iteratively by considering one protocol after the other.
#[pin_project::pin_project]
pub struct DialerSelectSeq<R, I: Iterator> {
pub struct DialerSelectFuture<R, I: Iterator> {
// TODO: It would be nice if eventually N = I::Item = Protocol.
protocols: iter::Peekable<I>,
state: SeqState<R, I::Item>,
state: State<R, I::Item>,
version: Version,
}

enum SeqState<R, N> {
enum State<R, N> {
SendHeader { io: MessageIO<R> },
SendProtocol { io: MessageIO<R>, protocol: N },
FlushProtocol { io: MessageIO<R>, protocol: N },
AwaitProtocol { io: MessageIO<R>, protocol: N },
Done,
}

impl<R, I> Future for DialerSelectSeq<R, I>
impl<R, I> Future for DialerSelectFuture<R, I>
where
// The Unpin bound here is required because we produce a `Negotiated<R>` as the output.
// It also makes the implementation considerably easier to write.
Expand All @@ -162,12 +96,12 @@ where
let this = self.project();

loop {
match mem::replace(this.state, SeqState::Done) {
SeqState::SendHeader { mut io } => {
match mem::replace(this.state, State::Done) {
State::SendHeader { mut io } => {
match Pin::new(&mut io).poll_ready(cx)? {
Poll::Ready(()) => {}
Poll::Pending => {
*this.state = SeqState::SendHeader { io };
*this.state = State::SendHeader { io };
return Poll::Pending;
}
}
Expand All @@ -181,14 +115,14 @@ where

// The dialer always sends the header and the first protocol
// proposal in one go for efficiency.
*this.state = SeqState::SendProtocol { io, protocol };
*this.state = State::SendProtocol { io, protocol };
}

SeqState::SendProtocol { mut io, protocol } => {
State::SendProtocol { mut io, protocol } => {
match Pin::new(&mut io).poll_ready(cx)? {
Poll::Ready(()) => {}
Poll::Pending => {
*this.state = SeqState::SendProtocol { io, protocol };
*this.state = State::SendProtocol { io, protocol };
return Poll::Pending;
}
}
Expand All @@ -200,10 +134,10 @@ where
log::debug!("Dialer: Proposed protocol: {}", p);

if this.protocols.peek().is_some() {
*this.state = SeqState::FlushProtocol { io, protocol }
*this.state = State::FlushProtocol { io, protocol }
} else {
match this.version {
Version::V1 => *this.state = SeqState::FlushProtocol { io, protocol },
Version::V1 => *this.state = State::FlushProtocol { io, protocol },
// This is the only effect that `V1Lazy` has compared to `V1`:
// Optimistically settling on the only protocol that
// the dialer supports for this negotiation. Notably,
Expand All @@ -218,21 +152,21 @@ where
}
}

SeqState::FlushProtocol { mut io, protocol } => {
State::FlushProtocol { mut io, protocol } => {
match Pin::new(&mut io).poll_flush(cx)? {
Poll::Ready(()) => *this.state = SeqState::AwaitProtocol { io, protocol },
Poll::Ready(()) => *this.state = State::AwaitProtocol { io, protocol },
Poll::Pending => {
*this.state = SeqState::FlushProtocol { io, protocol };
*this.state = State::FlushProtocol { io, protocol };
return Poll::Pending;
}
}
}

SeqState::AwaitProtocol { mut io, protocol } => {
State::AwaitProtocol { mut io, protocol } => {
let msg = match Pin::new(&mut io).poll_next(cx)? {
Poll::Ready(Some(msg)) => msg,
Poll::Pending => {
*this.state = SeqState::AwaitProtocol { io, protocol };
*this.state = State::AwaitProtocol { io, protocol };
return Poll::Pending;
}
// Treat EOF error as [`NegotiationError::Failed`], not as
Expand All @@ -243,7 +177,7 @@ where

match msg {
Message::Header(v) if v == HeaderLine::from(*this.version) => {
*this.state = SeqState::AwaitProtocol { io, protocol };
*this.state = State::AwaitProtocol { io, protocol };
}
Message::Protocol(ref p) if p.as_ref() == protocol.as_ref() => {
log::debug!("Dialer: Received confirmation for protocol: {}", p);
Expand All @@ -256,148 +190,13 @@ where
String::from_utf8_lossy(protocol.as_ref())
);
let protocol = this.protocols.next().ok_or(NegotiationError::Failed)?;
*this.state = SeqState::SendProtocol { io, protocol }
*this.state = State::SendProtocol { io, protocol }
}
_ => return Poll::Ready(Err(ProtocolError::InvalidMessage.into())),
}
}

SeqState::Done => panic!("SeqState::poll called after completion"),
}
}
}
}

/// A `Future` returned by [`dialer_select_proto_parallel`] which negotiates
/// a protocol selectively by considering all supported protocols of the remote
/// "in parallel".
#[pin_project::pin_project]
pub struct DialerSelectPar<R, I: Iterator> {
protocols: I,
state: ParState<R, I::Item>,
version: Version,
}

enum ParState<R, N> {
SendHeader { io: MessageIO<R> },
SendProtocolsRequest { io: MessageIO<R> },
Flush { io: MessageIO<R> },
RecvProtocols { io: MessageIO<R> },
SendProtocol { io: MessageIO<R>, protocol: N },
Done,
}

impl<R, I> Future for DialerSelectPar<R, I>
where
// The Unpin bound here is required because we produce a `Negotiated<R>` as the output.
// It also makes the implementation considerably easier to write.
R: AsyncRead + AsyncWrite + Unpin,
I: Iterator,
I::Item: AsRef<[u8]>,
{
type Output = Result<(I::Item, Negotiated<R>), NegotiationError>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();

loop {
match mem::replace(this.state, ParState::Done) {
ParState::SendHeader { mut io } => {
match Pin::new(&mut io).poll_ready(cx)? {
Poll::Ready(()) => {}
Poll::Pending => {
*this.state = ParState::SendHeader { io };
return Poll::Pending;
}
}

let msg = Message::Header(HeaderLine::from(*this.version));
if let Err(err) = Pin::new(&mut io).start_send(msg) {
return Poll::Ready(Err(From::from(err)));
}

*this.state = ParState::SendProtocolsRequest { io };
}

ParState::SendProtocolsRequest { mut io } => {
match Pin::new(&mut io).poll_ready(cx)? {
Poll::Ready(()) => {}
Poll::Pending => {
*this.state = ParState::SendProtocolsRequest { io };
return Poll::Pending;
}
}

if let Err(err) = Pin::new(&mut io).start_send(Message::ListProtocols) {
return Poll::Ready(Err(From::from(err)));
}

log::debug!("Dialer: Requested supported protocols.");
*this.state = ParState::Flush { io }
}

ParState::Flush { mut io } => match Pin::new(&mut io).poll_flush(cx)? {
Poll::Ready(()) => *this.state = ParState::RecvProtocols { io },
Poll::Pending => {
*this.state = ParState::Flush { io };
return Poll::Pending;
}
},

ParState::RecvProtocols { mut io } => {
let msg = match Pin::new(&mut io).poll_next(cx)? {
Poll::Ready(Some(msg)) => msg,
Poll::Pending => {
*this.state = ParState::RecvProtocols { io };
return Poll::Pending;
}
// Treat EOF error as [`NegotiationError::Failed`], not as
// [`NegotiationError::ProtocolError`], allowing dropping or closing an I/O
// stream as a permissible way to "gracefully" fail a negotiation.
Poll::Ready(None) => return Poll::Ready(Err(NegotiationError::Failed)),
};

match &msg {
Message::Header(h) if h == &HeaderLine::from(*this.version) => {
*this.state = ParState::RecvProtocols { io }
}
Message::Protocols(supported) => {
let protocol = this
.protocols
.by_ref()
.find(|p| supported.iter().any(|s| s.as_ref() == p.as_ref()))
.ok_or(NegotiationError::Failed)?;
log::debug!(
"Dialer: Found supported protocol: {}",
String::from_utf8_lossy(protocol.as_ref())
);
*this.state = ParState::SendProtocol { io, protocol };
}
_ => return Poll::Ready(Err(ProtocolError::InvalidMessage.into())),
}
}

ParState::SendProtocol { mut io, protocol } => {
match Pin::new(&mut io).poll_ready(cx)? {
Poll::Ready(()) => {}
Poll::Pending => {
*this.state = ParState::SendProtocol { io, protocol };
return Poll::Pending;
}
}

let p = Protocol::try_from(protocol.as_ref())?;
if let Err(err) = Pin::new(&mut io).start_send(Message::Protocol(p.clone())) {
return Poll::Ready(Err(From::from(err)));
}

log::debug!("Dialer: Expecting proposed protocol: {}", p);
let io = Negotiated::expecting(io.into_reader(), p, None);

return Poll::Ready(Ok((protocol, io)));
}

ParState::Done => panic!("ParState::poll called after completion"),
State::Done => panic!("State::poll called after completion"),
}
}
}
Expand Down
1 change: 0 additions & 1 deletion misc/multistream-select/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,6 @@ mod length_delimited;
mod listener_select;
mod negotiated;
mod protocol;
mod tests;

pub use self::dialer_select::{dialer_select_proto, DialerSelectFuture};
pub use self::listener_select::{listener_select_proto, ListenerSelectFuture};
Expand Down
Loading

0 comments on commit c71115d

Please sign in to comment.