Skip to content

Commit

Permalink
simplify waiting for recovery
Browse files Browse the repository at this point in the history
Signed-off-by: Marc-Antoine Perennou <Marc-Antoine@Perennou.com>
  • Loading branch information
Keruspe committed Oct 12, 2024
1 parent 76bd423 commit 73ae509
Show file tree
Hide file tree
Showing 7 changed files with 48 additions and 39 deletions.
20 changes: 8 additions & 12 deletions examples/p.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use lapin::{
options::*, types::FieldTable, BasicProperties, ChannelState, Connection, ConnectionProperties,
Error,
options::*, types::FieldTable, BasicProperties, Connection, ConnectionProperties,
};
use tracing::info;

Expand Down Expand Up @@ -85,16 +84,13 @@ fn main() {
}
Err(err) => {
println!("GOT ERROR");
match err {
Error::InvalidChannelState(ChannelState::Reconnecting, Some(notifier)) => {
notifier.await
}
err => {
if !err.is_amqp_soft_error() {
panic!("{}", err);
}
errors += 1;
}
let (soft, notifier) = err.is_amqp_soft_error();
if !soft {
panic!("{}", err);
}
errors += 1;
if let Some(notifier) = notifier {
notifier.await
}
}
}
Expand Down
11 changes: 7 additions & 4 deletions src/acker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,13 @@ impl Acker {

async fn rpc<F: Fn(&InternalRPCHandle, PromiseResolver<()>)>(&self, f: F) -> Result<()> {
if self.used.swap(true, Ordering::SeqCst) {
return Err(Error::ProtocolError(AMQPError::new(
AMQPSoftError::PRECONDITIONFAILED.into(),
"Attempted to use an already used Acker".into(),
)));
return Err(Error::ProtocolError(
AMQPError::new(
AMQPSoftError::PRECONDITIONFAILED.into(),
"Attempted to use an already used Acker".into(),
),
None,
));
}
if let Some(error) = self.error.as_ref() {
error.check()?;
Expand Down
18 changes: 9 additions & 9 deletions src/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ impl Channel {
class_id,
method_id,
);
Err(Error::ProtocolError(error))
Err(Error::ProtocolError(error, None))
}
}

Expand Down Expand Up @@ -426,7 +426,7 @@ impl Channel {
class_id,
method_id,
);
Err(Error::ProtocolError(error))
Err(Error::ProtocolError(error, None))
}

pub(crate) fn handle_content_header_frame(
Expand Down Expand Up @@ -465,7 +465,7 @@ impl Channel {
class_id,
0,
);
let error = Error::ProtocolError(error);
let error = Error::ProtocolError(error, None);
self.set_connection_error(error.clone());
Err(error)
},
Expand Down Expand Up @@ -534,7 +534,7 @@ impl Channel {
)
.await
});
Err(Error::ProtocolError(err))
Err(Error::ProtocolError(err, None))
}

fn before_connection_start_ok(
Expand All @@ -553,7 +553,7 @@ impl Channel {
}

fn on_connection_close_ok_sent(&self, error: Error) {
if let Error::ProtocolError(_) = error {
if let Error::ProtocolError(_, _) = error {
self.internal_rpc.set_connection_error(error);
} else {
self.internal_rpc.set_connection_closed(error);
Expand All @@ -573,7 +573,7 @@ impl Channel {

fn on_channel_close_ok_sent(&self, error: Option<Error>) {
if !self.recovery_config.auto_recover_channels
|| !error.as_ref().map_or(false, Error::is_amqp_soft_error)
|| !error.as_ref().map_or(false, |e| e.is_amqp_soft_error().0)
{
self.set_closed(
error
Expand Down Expand Up @@ -822,7 +822,7 @@ impl Channel {
?error,
"Connection closed",
);
Error::ProtocolError(error)
Error::ProtocolError(error, None)
})
.unwrap_or_else(|error| {
error!(%error);
Expand Down Expand Up @@ -911,13 +911,13 @@ impl Channel {
channel=%self.id, ?method, ?error,
"Channel closed"
);
Error::ProtocolError(error)
Error::ProtocolError(error, None)
});
match (
self.recovery_config.auto_recover_channels,
error.clone().ok(),
) {
(true, Some(error)) if error.is_amqp_soft_error() => {
(true, Some(error)) if error.is_amqp_soft_error().0 => {
self.status.set_reconnecting(error)
}
(_, err) => self.set_closing(err),
Expand Down
5 changes: 3 additions & 2 deletions src/channel_recovery_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,11 @@ pub(crate) struct ChannelRecoveryContext {

impl ChannelRecoveryContext {
pub(crate) fn new(cause: Error) -> Self {
let notifier = Notifier::default();
Self {
cause,
cause: cause.with_notifier(notifier.clone()),
expected_replies: None,
notifier: Notifier::default(),
notifier,
}
}

Expand Down
4 changes: 2 additions & 2 deletions src/channels.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ impl Channels {
.await
});
}
return Err(Error::ProtocolError(error));
return Err(Error::ProtocolError(error, None));
}
}
AMQPFrame::Header(channel_id, class_id, header) => {
Expand All @@ -248,7 +248,7 @@ impl Channels {
.await
});
}
return Err(Error::ProtocolError(error));
return Err(Error::ProtocolError(error, None));
} else {
self.handle_content_header_frame(
channel_id,
Expand Down
27 changes: 18 additions & 9 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ pub enum Error {

IOError(Arc<io::Error>),
ParsingError(ParserError),
ProtocolError(AMQPError),
ProtocolError(AMQPError, Option<Notifier>),
SerialisationError(Arc<GenError>),

MissingHeartbeatError,
Expand All @@ -53,23 +53,30 @@ impl Error {
}
}

pub fn is_amqp_soft_error(&self) -> bool {
if let Error::ProtocolError(e) = self {
pub fn is_amqp_soft_error(&self) -> (bool, Option<Notifier>) {
if let Error::ProtocolError(e, notifier) = self {
if let AMQPErrorKind::Soft(_) = e.kind() {
return true;
return (true, notifier.clone());
}
}
false
(false, None)
}

pub fn is_amqp_hard_error(&self) -> bool {
if let Error::ProtocolError(e) = self {
if let Error::ProtocolError(e, _) = self {
if let AMQPErrorKind::Hard(_) = e.kind() {
return true;
}
}
false
}

pub(crate) fn with_notifier(self, notifier: Notifier) -> Self {
match self {
Self::ProtocolError(err, _) => Self::ProtocolError(err, Some(notifier)),
err => err,
}
}
}

impl fmt::Display for Error {
Expand All @@ -91,7 +98,7 @@ impl fmt::Display for Error {

Error::IOError(e) => write!(f, "IO error: {}", e),
Error::ParsingError(e) => write!(f, "failed to parse: {}", e),
Error::ProtocolError(e) => write!(f, "protocol error: {}", e),
Error::ProtocolError(e, _) => write!(f, "protocol error: {}", e),
Error::SerialisationError(e) => write!(f, "failed to serialise: {}", e),

Error::MissingHeartbeatError => {
Expand Down Expand Up @@ -119,7 +126,7 @@ impl error::Error for Error {
match self {
Error::IOError(e) => Some(&**e),
Error::ParsingError(e) => Some(e),
Error::ProtocolError(e) => Some(e),
Error::ProtocolError(e, _) => Some(e),
Error::SerialisationError(e) => Some(&**e),
_ => None,
}
Expand Down Expand Up @@ -156,7 +163,9 @@ impl PartialEq for Error {
false
}
(ParsingError(left_inner), ParsingError(right_inner)) => left_inner == right_inner,
(ProtocolError(left_inner), ProtocolError(right_inner)) => left_inner == right_inner,
(ProtocolError(left_inner, _), ProtocolError(right_inner, _)) => {
left_inner == right_inner
}
(SerialisationError(_), SerialisationError(_)) => {
error!("Unable to compare lapin::Error::SerialisationError");
false
Expand Down
2 changes: 1 addition & 1 deletion src/io_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -440,7 +440,7 @@ impl IoLoop {
0,
0,
);
self.critical_error(Error::ProtocolError(error))?;
self.critical_error(Error::ProtocolError(error, None))?;
}
self.receive_buffer.consume(consumed);
Ok(Some(f))
Expand Down

0 comments on commit 73ae509

Please sign in to comment.