diff --git a/protocols/relay/CHANGELOG.md b/protocols/relay/CHANGELOG.md index f20134464de..4fdaaaf958a 100644 --- a/protocols/relay/CHANGELOG.md +++ b/protocols/relay/CHANGELOG.md @@ -1,3 +1,14 @@ +# 0.9.1 - unreleased + +- Respond to at most one incoming reservation request. Deny <= 8 incoming + circuit requests with one per peer. And deny new circuits before accepting new + circuits. See [PR 2698]. + +- Expose explicits errors via `UpgradeError` instead of generic `io::Error`. See + [PR 2698]. + +[PR 2698]: https://github.com/libp2p/rust-libp2p/pull/2698/ + # 0.9.0 - Update to `libp2p-core` `v0.33.0`. diff --git a/protocols/relay/Cargo.toml b/protocols/relay/Cargo.toml index a3aa84e9e53..7fe66e547d1 100644 --- a/protocols/relay/Cargo.toml +++ b/protocols/relay/Cargo.toml @@ -3,7 +3,7 @@ name = "libp2p-relay" edition = "2021" rust-version = "1.56.1" description = "Communications relaying for libp2p" -version = "0.9.0" +version = "0.9.1" authors = ["Parity Technologies ", "Max Inden "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" @@ -21,12 +21,12 @@ libp2p-core = { version = "0.33.0", path = "../../core", default-features = fals libp2p-swarm = { version = "0.36.0", path = "../../swarm" } log = "0.4" pin-project = "1" +prost-codec = { version = "0.1", path = "../../misc/prost-codec" } prost = "0.10" rand = "0.8.4" smallvec = "1.6.1" static_assertions = "1" thiserror = "1.0" -unsigned-varint = { version = "0.7", features = ["asynchronous_codec"] } void = "1" [build-dependencies] diff --git a/protocols/relay/src/v2/client.rs b/protocols/relay/src/v2/client.rs index e670d24e3e1..d039cd08cbd 100644 --- a/protocols/relay/src/v2/client.rs +++ b/protocols/relay/src/v2/client.rs @@ -41,7 +41,7 @@ use libp2p_swarm::{ NotifyHandler, PollParameters, }; use std::collections::{hash_map, HashMap, VecDeque}; -use std::io::{Error, IoSlice}; +use std::io::{Error, ErrorKind, IoSlice}; use std::ops::DerefMut; use std::pin::Pin; use std::task::{Context, Poll}; @@ -84,7 +84,7 @@ pub enum Event { /// Denying an inbound circuit request failed. InboundCircuitReqDenyFailed { src_peer_id: PeerId, - error: std::io::Error, + error: inbound_stop::UpgradeError, }, } @@ -320,7 +320,7 @@ impl NetworkBehaviour for Client { /// A [`NegotiatedSubstream`] acting as a [`RelayedConnection`]. pub enum RelayedConnection { InboundAccepting { - accept: BoxFuture<'static, Result>, + accept: BoxFuture<'static, Result>, }, Operational { read_buffer: Bytes, @@ -338,7 +338,10 @@ impl RelayedConnection { ) -> Self { RelayedConnection::InboundAccepting { accept: async { - let (substream, read_buffer) = circuit.accept().await?; + let (substream, read_buffer) = circuit + .accept() + .await + .map_err(|e| Error::new(ErrorKind::Other, e))?; Ok(RelayedConnection::Operational { read_buffer, substream, diff --git a/protocols/relay/src/v2/client/handler.rs b/protocols/relay/src/v2/client/handler.rs index 94a69f6bd2d..c0e2172329d 100644 --- a/protocols/relay/src/v2/client/handler.rs +++ b/protocols/relay/src/v2/client/handler.rs @@ -39,11 +39,16 @@ use libp2p_swarm::{ KeepAlive, NegotiatedSubstream, SubstreamProtocol, }; use log::debug; -use std::collections::VecDeque; +use std::collections::{HashMap, VecDeque}; use std::fmt; use std::task::{Context, Poll}; use std::time::Duration; +/// The maximum number of circuits being denied concurrently. +/// +/// Circuits to be denied exceeding the limit are dropped. +const MAX_NUMBER_DENYING_CIRCUIT: usize = 8; + pub enum In { Reserve { to_listener: mpsc::Sender, @@ -100,7 +105,7 @@ pub enum Event { /// Denying an inbound circuit request failed. InboundCircuitReqDenyFailed { src_peer_id: PeerId, - error: std::io::Error, + error: inbound_stop::UpgradeError, }, } @@ -196,7 +201,8 @@ pub struct Handler { /// eventually. alive_lend_out_substreams: FuturesUnordered>, - circuit_deny_futs: FuturesUnordered)>>, + circuit_deny_futs: + HashMap>>, /// Futures that try to send errors to the transport. /// @@ -251,12 +257,27 @@ impl ConnectionHandler for Handler { } Reservation::None => { let src_peer_id = inbound_circuit.src_peer_id(); - self.circuit_deny_futs.push( - inbound_circuit - .deny(Status::NoReservation) - .map(move |result| (src_peer_id, result)) - .boxed(), - ) + + if self.circuit_deny_futs.len() == MAX_NUMBER_DENYING_CIRCUIT + && !self.circuit_deny_futs.contains_key(&src_peer_id) + { + log::warn!( + "Dropping inbound circuit request to be denied from {:?} due to exceeding limit.", + src_peer_id, + ); + } else if self + .circuit_deny_futs + .insert( + src_peer_id, + inbound_circuit.deny(Status::NoReservation).boxed(), + ) + .is_some() + { + log::warn!( + "Dropping existing inbound circuit request to be denied from {:?} in favor of new one.", + src_peer_id + ) + } } } } @@ -537,20 +558,28 @@ impl ConnectionHandler for Handler { } // Deny incoming circuit requests. - if let Poll::Ready(Some((src_peer_id, result))) = self.circuit_deny_futs.poll_next_unpin(cx) - { - match result { - Ok(()) => { - return Poll::Ready(ConnectionHandlerEvent::Custom( - Event::InboundCircuitReqDenied { src_peer_id }, - )) - } - Err(error) => { - return Poll::Ready(ConnectionHandlerEvent::Custom( - Event::InboundCircuitReqDenyFailed { src_peer_id, error }, - )) - } - } + let maybe_event = + self.circuit_deny_futs + .iter_mut() + .find_map(|(src_peer_id, fut)| match fut.poll_unpin(cx) { + Poll::Ready(Ok(())) => Some(( + *src_peer_id, + Event::InboundCircuitReqDenied { + src_peer_id: *src_peer_id, + }, + )), + Poll::Ready(Err(error)) => Some(( + *src_peer_id, + Event::InboundCircuitReqDenyFailed { + src_peer_id: *src_peer_id, + error, + }, + )), + Poll::Pending => None, + }); + if let Some((src_peer_id, event)) = maybe_event { + self.circuit_deny_futs.remove(&src_peer_id); + return Poll::Ready(ConnectionHandlerEvent::Custom(event)); } // Send errors to transport. diff --git a/protocols/relay/src/v2/protocol/inbound_hop.rs b/protocols/relay/src/v2/protocol/inbound_hop.rs index 36b4079153d..893ae916e1c 100644 --- a/protocols/relay/src/v2/protocol/inbound_hop.rs +++ b/protocols/relay/src/v2/protocol/inbound_hop.rs @@ -25,13 +25,10 @@ use bytes::Bytes; use futures::{future::BoxFuture, prelude::*}; use libp2p_core::{upgrade, Multiaddr, PeerId}; use libp2p_swarm::NegotiatedSubstream; -use prost::Message; use std::convert::TryInto; -use std::io::Cursor; use std::iter; use std::time::{Duration, SystemTime, UNIX_EPOCH}; use thiserror::Error; -use unsigned_varint::codec::UviBytes; pub struct Upgrade { pub reservation_duration: Duration, @@ -54,23 +51,19 @@ impl upgrade::InboundUpgrade for Upgrade { type Future = BoxFuture<'static, Result>; fn upgrade_inbound(self, substream: NegotiatedSubstream, _: Self::Info) -> Self::Future { - let mut codec = UviBytes::default(); - codec.set_max_len(MAX_MESSAGE_SIZE); - let mut substream = Framed::new(substream, codec); + let mut substream = Framed::new(substream, prost_codec::Codec::new(MAX_MESSAGE_SIZE)); async move { - let msg: bytes::BytesMut = substream - .next() - .await - .ok_or_else(|| std::io::Error::new(std::io::ErrorKind::UnexpectedEof, ""))??; - let HopMessage { r#type, peer, reservation: _, limit: _, status: _, - } = HopMessage::decode(Cursor::new(msg))?; + } = substream + .next() + .await + .ok_or(FatalUpgradeError::StreamClosed)??; let r#type = hop_message::Type::from_i32(r#type).ok_or(FatalUpgradeError::ParseTypeField)?; @@ -103,28 +96,22 @@ pub enum UpgradeError { Fatal(#[from] FatalUpgradeError), } -impl From for UpgradeError { - fn from(error: prost::DecodeError) -> Self { - Self::Fatal(error.into()) - } -} - -impl From for UpgradeError { - fn from(error: std::io::Error) -> Self { +impl From for UpgradeError { + fn from(error: prost_codec::Error) -> Self { Self::Fatal(error.into()) } } #[derive(Debug, Error)] pub enum FatalUpgradeError { - #[error("Failed to decode message: {0}.")] - Decode( + #[error("Failed to encode or decode")] + Codec( #[from] #[source] - prost::DecodeError, + prost_codec::Error, ), - #[error(transparent)] - Io(#[from] std::io::Error), + #[error("Stream closed")] + StreamClosed, #[error("Failed to parse response type field.")] ParseTypeField, #[error("Failed to parse peer id.")] @@ -141,14 +128,14 @@ pub enum Req { } pub struct ReservationReq { - substream: Framed>>>, + substream: Framed>, reservation_duration: Duration, max_circuit_duration: Duration, max_circuit_bytes: u64, } impl ReservationReq { - pub async fn accept(self, addrs: Vec) -> Result<(), std::io::Error> { + pub async fn accept(self, addrs: Vec) -> Result<(), UpgradeError> { let msg = HopMessage { r#type: hop_message::Type::Status.into(), peer: None, @@ -175,7 +162,7 @@ impl ReservationReq { self.send(msg).await } - pub async fn deny(self, status: Status) -> Result<(), std::io::Error> { + pub async fn deny(self, status: Status) -> Result<(), UpgradeError> { let msg = HopMessage { r#type: hop_message::Type::Status.into(), peer: None, @@ -187,11 +174,8 @@ impl ReservationReq { self.send(msg).await } - async fn send(mut self, msg: HopMessage) -> Result<(), std::io::Error> { - let mut encoded_msg = Vec::with_capacity(msg.encoded_len()); - msg.encode(&mut encoded_msg) - .expect("Vec to have sufficient capacity."); - self.substream.send(Cursor::new(encoded_msg)).await?; + async fn send(mut self, msg: HopMessage) -> Result<(), UpgradeError> { + self.substream.send(msg).await?; self.substream.flush().await?; self.substream.close().await?; @@ -201,7 +185,7 @@ impl ReservationReq { pub struct CircuitReq { dst: PeerId, - substream: Framed>>>, + substream: Framed>, } impl CircuitReq { @@ -209,7 +193,7 @@ impl CircuitReq { self.dst } - pub async fn accept(mut self) -> Result<(NegotiatedSubstream, Bytes), std::io::Error> { + pub async fn accept(mut self) -> Result<(NegotiatedSubstream, Bytes), UpgradeError> { let msg = HopMessage { r#type: hop_message::Type::Status.into(), peer: None, @@ -234,7 +218,7 @@ impl CircuitReq { Ok((io, read_buffer.freeze())) } - pub async fn deny(mut self, status: Status) -> Result<(), std::io::Error> { + pub async fn deny(mut self, status: Status) -> Result<(), UpgradeError> { let msg = HopMessage { r#type: hop_message::Type::Status.into(), peer: None, @@ -243,14 +227,11 @@ impl CircuitReq { status: Some(status.into()), }; self.send(msg).await?; - self.substream.close().await + self.substream.close().await.map_err(Into::into) } - async fn send(&mut self, msg: HopMessage) -> Result<(), std::io::Error> { - let mut encoded_msg = Vec::with_capacity(msg.encoded_len()); - msg.encode(&mut encoded_msg) - .expect("Vec to have sufficient capacity."); - self.substream.send(Cursor::new(encoded_msg)).await?; + async fn send(&mut self, msg: HopMessage) -> Result<(), prost_codec::Error> { + self.substream.send(msg).await?; self.substream.flush().await?; Ok(()) diff --git a/protocols/relay/src/v2/protocol/inbound_stop.rs b/protocols/relay/src/v2/protocol/inbound_stop.rs index 0f3919a84d9..8a23f34e7fa 100644 --- a/protocols/relay/src/v2/protocol/inbound_stop.rs +++ b/protocols/relay/src/v2/protocol/inbound_stop.rs @@ -25,11 +25,8 @@ use bytes::Bytes; use futures::{future::BoxFuture, prelude::*}; use libp2p_core::{upgrade, PeerId}; use libp2p_swarm::NegotiatedSubstream; -use prost::Message; -use std::io::Cursor; use std::iter; use thiserror::Error; -use unsigned_varint::codec::UviBytes; pub struct Upgrade {} @@ -48,22 +45,18 @@ impl upgrade::InboundUpgrade for Upgrade { type Future = BoxFuture<'static, Result>; fn upgrade_inbound(self, substream: NegotiatedSubstream, _: Self::Info) -> Self::Future { - let mut codec = UviBytes::default(); - codec.set_max_len(MAX_MESSAGE_SIZE); - let mut substream = Framed::new(substream, codec); + let mut substream = Framed::new(substream, prost_codec::Codec::new(MAX_MESSAGE_SIZE)); async move { - let msg: bytes::BytesMut = substream - .next() - .await - .ok_or_else(|| std::io::Error::new(std::io::ErrorKind::UnexpectedEof, ""))??; - let StopMessage { r#type, peer, limit, status: _, - } = StopMessage::decode(Cursor::new(msg))?; + } = substream + .next() + .await + .ok_or(FatalUpgradeError::StreamClosed)??; let r#type = stop_message::Type::from_i32(r#type).ok_or(FatalUpgradeError::ParseTypeField)?; @@ -91,28 +84,22 @@ pub enum UpgradeError { Fatal(#[from] FatalUpgradeError), } -impl From for UpgradeError { - fn from(error: prost::DecodeError) -> Self { - Self::Fatal(error.into()) - } -} - -impl From for UpgradeError { - fn from(error: std::io::Error) -> Self { +impl From for UpgradeError { + fn from(error: prost_codec::Error) -> Self { Self::Fatal(error.into()) } } #[derive(Debug, Error)] pub enum FatalUpgradeError { - #[error("Failed to decode message: {0}.")] - Decode( + #[error("Failed to encode or decode")] + Codec( #[from] #[source] - prost::DecodeError, + prost_codec::Error, ), - #[error(transparent)] - Io(#[from] std::io::Error), + #[error("Stream closed")] + StreamClosed, #[error("Failed to parse response type field.")] ParseTypeField, #[error("Failed to parse peer id.")] @@ -124,7 +111,7 @@ pub enum FatalUpgradeError { } pub struct Circuit { - substream: Framed>>>, + substream: Framed>, src_peer_id: PeerId, limit: Option, } @@ -138,7 +125,7 @@ impl Circuit { self.limit } - pub async fn accept(mut self) -> Result<(NegotiatedSubstream, Bytes), std::io::Error> { + pub async fn accept(mut self) -> Result<(NegotiatedSubstream, Bytes), UpgradeError> { let msg = StopMessage { r#type: stop_message::Type::Status.into(), peer: None, @@ -162,7 +149,7 @@ impl Circuit { Ok((io, read_buffer.freeze())) } - pub async fn deny(mut self, status: Status) -> Result<(), std::io::Error> { + pub async fn deny(mut self, status: Status) -> Result<(), UpgradeError> { let msg = StopMessage { r#type: stop_message::Type::Status.into(), peer: None, @@ -170,14 +157,11 @@ impl Circuit { status: Some(status.into()), }; - self.send(msg).await + self.send(msg).await.map_err(Into::into) } - async fn send(&mut self, msg: StopMessage) -> Result<(), std::io::Error> { - let mut encoded_msg = Vec::with_capacity(msg.encoded_len()); - msg.encode(&mut encoded_msg) - .expect("Vec to have sufficient capacity."); - self.substream.send(Cursor::new(encoded_msg)).await?; + async fn send(&mut self, msg: StopMessage) -> Result<(), prost_codec::Error> { + self.substream.send(msg).await?; self.substream.flush().await?; Ok(()) diff --git a/protocols/relay/src/v2/protocol/outbound_hop.rs b/protocols/relay/src/v2/protocol/outbound_hop.rs index 472efbe33c9..9a20a1aa000 100644 --- a/protocols/relay/src/v2/protocol/outbound_hop.rs +++ b/protocols/relay/src/v2/protocol/outbound_hop.rs @@ -26,13 +26,10 @@ use futures::{future::BoxFuture, prelude::*}; use futures_timer::Delay; use libp2p_core::{upgrade, Multiaddr, PeerId}; use libp2p_swarm::NegotiatedSubstream; -use prost::Message; use std::convert::TryFrom; -use std::io::Cursor; use std::iter; use std::time::{Duration, SystemTime, UNIX_EPOCH}; use thiserror::Error; -use unsigned_varint::codec::UviBytes; pub enum Upgrade { Reserve, @@ -74,28 +71,20 @@ impl upgrade::OutboundUpgrade for Upgrade { }, }; - let mut encoded_msg = Vec::with_capacity(msg.encoded_len()); - msg.encode(&mut encoded_msg) - .expect("Vec to have sufficient capacity."); - - let mut codec = UviBytes::default(); - codec.set_max_len(MAX_MESSAGE_SIZE); - let mut substream = Framed::new(substream, codec); + let mut substream = Framed::new(substream, prost_codec::Codec::new(MAX_MESSAGE_SIZE)); async move { - substream.send(Cursor::new(encoded_msg)).await?; - let msg: bytes::BytesMut = substream - .next() - .await - .ok_or_else(|| std::io::Error::new(std::io::ErrorKind::UnexpectedEof, ""))??; - + substream.send(msg).await?; let HopMessage { r#type, peer: _, reservation, limit, status, - } = HopMessage::decode(Cursor::new(msg))?; + } = substream + .next() + .await + .ok_or(FatalUpgradeError::StreamClosed)??; let r#type = hop_message::Type::from_i32(r#type).ok_or(FatalUpgradeError::ParseTypeField)?; @@ -216,14 +205,8 @@ pub enum UpgradeError { Fatal(#[from] FatalUpgradeError), } -impl From for UpgradeError { - fn from(error: std::io::Error) -> Self { - Self::Fatal(error.into()) - } -} - -impl From for UpgradeError { - fn from(error: prost::DecodeError) -> Self { +impl From for UpgradeError { + fn from(error: prost_codec::Error) -> Self { Self::Fatal(error.into()) } } @@ -250,14 +233,14 @@ pub enum ReservationFailedReason { #[derive(Debug, Error)] pub enum FatalUpgradeError { - #[error("Failed to decode message: {0}.")] - Decode( + #[error("Failed to encode or decode")] + Codec( #[from] #[source] - prost::DecodeError, + prost_codec::Error, ), - #[error(transparent)] - Io(#[from] std::io::Error), + #[error("Stream closed")] + StreamClosed, #[error("Expected 'status' field to be set.")] MissingStatusField, #[error("Expected 'reservation' field to be set.")] diff --git a/protocols/relay/src/v2/protocol/outbound_stop.rs b/protocols/relay/src/v2/protocol/outbound_stop.rs index ad74182a9c9..26b51407ec0 100644 --- a/protocols/relay/src/v2/protocol/outbound_stop.rs +++ b/protocols/relay/src/v2/protocol/outbound_stop.rs @@ -25,13 +25,10 @@ use bytes::Bytes; use futures::{future::BoxFuture, prelude::*}; use libp2p_core::{upgrade, PeerId}; use libp2p_swarm::NegotiatedSubstream; -use prost::Message; use std::convert::TryInto; -use std::io::Cursor; use std::iter; use std::time::Duration; use thiserror::Error; -use unsigned_varint::codec::UviBytes; pub struct Upgrade { pub relay_peer_id: PeerId, @@ -72,27 +69,19 @@ impl upgrade::OutboundUpgrade for Upgrade { status: None, }; - let mut encoded_msg = Vec::with_capacity(msg.encoded_len()); - msg.encode(&mut encoded_msg) - .expect("Vec to have sufficient capacity."); - - let mut codec = UviBytes::default(); - codec.set_max_len(MAX_MESSAGE_SIZE); - let mut substream = Framed::new(substream, codec); + let mut substream = Framed::new(substream, prost_codec::Codec::new(MAX_MESSAGE_SIZE)); async move { - substream.send(std::io::Cursor::new(encoded_msg)).await?; - let msg: bytes::BytesMut = substream - .next() - .await - .ok_or_else(|| std::io::Error::new(std::io::ErrorKind::UnexpectedEof, ""))??; - + substream.send(msg).await?; let StopMessage { r#type, peer: _, limit: _, status, - } = StopMessage::decode(Cursor::new(msg))?; + } = substream + .next() + .await + .ok_or(FatalUpgradeError::StreamClosed)??; let r#type = stop_message::Type::from_i32(r#type).ok_or(FatalUpgradeError::ParseTypeField)?; @@ -141,14 +130,8 @@ pub enum UpgradeError { Fatal(#[from] FatalUpgradeError), } -impl From for UpgradeError { - fn from(error: std::io::Error) -> Self { - Self::Fatal(error.into()) - } -} - -impl From for UpgradeError { - fn from(error: prost::DecodeError) -> Self { +impl From for UpgradeError { + fn from(error: prost_codec::Error) -> Self { Self::Fatal(error.into()) } } @@ -163,14 +146,14 @@ pub enum CircuitFailedReason { #[derive(Debug, Error)] pub enum FatalUpgradeError { - #[error("Failed to decode message: {0}.")] - Decode( + #[error("Failed to encode or decode")] + Codec( #[from] #[source] - prost::DecodeError, + prost_codec::Error, ), - #[error(transparent)] - Io(#[from] std::io::Error), + #[error("Stream closed")] + StreamClosed, #[error("Expected 'status' field to be set.")] MissingStatusField, #[error("Failed to parse response type field.")] diff --git a/protocols/relay/src/v2/relay.rs b/protocols/relay/src/v2/relay.rs index 353218453f1..8d8bb75caa9 100644 --- a/protocols/relay/src/v2/relay.rs +++ b/protocols/relay/src/v2/relay.rs @@ -138,14 +138,14 @@ pub enum Event { /// Accepting an inbound reservation request failed. ReservationReqAcceptFailed { src_peer_id: PeerId, - error: std::io::Error, + error: inbound_hop::UpgradeError, }, /// An inbound reservation request has been denied. ReservationReqDenied { src_peer_id: PeerId }, /// Denying an inbound reservation request has failed. ReservationReqDenyFailed { src_peer_id: PeerId, - error: std::io::Error, + error: inbound_hop::UpgradeError, }, /// An inbound reservation has timed out. ReservationTimedOut { src_peer_id: PeerId }, @@ -162,7 +162,7 @@ pub enum Event { CircuitReqDenyFailed { src_peer_id: PeerId, dst_peer_id: PeerId, - error: std::io::Error, + error: inbound_hop::UpgradeError, }, /// An inbound cirucit request has been accepted. CircuitReqAccepted { @@ -179,7 +179,7 @@ pub enum Event { CircuitReqAcceptFailed { src_peer_id: PeerId, dst_peer_id: PeerId, - error: std::io::Error, + error: inbound_hop::UpgradeError, }, /// An inbound circuit has closed. CircuitClosed { diff --git a/protocols/relay/src/v2/relay/handler.rs b/protocols/relay/src/v2/relay/handler.rs index 1b9c881d3ff..9801ca74b43 100644 --- a/protocols/relay/src/v2/relay/handler.rs +++ b/protocols/relay/src/v2/relay/handler.rs @@ -153,11 +153,11 @@ pub enum Event { renewed: bool, }, /// Accepting an inbound reservation request failed. - ReservationReqAcceptFailed { error: std::io::Error }, + ReservationReqAcceptFailed { error: inbound_hop::UpgradeError }, /// An inbound reservation request has been denied. ReservationReqDenied {}, /// Denying an inbound reservation request has failed. - ReservationReqDenyFailed { error: std::io::Error }, + ReservationReqDenyFailed { error: inbound_hop::UpgradeError }, /// An inbound reservation has timed out. ReservationTimedOut {}, /// An inbound circuit request has been received. @@ -178,7 +178,7 @@ pub enum Event { CircuitReqDenyFailed { circuit_id: Option, dst_peer_id: PeerId, - error: std::io::Error, + error: inbound_hop::UpgradeError, }, /// An inbound cirucit request has been accepted. CircuitReqAccepted { @@ -189,7 +189,7 @@ pub enum Event { CircuitReqAcceptFailed { circuit_id: CircuitId, dst_peer_id: PeerId, - error: std::io::Error, + error: inbound_hop::UpgradeError, }, /// An outbound substream for an inbound circuit request has been /// negotiated. @@ -354,8 +354,7 @@ impl IntoConnectionHandler for Prototype { config: self.config, queued_events: Default::default(), pending_error: Default::default(), - reservation_accept_futures: Default::default(), - reservation_deny_futures: Default::default(), + reservation_request_future: Default::default(), circuit_accept_futures: Default::default(), circuit_deny_futures: Default::default(), alive_lend_out_substreams: Default::default(), @@ -403,17 +402,20 @@ pub struct Handler { /// Until when to keep the connection alive. keep_alive: KeepAlive, - /// Futures accepting an inbound reservation request. - reservation_accept_futures: Futures>, - /// Futures denying an inbound reservation request. - reservation_deny_futures: Futures>, + /// Future handling inbound reservation request. + reservation_request_future: Option, /// Timeout for the currently active reservation. active_reservation: Option, /// Futures accepting an inbound circuit request. - circuit_accept_futures: Futures>, + circuit_accept_futures: + Futures>, /// Futures deying an inbound circuit request. - circuit_deny_futures: Futures<(Option, PeerId, Result<(), std::io::Error>)>, + circuit_deny_futures: Futures<( + Option, + PeerId, + Result<(), inbound_hop::UpgradeError>, + )>, /// Tracks substreams lend out to other [`Handler`]s. /// /// Contains a [`futures::future::Future`] for each lend out substream that @@ -427,6 +429,11 @@ pub struct Handler { circuits: Futures<(CircuitId, PeerId, Result<(), std::io::Error>)>, } +enum ReservationRequestFuture { + Accepting(BoxFuture<'static, Result<(), inbound_hop::UpgradeError>>), + Denying(BoxFuture<'static, Result<(), inbound_hop::UpgradeError>>), +} + type Futures = FuturesUnordered>; impl ConnectionHandler for Handler { @@ -512,15 +519,29 @@ impl ConnectionHandler for Handler { inbound_reservation_req, addrs, } => { - self.reservation_accept_futures - .push(inbound_reservation_req.accept(addrs).boxed()); + if self + .reservation_request_future + .replace(ReservationRequestFuture::Accepting( + inbound_reservation_req.accept(addrs).boxed(), + )) + .is_some() + { + log::warn!("Dropping existing deny/accept future in favor of new one.") + } } In::DenyReservationReq { inbound_reservation_req, status, } => { - self.reservation_deny_futures - .push(inbound_reservation_req.deny(status).boxed()); + if self + .reservation_request_future + .replace(ReservationRequestFuture::Denying( + inbound_reservation_req.deny(status).boxed(), + )) + .is_some() + { + log::warn!("Dropping existing deny/accept future in favor of new one.") + } } In::NegotiateOutboundConnect { circuit_id, @@ -723,6 +744,7 @@ impl ConnectionHandler for Handler { return Poll::Ready(event); } + // Progress existing circuits. if let Poll::Ready(Some((circuit_id, dst_peer_id, result))) = self.circuits.poll_next_unpin(cx) { @@ -744,40 +766,30 @@ impl ConnectionHandler for Handler { } } - if let Poll::Ready(Some(result)) = self.reservation_accept_futures.poll_next_unpin(cx) { - match result { - Ok(()) => { - let renewed = self - .active_reservation - .replace(Delay::new(self.config.reservation_duration)) - .is_some(); - return Poll::Ready(ConnectionHandlerEvent::Custom( - Event::ReservationReqAccepted { renewed }, - )); - } - Err(error) => { - return Poll::Ready(ConnectionHandlerEvent::Custom( - Event::ReservationReqAcceptFailed { error }, - )); - } - } - } - - if let Poll::Ready(Some(result)) = self.reservation_deny_futures.poll_next_unpin(cx) { + // Deny new circuits. + if let Poll::Ready(Some((circuit_id, dst_peer_id, result))) = + self.circuit_deny_futures.poll_next_unpin(cx) + { match result { Ok(()) => { - return Poll::Ready(ConnectionHandlerEvent::Custom( - Event::ReservationReqDenied {}, - )) + return Poll::Ready(ConnectionHandlerEvent::Custom(Event::CircuitReqDenied { + circuit_id, + dst_peer_id, + })); } Err(error) => { return Poll::Ready(ConnectionHandlerEvent::Custom( - Event::ReservationReqDenyFailed { error }, + Event::CircuitReqDenyFailed { + circuit_id, + dst_peer_id, + error, + }, )); } } } + // Accept new circuits. if let Poll::Ready(Some(result)) = self.circuit_accept_futures.poll_next_unpin(cx) { match result { Ok(parts) => { @@ -838,32 +850,7 @@ impl ConnectionHandler for Handler { } } - if let Poll::Ready(Some((circuit_id, dst_peer_id, result))) = - self.circuit_deny_futures.poll_next_unpin(cx) - { - match result { - Ok(()) => { - return Poll::Ready(ConnectionHandlerEvent::Custom(Event::CircuitReqDenied { - circuit_id, - dst_peer_id, - })); - } - Err(error) => { - return Poll::Ready(ConnectionHandlerEvent::Custom( - Event::CircuitReqDenyFailed { - circuit_id, - dst_peer_id, - error, - }, - )); - } - } - } - - while let Poll::Ready(Some(Err(Canceled))) = - self.alive_lend_out_substreams.poll_next_unpin(cx) - {} - + // Check active reservation. if let Some(Poll::Ready(())) = self .active_reservation .as_mut() @@ -875,8 +862,58 @@ impl ConnectionHandler for Handler { )); } - if self.reservation_accept_futures.is_empty() - && self.reservation_deny_futures.is_empty() + // Progress reservation request. + match self.reservation_request_future.as_mut() { + Some(ReservationRequestFuture::Accepting(fut)) => { + if let Poll::Ready(result) = fut.poll_unpin(cx) { + self.reservation_request_future = None; + + match result { + Ok(()) => { + let renewed = self + .active_reservation + .replace(Delay::new(self.config.reservation_duration)) + .is_some(); + return Poll::Ready(ConnectionHandlerEvent::Custom( + Event::ReservationReqAccepted { renewed }, + )); + } + Err(error) => { + return Poll::Ready(ConnectionHandlerEvent::Custom( + Event::ReservationReqAcceptFailed { error }, + )); + } + } + } + } + Some(ReservationRequestFuture::Denying(fut)) => { + if let Poll::Ready(result) = fut.poll_unpin(cx) { + self.reservation_request_future = None; + + match result { + Ok(()) => { + return Poll::Ready(ConnectionHandlerEvent::Custom( + Event::ReservationReqDenied {}, + )) + } + Err(error) => { + return Poll::Ready(ConnectionHandlerEvent::Custom( + Event::ReservationReqDenyFailed { error }, + )); + } + } + } + } + None => {} + } + + // Check lend out substreams. + while let Poll::Ready(Some(Err(Canceled))) = + self.alive_lend_out_substreams.poll_next_unpin(cx) + {} + + // Check keep alive status. + if self.reservation_request_future.is_none() && self.circuit_accept_futures.is_empty() && self.circuit_deny_futures.is_empty() && self.alive_lend_out_substreams.is_empty()