Skip to content

Commit

Permalink
fix(s2n-quic-transport): optimize STOP_SENDING state (#1217)
Browse files Browse the repository at this point in the history
  • Loading branch information
camshaft authored Mar 11, 2022
1 parent 3a4a025 commit eb0223b
Show file tree
Hide file tree
Showing 6 changed files with 133 additions and 37 deletions.
5 changes: 4 additions & 1 deletion quic/s2n-quic-transport/src/stream/contract.rs
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,10 @@ pub mod rx {

if self.stop_sending.is_some() {
let response = response.expect("stop_sending should never fail");
assert!(response.is_reset(), "stop_sending should reset the stream");
assert!(
response.is_reset() || response.is_finished(),
"stop_sending should reset or finish the stream"
);

assert_eq!(
response.bytes,
Expand Down
114 changes: 107 additions & 7 deletions quic/s2n-quic-transport/src/stream/receive_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use crate::{
};
use core::{
convert::TryFrom,
task::{Context, Waker},
task::{Context, Poll, Waker},
};
use s2n_quic_core::{
ack, application,
Expand Down Expand Up @@ -61,7 +61,7 @@ use s2n_quic_core::{

/// Enumerates the possible states of the receiving side of a stream.
/// These states are equivalent to the ones in the QUIC transport specification.
#[derive(PartialEq, Debug, Copy, Clone)]
#[derive(PartialEq, Debug, Clone)]
pub(super) enum ReceiveStreamState {
/// The stream is still receiving data from the remote peer. This state
/// coverst the `Recv`, `Size Known` and `Data Recvd` state from the QUIC
Expand All @@ -73,11 +73,62 @@ pub(super) enum ReceiveStreamState {
/// All data had been received from the peer and consumed by the user.
/// This is the terminal state.
DataRead,
/// The application has requested the peer to STOP_SENDING and the stream is currently
/// waiting for an ACK for the STOP_SENDING frame.
Stopping {
error: StreamError,
missing_data: MissingData,
},
/// The connection was reset. The flag indicates whether the reset status
/// had already been observed by the user.
Reset(StreamError),
}

/// Keeps track of any missing data in the `Stopping` state
#[derive(PartialEq, Debug, Clone)]
pub(super) struct MissingData {
start: u64,
end: u64,
}

impl MissingData {
fn new(start: u64) -> Self {
Self {
start,
end: u64::MAX,
}
}

fn on_data(&mut self, frame: &StreamRef) -> Poll<()> {
// We could track if we have any pending gaps and continue to send STOP_SENDING but
// that would require keeping the receive buffer around, which isn't really useful
// since the application has already closed the stream.
//
// Instead, we just use a simple range

let frame_start = *frame.offset;
let frame_end = *(frame.offset + frame.data.len());
let frame_range = frame_start..frame_end;

// update the start if it overlaps the offset of the frame
if frame_range.contains(&self.start) {
self.start = frame_end;
}

// update the end if this is the last frame or if it contains the current end
if frame.is_fin || frame_range.contains(&self.end) {
self.end = self.end.min(frame_start);
}

// return if we've received everything
if self.start >= self.end {
Poll::Ready(())
} else {
Poll::Pending
}
}
}

/// Writes the `MAX_STREAM_DATA` frames based on the streams flow control window.
#[derive(Debug, Default)]
pub(super) struct MaxStreamDataToFrameWriter {}
Expand Down Expand Up @@ -347,6 +398,15 @@ impl ReceiveStream {
// into the end-of-stream signal. We could add these checks, but
// the main outcome would be to send connection errors.
}
ReceiveStreamState::Stopping {
ref mut missing_data,
..
} => {
if missing_data.on_data(frame).is_ready() {
self.stop_sending_sync.stop_sync();
self.final_state_observed = true;
}
}
ReceiveStreamState::DataRead => {
// We also ignore the data in this case. We could validate whether
// it actually fitted into previously announced window, but
Expand Down Expand Up @@ -558,6 +618,11 @@ impl ReceiveStream {
// data had been already received.
match self.state {
ReceiveStreamState::Reset(_) | ReceiveStreamState::DataRead => return Ok(()),
ReceiveStreamState::Stopping { .. } => {
// Prefer the error from the peer instead of the STOP_SENDING error.
self.state = ReceiveStreamState::Reset(error);
return Ok(());
}
ReceiveStreamState::Receiving(Some(total_size)) => {
if let Some(actual_size) = actual_size {
// If the stream size which is indicated through the reset
Expand Down Expand Up @@ -629,7 +694,8 @@ impl ReceiveStream {
pub fn on_packet_ack<A: ack::Set>(&mut self, ack_set: &A) {
self.flow_controller.read_window_sync.on_packet_ack(ack_set);

self.stop_sending_sync.on_packet_ack(ack_set);
// finalize the stream if the peer has ACKed the STOP_SENDING frame
self.final_state_observed |= self.stop_sending_sync.on_packet_ack(ack_set).is_ready();
}

/// This method gets called when a packet loss is reported
Expand Down Expand Up @@ -670,6 +736,8 @@ impl ReceiveStream {
let mut response = ops::rx::Response::default();

if let Some(error_code) = request.stop_sending {
let error = StreamError::stream_reset(error_code);

match self.state {
//= https://www.rfc-editor.org/rfc/rfc9000#section-3.3
//# A receiver MAY send a STOP_SENDING frame in any state where it has
Expand All @@ -679,12 +747,41 @@ impl ReceiveStream {
//= https://www.rfc-editor.org/rfc/rfc9000#section-3.5
//# STOP_SENDING SHOULD only be sent for a stream that has not been reset
//# by the peer.
ReceiveStreamState::Reset(_) => (),
ReceiveStreamState::Reset(error) | ReceiveStreamState::Stopping { error, .. } => {
response.status = ops::Status::Reset(error);
return Ok(response);
}
// If we've already read everything, transition to the final state
ReceiveStreamState::DataRead => {
self.state = ReceiveStreamState::DataRead;
self.final_state_observed = true;
response.status = ops::Status::Finished;
return Ok(response);
}
// If we've already buffered everything, transition to the final state
ReceiveStreamState::Receiving(Some(total_size))
if self.receive_buffer.total_received_len() == total_size =>
{
self.state = ReceiveStreamState::DataRead;
self.final_state_observed = true;
response.status = ops::Status::Finished;
return Ok(response);
}
//= https://www.rfc-editor.org/rfc/rfc9000#section-3.5
//# If the stream is in the "Recv" or "Size Known" states, the transport
//# SHOULD signal this by sending a STOP_SENDING frame to prompt closure
//# of the stream in the opposite direction.
_ => self.stop_sending_sync.request_delivery(error_code),
_ => {
self.stop_sending_sync.request_delivery(error_code);

let received_len = self.receive_buffer.total_received_len();
let missing_data = MissingData::new(received_len);
// transition to the Stopping state so we can start shutting down
self.state = ReceiveStreamState::Stopping {
error,
missing_data,
};
}
}

self.read_waiter = None;
Expand All @@ -693,9 +790,12 @@ impl ReceiveStream {
// space which had been allocated but not used
self.receive_buffer.reset();

// we don't need any more flow control
self.flow_controller.release_outstanding_window();

// Mark the stream as reset. Note that the request doesn't have a flush so there's
// currently no way to wait for the reset to be acknowledged.
response.status = ops::Status::Reset(StreamError::stream_reset(error_code));
response.status = ops::Status::Reset(error);

return Ok(response);
}
Expand All @@ -704,7 +804,7 @@ impl ReceiveStream {
// allowed to read (not reset).

let total_size = match self.state {
ReceiveStreamState::Reset(error) => {
ReceiveStreamState::Reset(error) | ReceiveStreamState::Stopping { error, .. } => {
// The reset is now known to have been read by the client.
self.final_state_observed = true;
self.read_waiter = None;
Expand Down
4 changes: 1 addition & 3 deletions quic/s2n-quic-transport/src/stream/send_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -626,10 +626,8 @@ impl SendStream {
}
}

self.reset_sync.on_packet_ack(ack_set);

if let SendStreamState::ResetSent(error_code) = self.state {
if self.reset_sync.is_delivered() {
if self.reset_sync.on_packet_ack(ack_set).is_ready() {
// A reset had been acknowledged. Enter the terminal state.
self.state = SendStreamState::ResetAcknowledged(error_code);

Expand Down
27 changes: 16 additions & 11 deletions quic/s2n-quic-transport/src/stream/tests/receive_stream_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1812,9 +1812,9 @@ fn stop_sending_will_trigger_a_stop_sending_frame() {
let mut events = StreamEvents::new();
test_env.stream.on_packet_ack(&packet_nr, &mut events);

// Nothing new to write
// Nothing new to write; the stream should be finished
assert_eq!(
stream_interests(&[]),
stream_interests(&["fin"]),
test_env.stream.get_stream_interests()
);
test_env.assert_write_frames(0);
Expand Down Expand Up @@ -1868,8 +1868,10 @@ fn do_not_retransmit_stop_sending_if_requested_twice() {
test_env
.stream
.on_packet_ack(&sent_frame.packet_nr, &mut events);

// Finalize the stream after ACKing the STOP_SENDING frame
assert_eq!(
stream_interests(&[]),
stream_interests(&["fin"]),
test_env.stream.get_stream_interests()
);
}
Expand All @@ -1881,7 +1883,7 @@ fn do_not_retransmit_stop_sending_if_requested_twice() {
// Nothing new to write
if *ack_packet {
assert_eq!(
stream_interests(&[]),
stream_interests(&["fin"]),
test_env.stream.get_stream_interests()
);
} else {
Expand Down Expand Up @@ -1977,7 +1979,7 @@ fn stop_sending_is_ignored_if_stream_has_already_received_all_data() {
.stop_sending(ApplicationErrorCode::new(0x1234_5678).unwrap())
.is_ok());
assert_eq!(
stream_interests(&[]),
stream_interests(&["fin"]),
test_env.stream.get_stream_interests()
);

Expand Down Expand Up @@ -2037,14 +2039,14 @@ fn stop_sending_can_be_sent_if_size_is_known_but_data_is_still_missing() {

// Now we should not require stop sending anymore
assert_eq!(
stream_interests(&[]),
stream_interests(&["fin"]),
test_env.stream.get_stream_interests()
);
}

test_env.ack_packet(sent_frame.packet_nr, ExpectWakeup(Some(false)));
assert_eq!(
stream_interests(&[]),
stream_interests(&["fin"]),
test_env.stream.get_stream_interests()
);

Expand Down Expand Up @@ -2079,7 +2081,7 @@ fn stop_sending_is_aborted_if_stream_receives_all_data() {
.is_ok());

assert_eq!(
stream_interests(&[]),
stream_interests(&["fin"]),
test_env.stream.get_stream_interests()
);
test_env.assert_write_frames(0);
Expand Down Expand Up @@ -2127,7 +2129,7 @@ fn stop_sending_is_aborted_if_stream_receives_all_data_with_data_after_fin() {
.is_ok());

assert_eq!(
stream_interests(&[]),
stream_interests(&["fin"]),
test_env.stream.get_stream_interests()
);
test_env.assert_write_frames(0);
Expand Down Expand Up @@ -2164,7 +2166,10 @@ fn stop_sending_is_ignored_if_stream_has_received_or_consumed_all_data() {
assert!(test_env
.stop_sending(ApplicationErrorCode::new(0x1234_5678).unwrap())
.is_ok());
assert_eq!(expected_interests, test_env.stream.get_stream_interests());
assert_eq!(
stream_interests(&["fin"]),
test_env.stream.get_stream_interests()
);

test_env.assert_write_frames(0);
}
Expand Down Expand Up @@ -2238,7 +2243,7 @@ fn stop_sending_frames_are_retransmitted_on_loss() {
// Acknowledge the second packet and we are done
test_env.ack_packet(packet_nr_2, ExpectWakeup(Some(false)));
assert_eq!(
stream_interests(&[]),
stream_interests(&["fin"]),
test_env.stream.get_stream_interests()
);
test_env.assert_write_frames(0);
Expand Down
7 changes: 0 additions & 7 deletions quic/s2n-quic-transport/src/sync/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,13 +88,6 @@ impl<T> DeliveryState<T> {
matches!(self, Self::InFlight(_))
}

/// Returns `true` if the payload had been delivered to the peer and had
/// been acknowledged by the peer.
#[inline]
pub fn is_delivered(&self) -> bool {
matches!(self, Self::Delivered(_))
}

/// Tries to transmit the delivery with the given transmission constraint
#[inline]
pub fn try_transmit(&self, constraint: transmission::Constraint) -> Option<&T> {
Expand Down
13 changes: 5 additions & 8 deletions quic/s2n-quic-transport/src/sync/once_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use crate::{
sync::{DeliveryState, InFlightDelivery, InflightPacketInfo, ValueToFrameWriter},
transmission,
};
use core::task::Poll;
use s2n_quic_core::{ack, stream::StreamId};

/// Synchronizes a value of type `T` exactly once towards the remote peer.
Expand Down Expand Up @@ -38,13 +39,6 @@ impl<T: Copy + Clone + Eq + PartialEq, S: ValueToFrameWriter<T>> OnceSync<T, S>
Self::default()
}

/// Returns `true` if the payload had been delivered to the peer and had
/// been acknowledged by the peer.
#[inline]
pub fn is_delivered(&self) -> bool {
self.delivery.is_delivered()
}

/// Returns `true` if the delivery is current in progress.
/// A packet has been sent, but no acknowledgement has been retrieved so far.
#[inline]
Expand Down Expand Up @@ -72,14 +66,17 @@ impl<T: Copy + Clone + Eq + PartialEq, S: ValueToFrameWriter<T>> OnceSync<T, S>
}

/// This method gets called when a packet delivery got acknowledged
pub fn on_packet_ack<A: ack::Set>(&mut self, ack_set: &A) {
pub fn on_packet_ack<A: ack::Set>(&mut self, ack_set: &A) -> Poll<()> {
// If the packet containing the frame gets acknowledged, mark the delivery as
// succeeded.
if let DeliveryState::InFlight(in_flight) = self.delivery {
if ack_set.contains(in_flight.packet.packet_nr) {
self.delivery = DeliveryState::Delivered(in_flight.value);
return Poll::Ready(());
}
}

Poll::Pending
}

/// This method gets called when a packet loss is reported
Expand Down

0 comments on commit eb0223b

Please sign in to comment.