Skip to content

Commit

Permalink
fix issue#648: drop frame if stream is released
Browse files Browse the repository at this point in the history
  • Loading branch information
p00512853 committed Dec 1, 2022
1 parent 294000c commit 47d6127
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 1 deletion.
10 changes: 10 additions & 0 deletions src/proto/streams/recv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -600,6 +600,16 @@ impl Recv {
}
}

// Received a frame, but no one cared about it. fix issue#648
if !stream.is_recv {
tracing::trace!(
"recv_data; frame ignored on stream release {:?} for some time",
stream.id,
);
self.release_connection_capacity(sz, &mut None);
return Ok(());
}

// Update stream level flow control
stream.recv_flow.send_data(sz);

Expand Down
4 changes: 4 additions & 0 deletions src/proto/streams/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,9 @@ pub(super) struct Stream {
/// Frames pending for this stream to read
pub pending_recv: buffer::Deque,

/// When the RecvStream drop occurs, no data should be received.
pub is_recv: bool,

/// Task tracking receiving frames
pub recv_task: Option<Waker>,

Expand Down Expand Up @@ -180,6 +183,7 @@ impl Stream {
reset_at: None,
next_reset_expire: None,
pending_recv: buffer::Deque::new(),
is_recv: true,
recv_task: None,
pending_push_promises: store::Queue::new(),
content_length: ContentLength::Omitted,
Expand Down
12 changes: 11 additions & 1 deletion src/proto/streams/streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1345,15 +1345,25 @@ impl OpaqueStreamRef {
.release_capacity(capacity, &mut stream, &mut me.actions.task)
}

/// Clear the receive queue and set the status to no longer receive data frames.
pub(crate) fn clear_recv_buffer(&mut self) {
let mut me = self.inner.lock().unwrap();
let me = &mut *me;

let mut stream = me.store.resolve(self.key);

stream.is_recv = false;
me.actions.recv.clear_recv_buffer(&mut stream);
}

/// When there is a data receiving party, set this parameter to the receiving state.
pub(crate) fn enable_recv(&self) {
let mut me = self.inner.lock().unwrap();
let me = &mut *me;

let mut stream = me.store.resolve(self.key);
stream.is_recv = true;
}

pub fn stream_id(&self) -> StreamId {
self.inner.lock().unwrap().store[self.key].id
}
Expand Down
1 change: 1 addition & 0 deletions src/share.rs
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,7 @@ impl StreamId {

impl RecvStream {
pub(crate) fn new(inner: FlowControl) -> Self {
inner.inner.enable_recv();
RecvStream { inner }
}

Expand Down

0 comments on commit 47d6127

Please sign in to comment.