Skip to content

Commit

Permalink
drop frames from old channel when recovering
Browse files Browse the repository at this point in the history
Signed-off-by: Marc-Antoine Perennou <Marc-Antoine@Perennou.com>
  • Loading branch information
Keruspe committed Oct 12, 2024
1 parent 5851856 commit bf645c5
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 2 deletions.
1 change: 1 addition & 0 deletions src/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -868,6 +868,7 @@ impl Channel {
if self.recovery_config.auto_recover_channels {
self.status.update_recovery_context(|ctx| {
ctx.set_expected_replies(self.frames.take_expected_replies(self.id));
self.frames.drop_frames_for_channel(channel.id, ctx.cause());
self.acknowledgements.reset(ctx.cause());
self.consumers.error(ctx.cause());
});
Expand Down
37 changes: 35 additions & 2 deletions src/frames.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ impl Frames {

pub(crate) fn next_expected_close_ok_reply(
&self,
channel_id: u16,
channel_id: ChannelId,
error: Error,
) -> Option<Reply> {
self.inner
Expand Down Expand Up @@ -104,6 +104,10 @@ impl Frames {
Inner::cancel_expected_replies(replies, error)
}

pub(crate) fn drop_frames_for_channel(&self, channel_id: ChannelId, error: Error) {
self.inner.lock().drop_frames_for_channel(channel_id, error)
}

pub(crate) fn poison(&self) -> Option<Error> {
self.inner.lock().poison.clone()
}
Expand Down Expand Up @@ -266,7 +270,36 @@ impl Inner {
}
}

fn next_expected_close_ok_reply(&mut self, channel_id: u16, error: Error) -> Option<Reply> {
fn drop_frames_for_channel(&mut self, channel_id: ChannelId, error: Error) {
Self::drop_pending_frames_for_channel(channel_id, &mut self.retry_frames, error.clone());
Self::drop_pending_frames_for_channel(channel_id, &mut self.publish_frames, error.clone());
Self::drop_pending_frames_for_channel(channel_id, &mut self.frames, error.clone());
Self::drop_pending_frames_for_channel(channel_id, &mut self.low_prio_frames, error);
}

fn drop_pending_frames_for_channel(
channel_id: ChannelId,
frames: &mut VecDeque<(AMQPFrame, Option<PromiseResolver<()>>)>,
error: Error,
) {
use AMQPFrame::*;

frames.retain(|(f, r)| match f {
Method(id, _) | Header(id, _, _) | Body(id, _) | Heartbeat(id) if *id == channel_id => {
if let Some(r) = r {
r.reject(error.clone());
}
false
}
_ => true,
})
}

fn next_expected_close_ok_reply(
&mut self,
channel_id: ChannelId,
error: Error,
) -> Option<Reply> {
let expected_replies = self.expected_replies.get_mut(&channel_id)?;
while let Some(reply) = expected_replies.pop_front() {
match &reply.0 {
Expand Down

0 comments on commit bf645c5

Please sign in to comment.