diff --git a/src/channel.rs b/src/channel.rs index b7516573..92880f61 100644 --- a/src/channel.rs +++ b/src/channel.rs @@ -868,6 +868,7 @@ impl Channel { if self.recovery_config.auto_recover_channels { self.status.update_recovery_context(|ctx| { ctx.set_expected_replies(self.frames.take_expected_replies(self.id)); + self.frames.drop_frames_for_channel(channel.id, ctx.cause()); self.acknowledgements.reset(ctx.cause()); self.consumers.error(ctx.cause()); }); diff --git a/src/frames.rs b/src/frames.rs index b6122131..2472f43c 100644 --- a/src/frames.rs +++ b/src/frames.rs @@ -71,7 +71,7 @@ impl Frames { pub(crate) fn next_expected_close_ok_reply( &self, - channel_id: u16, + channel_id: ChannelId, error: Error, ) -> Option { self.inner @@ -104,6 +104,10 @@ impl Frames { Inner::cancel_expected_replies(replies, error) } + pub(crate) fn drop_frames_for_channel(&self, channel_id: ChannelId, error: Error) { + self.inner.lock().drop_frames_for_channel(channel_id, error) + } + pub(crate) fn poison(&self) -> Option { self.inner.lock().poison.clone() } @@ -266,7 +270,36 @@ impl Inner { } } - fn next_expected_close_ok_reply(&mut self, channel_id: u16, error: Error) -> Option { + fn drop_frames_for_channel(&mut self, channel_id: ChannelId, error: Error) { + Self::drop_pending_frames_for_channel(channel_id, &mut self.retry_frames, error.clone()); + Self::drop_pending_frames_for_channel(channel_id, &mut self.publish_frames, error.clone()); + Self::drop_pending_frames_for_channel(channel_id, &mut self.frames, error.clone()); + Self::drop_pending_frames_for_channel(channel_id, &mut self.low_prio_frames, error); + } + + fn drop_pending_frames_for_channel( + channel_id: ChannelId, + frames: &mut VecDeque<(AMQPFrame, Option>)>, + error: Error, + ) { + use AMQPFrame::*; + + frames.retain(|(f, r)| match f { + Method(id, _) | Header(id, _, _) | Body(id, _) | Heartbeat(id) if *id == channel_id => { + if let Some(r) = r { + r.reject(error.clone()); + } + false + } + _ => true, + }) + } + + fn next_expected_close_ok_reply( + &mut self, + channel_id: ChannelId, + error: Error, + ) -> Option { let expected_replies = self.expected_replies.get_mut(&channel_id)?; while let Some(reply) = expected_replies.pop_front() { match &reply.0 {