From 4c3fd01667babfbc646d27eb4ce1690ad2547df7 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Wed, 4 May 2022 15:30:44 +0200 Subject: [PATCH 1/9] protocols/relay: Use prost-codec --- protocols/relay/CHANGELOG.md | 2 + protocols/relay/Cargo.toml | 2 +- protocols/relay/src/v2/client.rs | 11 ++-- protocols/relay/src/v2/client/handler.rs | 6 +- .../relay/src/v2/protocol/inbound_hop.rs | 65 +++++++------------ .../relay/src/v2/protocol/inbound_stop.rs | 52 +++++---------- .../relay/src/v2/protocol/outbound_hop.rs | 43 ++++-------- .../relay/src/v2/protocol/outbound_stop.rs | 43 ++++-------- protocols/relay/src/v2/relay.rs | 8 +-- protocols/relay/src/v2/relay/handler.rs | 21 +++--- 10 files changed, 98 insertions(+), 155 deletions(-) diff --git a/protocols/relay/CHANGELOG.md b/protocols/relay/CHANGELOG.md index f20134464de..4f07b8d6b84 100644 --- a/protocols/relay/CHANGELOG.md +++ b/protocols/relay/CHANGELOG.md @@ -4,6 +4,8 @@ - Update to `libp2p-swarm` `v0.36.0`. +- Expose explicits errors via `UpgradeError` instead of generic `io::Error`. + # 0.8.0 - Expose `{Inbound,Outbound}{Hop,Stop}UpgradeError`. See [PR 2586]. diff --git a/protocols/relay/Cargo.toml b/protocols/relay/Cargo.toml index a3aa84e9e53..a6e8c3e4689 100644 --- a/protocols/relay/Cargo.toml +++ b/protocols/relay/Cargo.toml @@ -21,12 +21,12 @@ libp2p-core = { version = "0.33.0", path = "../../core", default-features = fals libp2p-swarm = { version = "0.36.0", path = "../../swarm" } log = "0.4" pin-project = "1" +prost-codec = { version = "0.1", path = "../../misc/prost-codec" } prost = "0.10" rand = "0.8.4" smallvec = "1.6.1" static_assertions = "1" thiserror = "1.0" -unsigned-varint = { version = "0.7", features = ["asynchronous_codec"] } void = "1" [build-dependencies] diff --git a/protocols/relay/src/v2/client.rs b/protocols/relay/src/v2/client.rs index e670d24e3e1..d039cd08cbd 100644 --- a/protocols/relay/src/v2/client.rs +++ b/protocols/relay/src/v2/client.rs @@ -41,7 +41,7 @@ use libp2p_swarm::{ NotifyHandler, PollParameters, }; use std::collections::{hash_map, HashMap, VecDeque}; -use std::io::{Error, IoSlice}; +use std::io::{Error, ErrorKind, IoSlice}; use std::ops::DerefMut; use std::pin::Pin; use std::task::{Context, Poll}; @@ -84,7 +84,7 @@ pub enum Event { /// Denying an inbound circuit request failed. InboundCircuitReqDenyFailed { src_peer_id: PeerId, - error: std::io::Error, + error: inbound_stop::UpgradeError, }, } @@ -320,7 +320,7 @@ impl NetworkBehaviour for Client { /// A [`NegotiatedSubstream`] acting as a [`RelayedConnection`]. pub enum RelayedConnection { InboundAccepting { - accept: BoxFuture<'static, Result>, + accept: BoxFuture<'static, Result>, }, Operational { read_buffer: Bytes, @@ -338,7 +338,10 @@ impl RelayedConnection { ) -> Self { RelayedConnection::InboundAccepting { accept: async { - let (substream, read_buffer) = circuit.accept().await?; + let (substream, read_buffer) = circuit + .accept() + .await + .map_err(|e| Error::new(ErrorKind::Other, e))?; Ok(RelayedConnection::Operational { read_buffer, substream, diff --git a/protocols/relay/src/v2/client/handler.rs b/protocols/relay/src/v2/client/handler.rs index 94a69f6bd2d..eac72d94bc7 100644 --- a/protocols/relay/src/v2/client/handler.rs +++ b/protocols/relay/src/v2/client/handler.rs @@ -100,7 +100,7 @@ pub enum Event { /// Denying an inbound circuit request failed. InboundCircuitReqDenyFailed { src_peer_id: PeerId, - error: std::io::Error, + error: inbound_stop::UpgradeError, }, } @@ -196,7 +196,9 @@ pub struct Handler { /// eventually. alive_lend_out_substreams: FuturesUnordered>, - circuit_deny_futs: FuturesUnordered)>>, + circuit_deny_futs: FuturesUnordered< + BoxFuture<'static, (PeerId, Result<(), protocol::inbound_stop::UpgradeError>)>, + >, /// Futures that try to send errors to the transport. /// diff --git a/protocols/relay/src/v2/protocol/inbound_hop.rs b/protocols/relay/src/v2/protocol/inbound_hop.rs index 36b4079153d..893ae916e1c 100644 --- a/protocols/relay/src/v2/protocol/inbound_hop.rs +++ b/protocols/relay/src/v2/protocol/inbound_hop.rs @@ -25,13 +25,10 @@ use bytes::Bytes; use futures::{future::BoxFuture, prelude::*}; use libp2p_core::{upgrade, Multiaddr, PeerId}; use libp2p_swarm::NegotiatedSubstream; -use prost::Message; use std::convert::TryInto; -use std::io::Cursor; use std::iter; use std::time::{Duration, SystemTime, UNIX_EPOCH}; use thiserror::Error; -use unsigned_varint::codec::UviBytes; pub struct Upgrade { pub reservation_duration: Duration, @@ -54,23 +51,19 @@ impl upgrade::InboundUpgrade for Upgrade { type Future = BoxFuture<'static, Result>; fn upgrade_inbound(self, substream: NegotiatedSubstream, _: Self::Info) -> Self::Future { - let mut codec = UviBytes::default(); - codec.set_max_len(MAX_MESSAGE_SIZE); - let mut substream = Framed::new(substream, codec); + let mut substream = Framed::new(substream, prost_codec::Codec::new(MAX_MESSAGE_SIZE)); async move { - let msg: bytes::BytesMut = substream - .next() - .await - .ok_or_else(|| std::io::Error::new(std::io::ErrorKind::UnexpectedEof, ""))??; - let HopMessage { r#type, peer, reservation: _, limit: _, status: _, - } = HopMessage::decode(Cursor::new(msg))?; + } = substream + .next() + .await + .ok_or(FatalUpgradeError::StreamClosed)??; let r#type = hop_message::Type::from_i32(r#type).ok_or(FatalUpgradeError::ParseTypeField)?; @@ -103,28 +96,22 @@ pub enum UpgradeError { Fatal(#[from] FatalUpgradeError), } -impl From for UpgradeError { - fn from(error: prost::DecodeError) -> Self { - Self::Fatal(error.into()) - } -} - -impl From for UpgradeError { - fn from(error: std::io::Error) -> Self { +impl From for UpgradeError { + fn from(error: prost_codec::Error) -> Self { Self::Fatal(error.into()) } } #[derive(Debug, Error)] pub enum FatalUpgradeError { - #[error("Failed to decode message: {0}.")] - Decode( + #[error("Failed to encode or decode")] + Codec( #[from] #[source] - prost::DecodeError, + prost_codec::Error, ), - #[error(transparent)] - Io(#[from] std::io::Error), + #[error("Stream closed")] + StreamClosed, #[error("Failed to parse response type field.")] ParseTypeField, #[error("Failed to parse peer id.")] @@ -141,14 +128,14 @@ pub enum Req { } pub struct ReservationReq { - substream: Framed>>>, + substream: Framed>, reservation_duration: Duration, max_circuit_duration: Duration, max_circuit_bytes: u64, } impl ReservationReq { - pub async fn accept(self, addrs: Vec) -> Result<(), std::io::Error> { + pub async fn accept(self, addrs: Vec) -> Result<(), UpgradeError> { let msg = HopMessage { r#type: hop_message::Type::Status.into(), peer: None, @@ -175,7 +162,7 @@ impl ReservationReq { self.send(msg).await } - pub async fn deny(self, status: Status) -> Result<(), std::io::Error> { + pub async fn deny(self, status: Status) -> Result<(), UpgradeError> { let msg = HopMessage { r#type: hop_message::Type::Status.into(), peer: None, @@ -187,11 +174,8 @@ impl ReservationReq { self.send(msg).await } - async fn send(mut self, msg: HopMessage) -> Result<(), std::io::Error> { - let mut encoded_msg = Vec::with_capacity(msg.encoded_len()); - msg.encode(&mut encoded_msg) - .expect("Vec to have sufficient capacity."); - self.substream.send(Cursor::new(encoded_msg)).await?; + async fn send(mut self, msg: HopMessage) -> Result<(), UpgradeError> { + self.substream.send(msg).await?; self.substream.flush().await?; self.substream.close().await?; @@ -201,7 +185,7 @@ impl ReservationReq { pub struct CircuitReq { dst: PeerId, - substream: Framed>>>, + substream: Framed>, } impl CircuitReq { @@ -209,7 +193,7 @@ impl CircuitReq { self.dst } - pub async fn accept(mut self) -> Result<(NegotiatedSubstream, Bytes), std::io::Error> { + pub async fn accept(mut self) -> Result<(NegotiatedSubstream, Bytes), UpgradeError> { let msg = HopMessage { r#type: hop_message::Type::Status.into(), peer: None, @@ -234,7 +218,7 @@ impl CircuitReq { Ok((io, read_buffer.freeze())) } - pub async fn deny(mut self, status: Status) -> Result<(), std::io::Error> { + pub async fn deny(mut self, status: Status) -> Result<(), UpgradeError> { let msg = HopMessage { r#type: hop_message::Type::Status.into(), peer: None, @@ -243,14 +227,11 @@ impl CircuitReq { status: Some(status.into()), }; self.send(msg).await?; - self.substream.close().await + self.substream.close().await.map_err(Into::into) } - async fn send(&mut self, msg: HopMessage) -> Result<(), std::io::Error> { - let mut encoded_msg = Vec::with_capacity(msg.encoded_len()); - msg.encode(&mut encoded_msg) - .expect("Vec to have sufficient capacity."); - self.substream.send(Cursor::new(encoded_msg)).await?; + async fn send(&mut self, msg: HopMessage) -> Result<(), prost_codec::Error> { + self.substream.send(msg).await?; self.substream.flush().await?; Ok(()) diff --git a/protocols/relay/src/v2/protocol/inbound_stop.rs b/protocols/relay/src/v2/protocol/inbound_stop.rs index 0f3919a84d9..8a23f34e7fa 100644 --- a/protocols/relay/src/v2/protocol/inbound_stop.rs +++ b/protocols/relay/src/v2/protocol/inbound_stop.rs @@ -25,11 +25,8 @@ use bytes::Bytes; use futures::{future::BoxFuture, prelude::*}; use libp2p_core::{upgrade, PeerId}; use libp2p_swarm::NegotiatedSubstream; -use prost::Message; -use std::io::Cursor; use std::iter; use thiserror::Error; -use unsigned_varint::codec::UviBytes; pub struct Upgrade {} @@ -48,22 +45,18 @@ impl upgrade::InboundUpgrade for Upgrade { type Future = BoxFuture<'static, Result>; fn upgrade_inbound(self, substream: NegotiatedSubstream, _: Self::Info) -> Self::Future { - let mut codec = UviBytes::default(); - codec.set_max_len(MAX_MESSAGE_SIZE); - let mut substream = Framed::new(substream, codec); + let mut substream = Framed::new(substream, prost_codec::Codec::new(MAX_MESSAGE_SIZE)); async move { - let msg: bytes::BytesMut = substream - .next() - .await - .ok_or_else(|| std::io::Error::new(std::io::ErrorKind::UnexpectedEof, ""))??; - let StopMessage { r#type, peer, limit, status: _, - } = StopMessage::decode(Cursor::new(msg))?; + } = substream + .next() + .await + .ok_or(FatalUpgradeError::StreamClosed)??; let r#type = stop_message::Type::from_i32(r#type).ok_or(FatalUpgradeError::ParseTypeField)?; @@ -91,28 +84,22 @@ pub enum UpgradeError { Fatal(#[from] FatalUpgradeError), } -impl From for UpgradeError { - fn from(error: prost::DecodeError) -> Self { - Self::Fatal(error.into()) - } -} - -impl From for UpgradeError { - fn from(error: std::io::Error) -> Self { +impl From for UpgradeError { + fn from(error: prost_codec::Error) -> Self { Self::Fatal(error.into()) } } #[derive(Debug, Error)] pub enum FatalUpgradeError { - #[error("Failed to decode message: {0}.")] - Decode( + #[error("Failed to encode or decode")] + Codec( #[from] #[source] - prost::DecodeError, + prost_codec::Error, ), - #[error(transparent)] - Io(#[from] std::io::Error), + #[error("Stream closed")] + StreamClosed, #[error("Failed to parse response type field.")] ParseTypeField, #[error("Failed to parse peer id.")] @@ -124,7 +111,7 @@ pub enum FatalUpgradeError { } pub struct Circuit { - substream: Framed>>>, + substream: Framed>, src_peer_id: PeerId, limit: Option, } @@ -138,7 +125,7 @@ impl Circuit { self.limit } - pub async fn accept(mut self) -> Result<(NegotiatedSubstream, Bytes), std::io::Error> { + pub async fn accept(mut self) -> Result<(NegotiatedSubstream, Bytes), UpgradeError> { let msg = StopMessage { r#type: stop_message::Type::Status.into(), peer: None, @@ -162,7 +149,7 @@ impl Circuit { Ok((io, read_buffer.freeze())) } - pub async fn deny(mut self, status: Status) -> Result<(), std::io::Error> { + pub async fn deny(mut self, status: Status) -> Result<(), UpgradeError> { let msg = StopMessage { r#type: stop_message::Type::Status.into(), peer: None, @@ -170,14 +157,11 @@ impl Circuit { status: Some(status.into()), }; - self.send(msg).await + self.send(msg).await.map_err(Into::into) } - async fn send(&mut self, msg: StopMessage) -> Result<(), std::io::Error> { - let mut encoded_msg = Vec::with_capacity(msg.encoded_len()); - msg.encode(&mut encoded_msg) - .expect("Vec to have sufficient capacity."); - self.substream.send(Cursor::new(encoded_msg)).await?; + async fn send(&mut self, msg: StopMessage) -> Result<(), prost_codec::Error> { + self.substream.send(msg).await?; self.substream.flush().await?; Ok(()) diff --git a/protocols/relay/src/v2/protocol/outbound_hop.rs b/protocols/relay/src/v2/protocol/outbound_hop.rs index 472efbe33c9..9a20a1aa000 100644 --- a/protocols/relay/src/v2/protocol/outbound_hop.rs +++ b/protocols/relay/src/v2/protocol/outbound_hop.rs @@ -26,13 +26,10 @@ use futures::{future::BoxFuture, prelude::*}; use futures_timer::Delay; use libp2p_core::{upgrade, Multiaddr, PeerId}; use libp2p_swarm::NegotiatedSubstream; -use prost::Message; use std::convert::TryFrom; -use std::io::Cursor; use std::iter; use std::time::{Duration, SystemTime, UNIX_EPOCH}; use thiserror::Error; -use unsigned_varint::codec::UviBytes; pub enum Upgrade { Reserve, @@ -74,28 +71,20 @@ impl upgrade::OutboundUpgrade for Upgrade { }, }; - let mut encoded_msg = Vec::with_capacity(msg.encoded_len()); - msg.encode(&mut encoded_msg) - .expect("Vec to have sufficient capacity."); - - let mut codec = UviBytes::default(); - codec.set_max_len(MAX_MESSAGE_SIZE); - let mut substream = Framed::new(substream, codec); + let mut substream = Framed::new(substream, prost_codec::Codec::new(MAX_MESSAGE_SIZE)); async move { - substream.send(Cursor::new(encoded_msg)).await?; - let msg: bytes::BytesMut = substream - .next() - .await - .ok_or_else(|| std::io::Error::new(std::io::ErrorKind::UnexpectedEof, ""))??; - + substream.send(msg).await?; let HopMessage { r#type, peer: _, reservation, limit, status, - } = HopMessage::decode(Cursor::new(msg))?; + } = substream + .next() + .await + .ok_or(FatalUpgradeError::StreamClosed)??; let r#type = hop_message::Type::from_i32(r#type).ok_or(FatalUpgradeError::ParseTypeField)?; @@ -216,14 +205,8 @@ pub enum UpgradeError { Fatal(#[from] FatalUpgradeError), } -impl From for UpgradeError { - fn from(error: std::io::Error) -> Self { - Self::Fatal(error.into()) - } -} - -impl From for UpgradeError { - fn from(error: prost::DecodeError) -> Self { +impl From for UpgradeError { + fn from(error: prost_codec::Error) -> Self { Self::Fatal(error.into()) } } @@ -250,14 +233,14 @@ pub enum ReservationFailedReason { #[derive(Debug, Error)] pub enum FatalUpgradeError { - #[error("Failed to decode message: {0}.")] - Decode( + #[error("Failed to encode or decode")] + Codec( #[from] #[source] - prost::DecodeError, + prost_codec::Error, ), - #[error(transparent)] - Io(#[from] std::io::Error), + #[error("Stream closed")] + StreamClosed, #[error("Expected 'status' field to be set.")] MissingStatusField, #[error("Expected 'reservation' field to be set.")] diff --git a/protocols/relay/src/v2/protocol/outbound_stop.rs b/protocols/relay/src/v2/protocol/outbound_stop.rs index ad74182a9c9..26b51407ec0 100644 --- a/protocols/relay/src/v2/protocol/outbound_stop.rs +++ b/protocols/relay/src/v2/protocol/outbound_stop.rs @@ -25,13 +25,10 @@ use bytes::Bytes; use futures::{future::BoxFuture, prelude::*}; use libp2p_core::{upgrade, PeerId}; use libp2p_swarm::NegotiatedSubstream; -use prost::Message; use std::convert::TryInto; -use std::io::Cursor; use std::iter; use std::time::Duration; use thiserror::Error; -use unsigned_varint::codec::UviBytes; pub struct Upgrade { pub relay_peer_id: PeerId, @@ -72,27 +69,19 @@ impl upgrade::OutboundUpgrade for Upgrade { status: None, }; - let mut encoded_msg = Vec::with_capacity(msg.encoded_len()); - msg.encode(&mut encoded_msg) - .expect("Vec to have sufficient capacity."); - - let mut codec = UviBytes::default(); - codec.set_max_len(MAX_MESSAGE_SIZE); - let mut substream = Framed::new(substream, codec); + let mut substream = Framed::new(substream, prost_codec::Codec::new(MAX_MESSAGE_SIZE)); async move { - substream.send(std::io::Cursor::new(encoded_msg)).await?; - let msg: bytes::BytesMut = substream - .next() - .await - .ok_or_else(|| std::io::Error::new(std::io::ErrorKind::UnexpectedEof, ""))??; - + substream.send(msg).await?; let StopMessage { r#type, peer: _, limit: _, status, - } = StopMessage::decode(Cursor::new(msg))?; + } = substream + .next() + .await + .ok_or(FatalUpgradeError::StreamClosed)??; let r#type = stop_message::Type::from_i32(r#type).ok_or(FatalUpgradeError::ParseTypeField)?; @@ -141,14 +130,8 @@ pub enum UpgradeError { Fatal(#[from] FatalUpgradeError), } -impl From for UpgradeError { - fn from(error: std::io::Error) -> Self { - Self::Fatal(error.into()) - } -} - -impl From for UpgradeError { - fn from(error: prost::DecodeError) -> Self { +impl From for UpgradeError { + fn from(error: prost_codec::Error) -> Self { Self::Fatal(error.into()) } } @@ -163,14 +146,14 @@ pub enum CircuitFailedReason { #[derive(Debug, Error)] pub enum FatalUpgradeError { - #[error("Failed to decode message: {0}.")] - Decode( + #[error("Failed to encode or decode")] + Codec( #[from] #[source] - prost::DecodeError, + prost_codec::Error, ), - #[error(transparent)] - Io(#[from] std::io::Error), + #[error("Stream closed")] + StreamClosed, #[error("Expected 'status' field to be set.")] MissingStatusField, #[error("Failed to parse response type field.")] diff --git a/protocols/relay/src/v2/relay.rs b/protocols/relay/src/v2/relay.rs index 353218453f1..8d8bb75caa9 100644 --- a/protocols/relay/src/v2/relay.rs +++ b/protocols/relay/src/v2/relay.rs @@ -138,14 +138,14 @@ pub enum Event { /// Accepting an inbound reservation request failed. ReservationReqAcceptFailed { src_peer_id: PeerId, - error: std::io::Error, + error: inbound_hop::UpgradeError, }, /// An inbound reservation request has been denied. ReservationReqDenied { src_peer_id: PeerId }, /// Denying an inbound reservation request has failed. ReservationReqDenyFailed { src_peer_id: PeerId, - error: std::io::Error, + error: inbound_hop::UpgradeError, }, /// An inbound reservation has timed out. ReservationTimedOut { src_peer_id: PeerId }, @@ -162,7 +162,7 @@ pub enum Event { CircuitReqDenyFailed { src_peer_id: PeerId, dst_peer_id: PeerId, - error: std::io::Error, + error: inbound_hop::UpgradeError, }, /// An inbound cirucit request has been accepted. CircuitReqAccepted { @@ -179,7 +179,7 @@ pub enum Event { CircuitReqAcceptFailed { src_peer_id: PeerId, dst_peer_id: PeerId, - error: std::io::Error, + error: inbound_hop::UpgradeError, }, /// An inbound circuit has closed. CircuitClosed { diff --git a/protocols/relay/src/v2/relay/handler.rs b/protocols/relay/src/v2/relay/handler.rs index 1b9c881d3ff..15721ab38fd 100644 --- a/protocols/relay/src/v2/relay/handler.rs +++ b/protocols/relay/src/v2/relay/handler.rs @@ -153,11 +153,11 @@ pub enum Event { renewed: bool, }, /// Accepting an inbound reservation request failed. - ReservationReqAcceptFailed { error: std::io::Error }, + ReservationReqAcceptFailed { error: inbound_hop::UpgradeError }, /// An inbound reservation request has been denied. ReservationReqDenied {}, /// Denying an inbound reservation request has failed. - ReservationReqDenyFailed { error: std::io::Error }, + ReservationReqDenyFailed { error: inbound_hop::UpgradeError }, /// An inbound reservation has timed out. ReservationTimedOut {}, /// An inbound circuit request has been received. @@ -178,7 +178,7 @@ pub enum Event { CircuitReqDenyFailed { circuit_id: Option, dst_peer_id: PeerId, - error: std::io::Error, + error: inbound_hop::UpgradeError, }, /// An inbound cirucit request has been accepted. CircuitReqAccepted { @@ -189,7 +189,7 @@ pub enum Event { CircuitReqAcceptFailed { circuit_id: CircuitId, dst_peer_id: PeerId, - error: std::io::Error, + error: inbound_hop::UpgradeError, }, /// An outbound substream for an inbound circuit request has been /// negotiated. @@ -404,16 +404,21 @@ pub struct Handler { keep_alive: KeepAlive, /// Futures accepting an inbound reservation request. - reservation_accept_futures: Futures>, + reservation_accept_futures: Futures>, /// Futures denying an inbound reservation request. - reservation_deny_futures: Futures>, + reservation_deny_futures: Futures>, /// Timeout for the currently active reservation. active_reservation: Option, /// Futures accepting an inbound circuit request. - circuit_accept_futures: Futures>, + circuit_accept_futures: + Futures>, /// Futures deying an inbound circuit request. - circuit_deny_futures: Futures<(Option, PeerId, Result<(), std::io::Error>)>, + circuit_deny_futures: Futures<( + Option, + PeerId, + Result<(), inbound_hop::UpgradeError>, + )>, /// Tracks substreams lend out to other [`Handler`]s. /// /// Contains a [`futures::future::Future`] for each lend out substream that From b92432358c25f9911603a4873d72d34bec5d0a95 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Wed, 4 May 2022 16:16:42 +0200 Subject: [PATCH 2/9] protocols/relay: Respond to at most one incoming reservation request Also changes poll order prioritizing - Error handling over everything. - Queued events over existing circuits. - Existing circuits over accepting new circuits. - Reservation management of existing reservation over new reservation requests. --- protocols/relay/src/v2/relay/handler.rs | 130 +++++++++++++++--------- 1 file changed, 80 insertions(+), 50 deletions(-) diff --git a/protocols/relay/src/v2/relay/handler.rs b/protocols/relay/src/v2/relay/handler.rs index 15721ab38fd..670cea959e7 100644 --- a/protocols/relay/src/v2/relay/handler.rs +++ b/protocols/relay/src/v2/relay/handler.rs @@ -354,8 +354,7 @@ impl IntoConnectionHandler for Prototype { config: self.config, queued_events: Default::default(), pending_error: Default::default(), - reservation_accept_futures: Default::default(), - reservation_deny_futures: Default::default(), + reservation_request_future: Default::default(), circuit_accept_futures: Default::default(), circuit_deny_futures: Default::default(), alive_lend_out_substreams: Default::default(), @@ -403,10 +402,8 @@ pub struct Handler { /// Until when to keep the connection alive. keep_alive: KeepAlive, - /// Futures accepting an inbound reservation request. - reservation_accept_futures: Futures>, - /// Futures denying an inbound reservation request. - reservation_deny_futures: Futures>, + /// Future handling inbound reservation request. + reservation_request_future: Option, /// Timeout for the currently active reservation. active_reservation: Option, @@ -432,6 +429,11 @@ pub struct Handler { circuits: Futures<(CircuitId, PeerId, Result<(), std::io::Error>)>, } +enum ReservationRequestFuture { + Accepting(BoxFuture<'static, Result<(), inbound_hop::UpgradeError>>), + Denying(BoxFuture<'static, Result<(), inbound_hop::UpgradeError>>), +} + type Futures = FuturesUnordered>; impl ConnectionHandler for Handler { @@ -517,15 +519,27 @@ impl ConnectionHandler for Handler { inbound_reservation_req, addrs, } => { - self.reservation_accept_futures - .push(inbound_reservation_req.accept(addrs).boxed()); + if let Some(_) = + self.reservation_request_future + .replace(ReservationRequestFuture::Accepting( + inbound_reservation_req.accept(addrs).boxed(), + )) + { + log::warn!("Dropping existing deny/accept future in favor of new one.") + } } In::DenyReservationReq { inbound_reservation_req, status, } => { - self.reservation_deny_futures - .push(inbound_reservation_req.deny(status).boxed()); + if let Some(_) = + self.reservation_request_future + .replace(ReservationRequestFuture::Denying( + inbound_reservation_req.deny(status).boxed(), + )) + { + log::warn!("Dropping existing deny/accept future in favor of new one.") + } } In::NegotiateOutboundConnect { circuit_id, @@ -728,6 +742,7 @@ impl ConnectionHandler for Handler { return Poll::Ready(event); } + // Progress existing circuits. if let Poll::Ready(Some((circuit_id, dst_peer_id, result))) = self.circuits.poll_next_unpin(cx) { @@ -749,40 +764,7 @@ impl ConnectionHandler for Handler { } } - if let Poll::Ready(Some(result)) = self.reservation_accept_futures.poll_next_unpin(cx) { - match result { - Ok(()) => { - let renewed = self - .active_reservation - .replace(Delay::new(self.config.reservation_duration)) - .is_some(); - return Poll::Ready(ConnectionHandlerEvent::Custom( - Event::ReservationReqAccepted { renewed }, - )); - } - Err(error) => { - return Poll::Ready(ConnectionHandlerEvent::Custom( - Event::ReservationReqAcceptFailed { error }, - )); - } - } - } - - if let Poll::Ready(Some(result)) = self.reservation_deny_futures.poll_next_unpin(cx) { - match result { - Ok(()) => { - return Poll::Ready(ConnectionHandlerEvent::Custom( - Event::ReservationReqDenied {}, - )) - } - Err(error) => { - return Poll::Ready(ConnectionHandlerEvent::Custom( - Event::ReservationReqDenyFailed { error }, - )); - } - } - } - + // Accept new circuits. if let Poll::Ready(Some(result)) = self.circuit_accept_futures.poll_next_unpin(cx) { match result { Ok(parts) => { @@ -843,6 +825,7 @@ impl ConnectionHandler for Handler { } } + // Deny new circuits. if let Poll::Ready(Some((circuit_id, dst_peer_id, result))) = self.circuit_deny_futures.poll_next_unpin(cx) { @@ -865,10 +848,7 @@ impl ConnectionHandler for Handler { } } - while let Poll::Ready(Some(Err(Canceled))) = - self.alive_lend_out_substreams.poll_next_unpin(cx) - {} - + // Check active reservation. if let Some(Poll::Ready(())) = self .active_reservation .as_mut() @@ -880,8 +860,58 @@ impl ConnectionHandler for Handler { )); } - if self.reservation_accept_futures.is_empty() - && self.reservation_deny_futures.is_empty() + // Progress reservation request. + match self.reservation_request_future.as_mut() { + Some(ReservationRequestFuture::Accepting(fut)) => { + if let Poll::Ready(result) = fut.poll_unpin(cx) { + self.reservation_request_future = None; + + match result { + Ok(()) => { + let renewed = self + .active_reservation + .replace(Delay::new(self.config.reservation_duration)) + .is_some(); + return Poll::Ready(ConnectionHandlerEvent::Custom( + Event::ReservationReqAccepted { renewed }, + )); + } + Err(error) => { + return Poll::Ready(ConnectionHandlerEvent::Custom( + Event::ReservationReqAcceptFailed { error }, + )); + } + } + } + } + Some(ReservationRequestFuture::Denying(fut)) => { + if let Poll::Ready(result) = fut.poll_unpin(cx) { + self.reservation_request_future = None; + + match result { + Ok(()) => { + return Poll::Ready(ConnectionHandlerEvent::Custom( + Event::ReservationReqDenied {}, + )) + } + Err(error) => { + return Poll::Ready(ConnectionHandlerEvent::Custom( + Event::ReservationReqDenyFailed { error }, + )); + } + } + } + } + None => {} + } + + // Check lend out substreams. + while let Poll::Ready(Some(Err(Canceled))) = + self.alive_lend_out_substreams.poll_next_unpin(cx) + {} + + // Check keep alive status. + if self.reservation_request_future.is_none() && self.circuit_accept_futures.is_empty() && self.circuit_deny_futures.is_empty() && self.alive_lend_out_substreams.is_empty() From e5000988435c5653ab3616b2e78cc7ab3aac3ce0 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Thu, 5 May 2022 15:33:28 +0200 Subject: [PATCH 3/9] protocols/relay: Deny at most one circuit at a time --- protocols/relay/src/v2/client/handler.rs | 20 ++++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/protocols/relay/src/v2/client/handler.rs b/protocols/relay/src/v2/client/handler.rs index eac72d94bc7..4394016ef04 100644 --- a/protocols/relay/src/v2/client/handler.rs +++ b/protocols/relay/src/v2/client/handler.rs @@ -143,7 +143,7 @@ impl IntoConnectionHandler for Prototype { pending_error: Default::default(), reservation: Reservation::None, alive_lend_out_substreams: Default::default(), - circuit_deny_futs: Default::default(), + circuit_deny_fut: Default::default(), send_error_futs: Default::default(), keep_alive: KeepAlive::Yes, }; @@ -196,9 +196,8 @@ pub struct Handler { /// eventually. alive_lend_out_substreams: FuturesUnordered>, - circuit_deny_futs: FuturesUnordered< - BoxFuture<'static, (PeerId, Result<(), protocol::inbound_stop::UpgradeError>)>, - >, + circuit_deny_fut: + Option)>>, /// Futures that try to send errors to the transport. /// @@ -253,12 +252,14 @@ impl ConnectionHandler for Handler { } Reservation::None => { let src_peer_id = inbound_circuit.src_peer_id(); - self.circuit_deny_futs.push( + if let Some(_) = self.circuit_deny_fut.replace( inbound_circuit .deny(Status::NoReservation) .map(move |result| (src_peer_id, result)) .boxed(), - ) + ) { + log::warn!("Dropping existing circuit deny future in favor of new one.") + } } } } @@ -539,8 +540,11 @@ impl ConnectionHandler for Handler { } // Deny incoming circuit requests. - if let Poll::Ready(Some((src_peer_id, result))) = self.circuit_deny_futs.poll_next_unpin(cx) + if let Some(Poll::Ready((src_peer_id, result))) = + self.circuit_deny_fut.as_mut().map(|f| f.poll_unpin(cx)) { + self.circuit_deny_fut = None; + match result { Ok(()) => { return Poll::Ready(ConnectionHandlerEvent::Custom( @@ -570,7 +574,7 @@ impl ConnectionHandler for Handler { // Update keep-alive handling. if matches!(self.reservation, Reservation::None) && self.alive_lend_out_substreams.is_empty() - && self.circuit_deny_futs.is_empty() + && self.circuit_deny_fut.is_none() { match self.keep_alive { KeepAlive::Yes => { From bfde1b758224d74491784055d83cb30feab2d20a Mon Sep 17 00:00:00 2001 From: Max Inden Date: Wed, 11 May 2022 15:13:59 +0200 Subject: [PATCH 4/9] protocols/relay: Deny <= 8 incoming circuit requests with one per peer --- protocols/relay/src/v2/client/handler.rs | 79 +++++++++++++++--------- 1 file changed, 50 insertions(+), 29 deletions(-) diff --git a/protocols/relay/src/v2/client/handler.rs b/protocols/relay/src/v2/client/handler.rs index 4394016ef04..11e2e8fc8cf 100644 --- a/protocols/relay/src/v2/client/handler.rs +++ b/protocols/relay/src/v2/client/handler.rs @@ -39,11 +39,16 @@ use libp2p_swarm::{ KeepAlive, NegotiatedSubstream, SubstreamProtocol, }; use log::debug; -use std::collections::VecDeque; +use std::collections::{HashMap, VecDeque}; use std::fmt; use std::task::{Context, Poll}; use std::time::Duration; +/// The maximum number of circuits being denied concurrently. +/// +/// Circuits to be denied exceeding the limit are dropped. +const MAX_NUMBER_DENYING_CIRCUIT: usize = 8; + pub enum In { Reserve { to_listener: mpsc::Sender, @@ -143,7 +148,7 @@ impl IntoConnectionHandler for Prototype { pending_error: Default::default(), reservation: Reservation::None, alive_lend_out_substreams: Default::default(), - circuit_deny_fut: Default::default(), + circuit_deny_futs: Default::default(), send_error_futs: Default::default(), keep_alive: KeepAlive::Yes, }; @@ -196,8 +201,8 @@ pub struct Handler { /// eventually. alive_lend_out_substreams: FuturesUnordered>, - circuit_deny_fut: - Option)>>, + circuit_deny_futs: + HashMap>>, /// Futures that try to send errors to the transport. /// @@ -252,13 +257,24 @@ impl ConnectionHandler for Handler { } Reservation::None => { let src_peer_id = inbound_circuit.src_peer_id(); - if let Some(_) = self.circuit_deny_fut.replace( - inbound_circuit - .deny(Status::NoReservation) - .map(move |result| (src_peer_id, result)) - .boxed(), - ) { - log::warn!("Dropping existing circuit deny future in favor of new one.") + + if self.circuit_deny_futs.len() == MAX_NUMBER_DENYING_CIRCUIT + && !self.circuit_deny_futs.contains_key(&src_peer_id) + { + log::warn!( + "Dropping inbound circuit request to be denied from {:?} due to exceeding limit.", + src_peer_id, + ); + } else { + if let Some(_) = self.circuit_deny_futs.insert( + src_peer_id, + inbound_circuit.deny(Status::NoReservation).boxed(), + ) { + log::warn!( + "Dropping existing inbound circuit request to be denied from {:?} in favor of new one.", + src_peer_id + ) + } } } } @@ -540,23 +556,28 @@ impl ConnectionHandler for Handler { } // Deny incoming circuit requests. - if let Some(Poll::Ready((src_peer_id, result))) = - self.circuit_deny_fut.as_mut().map(|f| f.poll_unpin(cx)) - { - self.circuit_deny_fut = None; - - match result { - Ok(()) => { - return Poll::Ready(ConnectionHandlerEvent::Custom( - Event::InboundCircuitReqDenied { src_peer_id }, - )) - } - Err(error) => { - return Poll::Ready(ConnectionHandlerEvent::Custom( - Event::InboundCircuitReqDenyFailed { src_peer_id, error }, - )) - } - } + let maybe_event = + self.circuit_deny_futs + .iter_mut() + .find_map(|(src_peer_id, fut)| match fut.poll_unpin(cx) { + Poll::Ready(Ok(())) => Some(( + *src_peer_id, + Event::InboundCircuitReqDenied { + src_peer_id: *src_peer_id, + }, + )), + Poll::Ready(Err(error)) => Some(( + *src_peer_id, + Event::InboundCircuitReqDenyFailed { + src_peer_id: *src_peer_id, + error, + }, + )), + Poll::Pending => None, + }); + if let Some((src_peer_id, event)) = maybe_event { + self.circuit_deny_futs.remove(&src_peer_id); + return Poll::Ready(ConnectionHandlerEvent::Custom(event)); } // Send errors to transport. @@ -574,7 +595,7 @@ impl ConnectionHandler for Handler { // Update keep-alive handling. if matches!(self.reservation, Reservation::None) && self.alive_lend_out_substreams.is_empty() - && self.circuit_deny_fut.is_none() + && self.circuit_deny_futs.is_empty() { match self.keep_alive { KeepAlive::Yes => { From b4103cbb379f0d9e7bfab740cfa95ee6f5316400 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Wed, 11 May 2022 15:29:06 +0200 Subject: [PATCH 5/9] protocols/relay: Deny new circuits before accepting new circuits --- protocols/relay/src/v2/relay/handler.rs | 46 ++++++++++++------------- 1 file changed, 23 insertions(+), 23 deletions(-) diff --git a/protocols/relay/src/v2/relay/handler.rs b/protocols/relay/src/v2/relay/handler.rs index 670cea959e7..ac3c7fdc53b 100644 --- a/protocols/relay/src/v2/relay/handler.rs +++ b/protocols/relay/src/v2/relay/handler.rs @@ -764,6 +764,29 @@ impl ConnectionHandler for Handler { } } + // Deny new circuits. + if let Poll::Ready(Some((circuit_id, dst_peer_id, result))) = + self.circuit_deny_futures.poll_next_unpin(cx) + { + match result { + Ok(()) => { + return Poll::Ready(ConnectionHandlerEvent::Custom(Event::CircuitReqDenied { + circuit_id, + dst_peer_id, + })); + } + Err(error) => { + return Poll::Ready(ConnectionHandlerEvent::Custom( + Event::CircuitReqDenyFailed { + circuit_id, + dst_peer_id, + error, + }, + )); + } + } + } + // Accept new circuits. if let Poll::Ready(Some(result)) = self.circuit_accept_futures.poll_next_unpin(cx) { match result { @@ -825,29 +848,6 @@ impl ConnectionHandler for Handler { } } - // Deny new circuits. - if let Poll::Ready(Some((circuit_id, dst_peer_id, result))) = - self.circuit_deny_futures.poll_next_unpin(cx) - { - match result { - Ok(()) => { - return Poll::Ready(ConnectionHandlerEvent::Custom(Event::CircuitReqDenied { - circuit_id, - dst_peer_id, - })); - } - Err(error) => { - return Poll::Ready(ConnectionHandlerEvent::Custom( - Event::CircuitReqDenyFailed { - circuit_id, - dst_peer_id, - error, - }, - )); - } - } - } - // Check active reservation. if let Some(Poll::Ready(())) = self .active_reservation From eefdcd38a0afcebb84c29941f9450db2ac35edb6 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Wed, 8 Jun 2022 11:34:46 +0200 Subject: [PATCH 6/9] protocols/relay: Bump version and add changelog entry --- protocols/relay/CHANGELOG.md | 6 ++++++ protocols/relay/Cargo.toml | 2 +- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/protocols/relay/CHANGELOG.md b/protocols/relay/CHANGELOG.md index 4f07b8d6b84..6081e1b8bfe 100644 --- a/protocols/relay/CHANGELOG.md +++ b/protocols/relay/CHANGELOG.md @@ -1,3 +1,9 @@ +# 0.9.1 - unreleased + +- Respond to at most one incoming reservation request. Deny <= 8 incoming + circuit requests with one per peer. And deny new circuits before accepting new + circuits. + # 0.9.0 - Update to `libp2p-core` `v0.33.0`. diff --git a/protocols/relay/Cargo.toml b/protocols/relay/Cargo.toml index a6e8c3e4689..7fe66e547d1 100644 --- a/protocols/relay/Cargo.toml +++ b/protocols/relay/Cargo.toml @@ -3,7 +3,7 @@ name = "libp2p-relay" edition = "2021" rust-version = "1.56.1" description = "Communications relaying for libp2p" -version = "0.9.0" +version = "0.9.1" authors = ["Parity Technologies ", "Max Inden "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" From 5866d857acbdd12b89e33856fd4f79e6d6dbf62d Mon Sep 17 00:00:00 2001 From: Max Inden Date: Wed, 8 Jun 2022 11:47:52 +0200 Subject: [PATCH 7/9] protocols/relay: Use is_some() --- protocols/relay/src/v2/client/handler.rs | 12 +++++++----- protocols/relay/src/v2/relay/handler.rs | 22 ++++++++++++---------- 2 files changed, 19 insertions(+), 15 deletions(-) diff --git a/protocols/relay/src/v2/client/handler.rs b/protocols/relay/src/v2/client/handler.rs index 11e2e8fc8cf..c0e2172329d 100644 --- a/protocols/relay/src/v2/client/handler.rs +++ b/protocols/relay/src/v2/client/handler.rs @@ -265,16 +265,18 @@ impl ConnectionHandler for Handler { "Dropping inbound circuit request to be denied from {:?} due to exceeding limit.", src_peer_id, ); - } else { - if let Some(_) = self.circuit_deny_futs.insert( + } else if self + .circuit_deny_futs + .insert( src_peer_id, inbound_circuit.deny(Status::NoReservation).boxed(), - ) { - log::warn!( + ) + .is_some() + { + log::warn!( "Dropping existing inbound circuit request to be denied from {:?} in favor of new one.", src_peer_id ) - } } } } diff --git a/protocols/relay/src/v2/relay/handler.rs b/protocols/relay/src/v2/relay/handler.rs index ac3c7fdc53b..9801ca74b43 100644 --- a/protocols/relay/src/v2/relay/handler.rs +++ b/protocols/relay/src/v2/relay/handler.rs @@ -519,11 +519,12 @@ impl ConnectionHandler for Handler { inbound_reservation_req, addrs, } => { - if let Some(_) = - self.reservation_request_future - .replace(ReservationRequestFuture::Accepting( - inbound_reservation_req.accept(addrs).boxed(), - )) + if self + .reservation_request_future + .replace(ReservationRequestFuture::Accepting( + inbound_reservation_req.accept(addrs).boxed(), + )) + .is_some() { log::warn!("Dropping existing deny/accept future in favor of new one.") } @@ -532,11 +533,12 @@ impl ConnectionHandler for Handler { inbound_reservation_req, status, } => { - if let Some(_) = - self.reservation_request_future - .replace(ReservationRequestFuture::Denying( - inbound_reservation_req.deny(status).boxed(), - )) + if self + .reservation_request_future + .replace(ReservationRequestFuture::Denying( + inbound_reservation_req.deny(status).boxed(), + )) + .is_some() { log::warn!("Dropping existing deny/accept future in favor of new one.") } From 87969394d82d0aba428c5b4808fc0fa901966acc Mon Sep 17 00:00:00 2001 From: Max Inden Date: Wed, 8 Jun 2022 11:49:27 +0200 Subject: [PATCH 8/9] Update protocols/relay/CHANGELOG.md Co-authored-by: Thomas Eizinger --- protocols/relay/CHANGELOG.md | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/protocols/relay/CHANGELOG.md b/protocols/relay/CHANGELOG.md index 6081e1b8bfe..a0b0f21644a 100644 --- a/protocols/relay/CHANGELOG.md +++ b/protocols/relay/CHANGELOG.md @@ -2,8 +2,9 @@ - Respond to at most one incoming reservation request. Deny <= 8 incoming circuit requests with one per peer. And deny new circuits before accepting new - circuits. - + circuits. See [PR 2698]. + +[PR 2698]: https://github.com/libp2p/rust-libp2p/pull/2698 # 0.9.0 - Update to `libp2p-core` `v0.33.0`. From 4b71c0213fb88a16de41f2f7ac1c6ed24f1ad824 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Wed, 8 Jun 2022 12:17:52 +0200 Subject: [PATCH 9/9] protocols/relay/CHANGELOG: Reference pull request --- protocols/relay/CHANGELOG.md | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/protocols/relay/CHANGELOG.md b/protocols/relay/CHANGELOG.md index a0b0f21644a..4fdaaaf958a 100644 --- a/protocols/relay/CHANGELOG.md +++ b/protocols/relay/CHANGELOG.md @@ -3,16 +3,18 @@ - Respond to at most one incoming reservation request. Deny <= 8 incoming circuit requests with one per peer. And deny new circuits before accepting new circuits. See [PR 2698]. - -[PR 2698]: https://github.com/libp2p/rust-libp2p/pull/2698 + +- Expose explicits errors via `UpgradeError` instead of generic `io::Error`. See + [PR 2698]. + +[PR 2698]: https://github.com/libp2p/rust-libp2p/pull/2698/ + # 0.9.0 - Update to `libp2p-core` `v0.33.0`. - Update to `libp2p-swarm` `v0.36.0`. -- Expose explicits errors via `UpgradeError` instead of generic `io::Error`. - # 0.8.0 - Expose `{Inbound,Outbound}{Hop,Stop}UpgradeError`. See [PR 2586].