Skip to content

Commit

Permalink
chore: remove WindowUpdateMode::OnReceive
Browse files Browse the repository at this point in the history
Follow up to previous deprecation libp2p#177.

Fixes libp2p#175.
  • Loading branch information
mxinden committed Nov 23, 2023
1 parent 93b7062 commit a85b148
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 106 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
# 0.13.0

- Remove `WindowUpdateMode`.
Behavior will always be `WindowUpdateMode::OnRead`, thus enabling flow-control and enforcing backpressure.
See [PR 178](https://github.com/libp2p/rust-yamux/pull/178).

# 0.12.0

- Remove `Control` and `ControlledConnection`.
Expand Down
3 changes: 2 additions & 1 deletion yamux/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "yamux"
version = "0.12.0"
version = "0.13.0"
authors = ["Parity Technologies <admin@parity.io>"]
license = "Apache-2.0 OR MIT"
description = "Multiplexer over reliable, ordered connections"
Expand All @@ -19,4 +19,5 @@ static_assertions = "1"
pin-project = "1.1.0"

[dev-dependencies]
futures = { version = "0.3.12", default-features = false, features = ["executor"] }
quickcheck = "1.0"
65 changes: 13 additions & 52 deletions yamux/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use crate::{
error::ConnectionError,
frame::header::{self, Data, GoAway, Header, Ping, StreamId, Tag, WindowUpdate, CONNECTION_ID},
frame::{self, Frame},
Config, WindowUpdateMode, DEFAULT_CREDIT,
Config, DEFAULT_CREDIT,
};
use crate::{Result, MAX_ACK_BACKLOG};
use cleanup::Cleanup;
Expand Down Expand Up @@ -304,9 +304,7 @@ enum Action {
/// Nothing to be done.
None,
/// A new stream has been opened by the remote.
New(Stream, Option<Frame<WindowUpdate>>),
/// A window update should be sent to the remote.
Update(Frame<WindowUpdate>),
New(Stream),
/// A ping should be answered.
Ping(Frame<Ping>),
/// A stream should be reset.
Expand Down Expand Up @@ -504,23 +502,13 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Active<T> {
// The remote may be out of credit though and blocked on
// writing more data. We may need to reset the stream.
State::SendClosed => {
if self.config.window_update_mode == WindowUpdateMode::OnRead
&& shared.window == 0
{
// The remote may be waiting for a window update
// which we will never send, so reset the stream now.
let mut header = Header::data(stream_id, 0);
header.rst();
Some(Frame::new(header))
} else {
// The remote has either still credit or will be given more
// (due to an enqueued window update or because the update
// mode is `OnReceive`) or we already have inbound frames in
// the socket buffer which will be processed later. In any
// case we will reply with an RST in `Connection::on_data`
// because the stream will no longer be known.
None
}
// The remote has either still credit or will be given more
// (due to an enqueued window update or because the update
// mode is `OnReceive`) or we already have inbound frames in
// the socket buffer which will be processed later. In any
// case we will reply with an RST in `Connection::on_data`
// because the stream will no longer be known.
None
}
// The stream was properly closed. We already have sent our FIN frame. The
// remote end has already done so in the past.
Expand Down Expand Up @@ -569,18 +557,10 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Active<T> {
};
match action {
Action::None => {}
Action::New(stream, update) => {
Action::New(stream) => {
log::trace!("{}: new inbound {} of {}", self.id, stream, self);
if let Some(f) = update {
log::trace!("{}/{}: sending update", self.id, f.header().stream_id());
self.pending_frames.push_back(f.into());
}
return Ok(Some(stream));
}
Action::Update(f) => {
log::trace!("{}: sending update: {:?}", self.id, f.header());
self.pending_frames.push_back(f.into());
}
Action::Ping(f) => {
log::trace!("{}/{}: pong", self.id, f.header().stream_id());
self.pending_frames.push_back(f.into());
Expand Down Expand Up @@ -641,29 +621,17 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Active<T> {
return Action::Terminate(Frame::internal_error());
}
let mut stream = self.make_new_inbound_stream(stream_id, DEFAULT_CREDIT);
let mut window_update = None;
{
let mut shared = stream.shared();
if is_finish {
shared.update_state(self.id, stream_id, State::RecvClosed);
}
shared.window = shared.window.saturating_sub(frame.body_len());
shared.buffer.push(frame.into_body());

if matches!(self.config.window_update_mode, WindowUpdateMode::OnReceive) {
if let Some(credit) = shared.next_window_update() {
shared.window += credit;
let mut frame = Frame::window_update(stream_id, credit);
frame.header_mut().ack();
window_update = Some(frame)
}
}
}
if window_update.is_none() {
stream.set_flag(stream::Flag::Ack)
}
stream.set_flag(stream::Flag::Ack);
self.streams.insert(stream_id, stream.clone_shared());
return Action::New(stream, window_update);
return Action::New(stream);
}

if let Some(s) = self.streams.get_mut(&stream_id) {
Expand Down Expand Up @@ -695,13 +663,6 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Active<T> {
if let Some(w) = shared.reader.take() {
w.wake()
}
if matches!(self.config.window_update_mode, WindowUpdateMode::OnReceive) {
if let Some(credit) = shared.next_window_update() {
shared.window += credit;
let frame = Frame::window_update(stream_id, credit);
return Action::Update(frame);
}
}
} else {
log::trace!(
"{}/{}: data frame for unknown stream, possibly dropped earlier: {:?}",
Expand Down Expand Up @@ -766,7 +727,7 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Active<T> {
.update_state(self.id, stream_id, State::RecvClosed);
}
self.streams.insert(stream_id, stream.clone_shared());
return Action::New(stream, None);
return Action::New(stream);
}

if let Some(s) = self.streams.get_mut(&stream_id) {
Expand Down
26 changes: 7 additions & 19 deletions yamux/src/connection/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use crate::{
header::{Data, Header, StreamId, WindowUpdate},
Frame,
},
Config, WindowUpdateMode, DEFAULT_CREDIT,
Config, DEFAULT_CREDIT,
};
use futures::{
channel::mpsc,
Expand Down Expand Up @@ -200,12 +200,6 @@ impl Stream {
/// Send new credit to the sending side via a window update message if
/// permitted.
fn send_window_update(&mut self, cx: &mut Context) -> Poll<io::Result<()>> {
// When using [`WindowUpdateMode::OnReceive`] window update messages are
// send early on data receival (see [`crate::Connection::on_frame`]).
if matches!(self.config.window_update_mode, WindowUpdateMode::OnReceive) {
return Poll::Ready(Ok(()));
}

let mut shared = self.shared.lock();

if let Some(credit) = shared.next_window_update() {
Expand Down Expand Up @@ -487,6 +481,7 @@ impl Shared {
current // Return the previous stream state for informational purposes.
}

// TODO: This does not need to live in shared any longer.
/// Calculate the number of additional window bytes the receiving side
/// should grant the sending side via a window update message.
///
Expand All @@ -499,19 +494,12 @@ impl Shared {
return None;
}

let new_credit = match self.config.window_update_mode {
WindowUpdateMode::OnReceive => {
debug_assert!(self.config.receive_window >= self.window);
let new_credit = {
debug_assert!(self.config.receive_window >= self.window);
let bytes_received = self.config.receive_window.saturating_sub(self.window);
let buffer_len: u32 = self.buffer.len().try_into().unwrap_or(std::u32::MAX);

self.config.receive_window.saturating_sub(self.window)
}
WindowUpdateMode::OnRead => {
debug_assert!(self.config.receive_window >= self.window);
let bytes_received = self.config.receive_window.saturating_sub(self.window);
let buffer_len: u32 = self.buffer.len().try_into().unwrap_or(std::u32::MAX);

bytes_received.saturating_sub(buffer_len)
}
bytes_received.saturating_sub(buffer_len)
};

// Send WindowUpdate message when half or more of the configured receive
Expand Down
34 changes: 0 additions & 34 deletions yamux/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,32 +63,6 @@ const MAX_ACK_BACKLOG: usize = 256;
/// https://github.com/paritytech/yamux/issues/100.
const DEFAULT_SPLIT_SEND_SIZE: usize = 16 * 1024;

/// Specifies when window update frames are sent.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum WindowUpdateMode {
/// Send window updates as soon as a [`Stream`]'s receive window drops to 0.
///
/// This ensures that the sender can resume sending more data as soon as possible
/// but a slow reader on the receiving side may be overwhelmed, i.e. it accumulates
/// data in its buffer which may reach its limit (see `set_max_buffer_size`).
/// In this mode, window updates merely prevent head of line blocking but do not
/// effectively exercise back pressure on senders.
OnReceive,

/// Send window updates only when data is read on the receiving end.
///
/// This ensures that senders do not overwhelm receivers and keeps buffer usage
/// low. However, depending on the protocol, there is a risk of deadlock, namely
/// if both endpoints want to send data larger than the receivers window and they
/// do not read before finishing their writes. Use this mode only if you are sure
/// that this will never happen, i.e. if
///
/// - Endpoints *A* and *B* never write at the same time, *or*
/// - Endpoints *A* and *B* write at most *n* frames concurrently such that the sum
/// of the frame lengths is less or equal to the available credit of *A* and *B*
/// respectively.
OnRead,
}

/// Yamux configuration.
///
Expand All @@ -105,7 +79,6 @@ pub struct Config {
receive_window: u32,
max_buffer_size: usize,
max_num_streams: usize,
window_update_mode: WindowUpdateMode,
read_after_close: bool,
split_send_size: usize,
}
Expand All @@ -116,7 +89,6 @@ impl Default for Config {
receive_window: DEFAULT_CREDIT,
max_buffer_size: 1024 * 1024,
max_num_streams: 8192,
window_update_mode: WindowUpdateMode::OnRead,
read_after_close: true,
split_send_size: DEFAULT_SPLIT_SEND_SIZE,
}
Expand Down Expand Up @@ -147,12 +119,6 @@ impl Config {
self
}

/// Set the window update mode to use.
pub fn set_window_update_mode(&mut self, m: WindowUpdateMode) -> &mut Self {
self.window_update_mode = m;
self
}

/// Allow or disallow streams to read from buffered data after
/// the connection has been closed.
pub fn set_read_after_close(&mut self, b: bool) -> &mut Self {
Expand Down

0 comments on commit a85b148

Please sign in to comment.