Skip to content

Commit

Permalink
set state to reconnecting earlier
Browse files Browse the repository at this point in the history
Signed-off-by: Marc-Antoine Perennou <Marc-Antoine@Perennou.com>
  • Loading branch information
Keruspe committed Aug 31, 2024
1 parent 21c782a commit 6d309ea
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 17 deletions.
34 changes: 18 additions & 16 deletions src/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -565,28 +565,25 @@ impl Channel {
}

fn next_expected_close_ok_reply(&self) -> Option<Reply> {
self.frames
.next_expected_close_ok_reply(self.id, Error::InvalidChannelState(ChannelState::Closed, None))
self.frames.next_expected_close_ok_reply(
self.id,
Error::InvalidChannelState(ChannelState::Closed, None),
)
}

fn before_channel_close(&self) {
self.set_closing(None);
}

fn on_channel_close_ok_sent(&self, error: Option<Error>) {
match (self.recovery_config.auto_recover_channels, error) {
(true, Some(error)) if error.is_amqp_soft_error() => {
self.status.set_reconnecting(error)
}
(_, error) => {
self.set_closed(
error
.clone()
.unwrap_or(Error::InvalidChannelState(ChannelState::Closing, None)),
);
if let Some(error) = error {
self.error_handler.on_error(error);
}
if !self.recovery_config.auto_recover_channels || !error.as_ref().map_or(false, Error::is_amqp_soft_error) {
self.set_closed(
error
.clone()
.unwrap_or(Error::InvalidChannelState(ChannelState::Closing, None)),
);
if let Some(error) = error {
self.error_handler.on_error(error);
}
}
}
Expand Down Expand Up @@ -916,7 +913,12 @@ impl Channel {
);
Error::ProtocolError(error)
});
self.set_closing(error.clone().ok());
match (self.recovery_config.auto_recover_channels, error.clone().ok()) {
(true, Some(error)) if error.is_amqp_soft_error() => {
self.status.set_reconnecting(error)
}
(_, err) => self.set_closing(err),
}
let error = error.map_err(|error| info!(channel=%self.id, ?method, code_to_error=%error, "Channel closed with a non-error code")).ok();
let channel = self.clone();
self.internal_rpc.register_internal_future(async move {
Expand Down
6 changes: 5 additions & 1 deletion src/channel_status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,17 @@ impl ChannelStatus {
}

pub fn closing(&self) -> bool {
self.0.lock().state == ChannelState::Closing
[ChannelState::Closing, ChannelState::Reconnecting].contains(&self.0.lock().state)
}

pub fn connected(&self) -> bool {
self.0.lock().state == ChannelState::Connected
}

pub fn reconnecting(&self) -> bool {
self.0.lock().state == ChannelState::Reconnecting
}

pub(crate) fn connected_or_recovering(&self) -> bool {
[ChannelState::Connected, ChannelState::Reconnecting].contains(&self.0.lock().state)
}
Expand Down

0 comments on commit 6d309ea

Please sign in to comment.