-
Notifications
You must be signed in to change notification settings - Fork 948
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
swarm/connection: Enforce limit on inbound substreams via StreamMuxer
#2861
Changes from all commits
79ce63f
b829642
7182fd6
7190a79
f689617
4d9fa5b
1af9ba3
27274c8
2c3a606
772b775
6359137
bb930c4
d8d8165
a98415f
bbf7070
8fea3a7
65a06d2
91370a9
0621961
9197de0
82d3bd3
e677455
6b98c50
890cecd
44e9163
649021c
9ca11b0
34ef653
44c2e36
0ec5a58
1ff710d
ce4d9a7
e0ba9ba
9ea9ad8
a2f220a
e7cfb46
ee686ed
9bfa0cb
ee61244
dcedb4e
36f69ea
a2c2502
a25abeb
b8ad471
0cb8a87
1943c66
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -24,10 +24,12 @@ | |
use futures::{ | ||
future, | ||
prelude::*, | ||
ready, | ||
stream::{BoxStream, LocalBoxStream}, | ||
}; | ||
use libp2p_core::muxing::{StreamMuxer, StreamMuxerEvent}; | ||
use libp2p_core::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo}; | ||
use std::collections::VecDeque; | ||
use std::{ | ||
fmt, io, iter, mem, | ||
pin::Pin, | ||
|
@@ -42,8 +44,20 @@ pub struct Yamux<S> { | |
incoming: S, | ||
/// Handle to control the connection. | ||
control: yamux::Control, | ||
/// Temporarily buffers inbound streams in case our node is performing backpressure on the remote. | ||
/// | ||
/// The only way how yamux can make progress is by driving the [`Incoming`] stream. However, the | ||
/// [`StreamMuxer`] interface is designed to allow a caller to selectively make progress via | ||
/// [`StreamMuxer::poll_inbound`] and [`StreamMuxer::poll_outbound`] whilst the more general | ||
/// [`StreamMuxer::poll`] is designed to make progress on existing streams etc. | ||
/// | ||
/// This buffer stores inbound streams that are created whilst [`StreamMuxer::poll`] is called. | ||
/// Once the buffer is full, new inbound streams are dropped. | ||
inbound_stream_buffer: VecDeque<yamux::Stream>, | ||
} | ||
|
||
const MAX_BUFFERED_INBOUND_STREAMS: usize = 25; | ||
|
||
impl<S> fmt::Debug for Yamux<S> { | ||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { | ||
f.write_str("Yamux") | ||
|
@@ -65,6 +79,7 @@ where | |
_marker: std::marker::PhantomData, | ||
}, | ||
control: ctrl, | ||
inbound_stream_buffer: VecDeque::default(), | ||
} | ||
} | ||
} | ||
|
@@ -84,6 +99,7 @@ where | |
_marker: std::marker::PhantomData, | ||
}, | ||
control: ctrl, | ||
inbound_stream_buffer: VecDeque::default(), | ||
} | ||
} | ||
} | ||
|
@@ -101,13 +117,11 @@ where | |
mut self: Pin<&mut Self>, | ||
cx: &mut Context<'_>, | ||
) -> Poll<Result<Self::Substream, Self::Error>> { | ||
self.incoming.poll_next_unpin(cx).map(|maybe_stream| { | ||
let stream = maybe_stream | ||
.transpose()? | ||
.ok_or(YamuxError(ConnectionError::Closed))?; | ||
if let Some(stream) = self.inbound_stream_buffer.pop_front() { | ||
return Poll::Ready(Ok(stream)); | ||
} | ||
|
||
Ok(stream) | ||
}) | ||
self.poll_inner(cx) | ||
} | ||
|
||
fn poll_outbound( | ||
|
@@ -121,9 +135,21 @@ where | |
|
||
fn poll( | ||
self: Pin<&mut Self>, | ||
_: &mut Context<'_>, | ||
cx: &mut Context<'_>, | ||
) -> Poll<Result<StreamMuxerEvent, Self::Error>> { | ||
Poll::Pending | ||
let this = self.get_mut(); | ||
|
||
loop { | ||
let inbound_stream = ready!(this.poll_inner(cx))?; | ||
|
||
if this.inbound_stream_buffer.len() >= MAX_BUFFERED_INBOUND_STREAMS { | ||
log::warn!("dropping {inbound_stream} because buffer is full"); | ||
drop(inbound_stream); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Just want to make sure we are making an informed decision here. Instead of tail dropping, we could as well do head dropping. I would have to put more thoughts into this before having an opinion. This does maintain the status quo, thus we can consider it a non-change. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. With this change, the timeout for opening a new stream already begins as the request is queued. I believe head dropping would negatively affect the number of successfully negotiated streams because we might drop a stream that was about to be processed by the local node. A dropped stream will trigger a In a scenario where a node opens 3 streams that are all pending in this buffer, I think it may be confusing if stream 1 fails but 2 and 3 succeed. It feels more natural to process this in FIFO order. |
||
continue; | ||
} | ||
|
||
this.inbound_stream_buffer.push_back(inbound_stream); | ||
} | ||
} | ||
|
||
fn poll_close(mut self: Pin<&mut Self>, c: &mut Context<'_>) -> Poll<YamuxResult<()>> { | ||
|
@@ -145,6 +171,21 @@ where | |
} | ||
} | ||
|
||
impl<S> Yamux<S> | ||
where | ||
S: Stream<Item = Result<yamux::Stream, YamuxError>> + Unpin, | ||
{ | ||
fn poll_inner(&mut self, cx: &mut Context<'_>) -> Poll<Result<yamux::Stream, YamuxError>> { | ||
self.incoming.poll_next_unpin(cx).map(|maybe_stream| { | ||
let stream = maybe_stream | ||
.transpose()? | ||
.ok_or(YamuxError(ConnectionError::Closed))?; | ||
|
||
Ok(stream) | ||
}) | ||
} | ||
} | ||
|
||
/// The yamux configuration. | ||
#[derive(Debug, Clone)] | ||
pub struct YamuxConfig { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If I am not mistaken, the additional buffer of 25 streams increases the overall number of possible inflight inbound streams, correct? In other words, while we previously only supported
max_negotiating_inbound_streams
before dropping streams, we now allow up tomax_negotiating_inbound_streams + 25
.Is this correct?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yep, that sounds correct to me! Like I said, this number is completely arbitrary and we can change it to something else :)