Skip to content

Commit

Permalink
send RESET_STREAM frame when stream's write-side is shutdown
Browse files Browse the repository at this point in the history
This also ensures that the stream's send buffer is considered "complete"
when all previously buffered data is dropped, by marking all the dropped
data as acked, so that the stream can be collected properly.
  • Loading branch information
ghedo committed Feb 25, 2021
1 parent 37de4c8 commit 76598e4
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 20 deletions.
66 changes: 56 additions & 10 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3228,9 +3228,10 @@ impl Connection {
self.streams.mark_readable(stream_id, false);
},

// TODO: send RESET_STREAM
Shutdown::Write => {
stream.send.shutdown()?;
let final_size = stream.send.shutdown()?;

self.streams.mark_reset(stream_id, true, err, final_size);

// Once shutdown, the stream is guaranteed to be non-writable.
self.streams.mark_writable(stream_id, false);
Expand Down Expand Up @@ -6556,34 +6557,79 @@ mod tests {

#[test]
fn stream_shutdown_write() {
let mut buf = [0; 65535];

let mut pipe = testing::Pipe::default().unwrap();
assert_eq!(pipe.handshake(), Ok(()));

// Client sends some data.
assert_eq!(pipe.client.stream_send(4, b"hello, world", false), Ok(12));
assert_eq!(pipe.advance(), Ok(()));

let mut r = pipe.server.readable();
assert_eq!(r.next(), Some(4));
assert_eq!(r.next(), None);

let mut b = [0; 15];
pipe.server.stream_recv(4, &mut b).unwrap();
let mut r = pipe.server.writable();
assert_eq!(r.next(), Some(4));
assert_eq!(r.next(), None);

assert_eq!(pipe.client.stream_send(4, b"a", false), Ok(1));
assert_eq!(pipe.client.stream_shutdown(4, Shutdown::Write, 0), Ok(()));
assert_eq!(pipe.advance(), Ok(()));
assert_eq!(pipe.client.streams.len(), 1);
assert_eq!(pipe.server.streams.len(), 1);

let mut r = pipe.server.readable();
// Server sends some data.
assert_eq!(pipe.server.stream_send(4, b"goodbye, world", false), Ok(14));

// Server shuts down stream.
assert_eq!(pipe.server.stream_shutdown(4, Shutdown::Write, 42), Ok(()));

let mut r = pipe.server.writable();
assert_eq!(r.next(), None);

assert_eq!(pipe.client.stream_send(4, b"bye", false), Ok(3));
let len = pipe.server.send(&mut buf).unwrap();

let mut dummy = buf[..len].to_vec();

let frames =
testing::decode_pkt(&mut pipe.client, &mut dummy, len).unwrap();
let mut iter = frames.iter();

assert_eq!(
iter.next(),
Some(&frame::Frame::ResetStream {
stream_id: 4,
error_code: 42,
final_size: 14,
})
);

assert_eq!(pipe.client.recv(&mut buf[..len]), Ok(len));

assert_eq!(pipe.advance(), Ok(()));

// Sending more data is forbidden.
assert_eq!(
pipe.server.stream_send(4, b"bye", false),
Err(Error::FinalSize)
);

// Client sends some data and closes the stream.
assert_eq!(pipe.client.stream_send(4, b"bye", true), Ok(3));
assert_eq!(pipe.advance(), Ok(()));

// Server reads the data.
let mut r = pipe.server.readable();
assert_eq!(r.next(), Some(4));
assert_eq!(r.next(), None);

assert_eq!(pipe.server.stream_recv(4, &mut buf), Ok((15, true)));

// Stream is collected on both sides.
// TODO: assert_eq!(pipe.client.streams.len(), 0);
assert_eq!(pipe.server.streams.len(), 0);

assert_eq!(
pipe.client.stream_shutdown(4, Shutdown::Write, 0),
pipe.server.stream_shutdown(4, Shutdown::Write, 0),
Err(Error::Done)
);
}
Expand Down
15 changes: 5 additions & 10 deletions src/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1015,12 +1015,6 @@ impl SendBuf {
pub fn write(&mut self, mut data: &[u8], mut fin: bool) -> Result<usize> {
let max_off = self.off + data.len() as u64;

if self.shutdown {
// Since we won't write any more data anyway, pretend that we sent
// all data that was passed in.
return Ok(data.len());
}

// Get the stream send capacity. This will return an error if the stream
// was stopped.
let capacity = self.cap()?;
Expand Down Expand Up @@ -1279,6 +1273,9 @@ impl SendBuf {
// Drop all buffered data.
self.data.clear();

// Mark all data as acked.
self.ack(0, self.off as usize);

self.pos = 0;
self.len = 0;

Expand All @@ -1301,16 +1298,14 @@ impl SendBuf {
}

/// Shuts down sending data.
pub fn shutdown(&mut self) -> Result<()> {
pub fn shutdown(&mut self) -> Result<u64> {
if self.shutdown {
return Err(Error::Done);
}

self.shutdown = true;

self.data.clear();

Ok(())
self.reset()
}

/// Returns the largest offset of data buffered.
Expand Down

0 comments on commit 76598e4

Please sign in to comment.