diff --git a/Cargo.toml b/Cargo.toml index 52c5813d..f75a47a1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -75,6 +75,7 @@ waker-fn = "^1.1" [dev-dependencies] async-global-executor = "^2.0" +async-io = "^2.0" futures-lite = "^2.0" serde_json = "^1.0" waker-fn = "^1.1" diff --git a/examples/c.rs b/examples/c.rs new file mode 100644 index 00000000..e870c6e7 --- /dev/null +++ b/examples/c.rs @@ -0,0 +1,58 @@ +use futures_lite::StreamExt; +use lapin::{options::*, types::FieldTable, Connection, ConnectionProperties}; +use tracing::info; + +fn main() { + if std::env::var("RUST_LOG").is_err() { + unsafe { std::env::set_var("RUST_LOG", "info") }; + } + + tracing_subscriber::fmt::init(); + + let addr = std::env::var("AMQP_ADDR").unwrap_or_else(|_| "amqp://127.0.0.1:5672/%2f".into()); + + async_global_executor::block_on(async { + let conn = Connection::connect(&addr, ConnectionProperties::default()) + .await + .expect("connection error"); + + info!("CONNECTED"); + + //receive channel + let channel = conn.create_channel().await.expect("create_channel"); + info!(state=?conn.status().state()); + + let queue = channel + .queue_declare( + "hello-recover", + QueueDeclareOptions::default(), + FieldTable::default(), + ) + .await + .expect("queue_declare"); + info!(state=?conn.status().state()); + info!(?queue, "Declared queue"); + + info!("will consume"); + let mut consumer = channel + .basic_consume( + "hello-recover", + "my_consumer", + BasicConsumeOptions::default(), + FieldTable::default(), + ) + .await + .expect("basic_consume"); + info!(state=?conn.status().state()); + + while let Some(delivery) = consumer.next().await { + info!(message=?delivery, "received message"); + if let Ok(delivery) = delivery { + delivery + .ack(BasicAckOptions::default()) + .await + .expect("basic_ack"); + } + } + }) +} diff --git a/examples/p.rs b/examples/p.rs new file mode 100644 index 00000000..2bde3f4d --- /dev/null +++ b/examples/p.rs @@ -0,0 +1,104 @@ +use lapin::{ + options::*, types::FieldTable, BasicProperties, ChannelState, Connection, ConnectionProperties, + Error, +}; +use tracing::info; + +fn main() { + if std::env::var("RUST_LOG").is_err() { + std::env::set_var("RUST_LOG", "info"); + } + + tracing_subscriber::fmt::init(); + + let addr = std::env::var("AMQP_ADDR").unwrap_or_else(|_| "amqp://127.0.0.1:5672/%2f".into()); + let recovery_config = lapin::experimental::RecoveryConfig { + auto_recover_channels: true, + }; + + async_global_executor::block_on(async { + let conn = Connection::connect( + &addr, + ConnectionProperties::default().with_experimental_recovery_config(recovery_config), + ) + .await + .expect("connection error"); + + info!("CONNECTED"); + + let channel1 = conn.create_channel().await.expect("create_channel"); + channel1 + .confirm_select(ConfirmSelectOptions::default()) + .await + .expect("confirm_select"); + channel1 + .queue_declare( + "hello-recover", + QueueDeclareOptions::default(), + FieldTable::default(), + ) + .await + .expect("queue_declare"); + + let ch = channel1.clone(); + async_global_executor::spawn(async move { + loop { + async_io::Timer::after(std::time::Duration::from_secs(1)).await; + info!("Trigger failure"); + assert!(ch + .queue_declare( + "fake queue", + QueueDeclareOptions { + passive: true, + ..QueueDeclareOptions::default() + }, + FieldTable::default(), + ) + .await + .is_err()); + } + }) + .detach(); + + let mut published = 0; + let mut errors = 0; + info!("will publish"); + loop { + let res = channel1 + .basic_publish( + "", + "recover-test", + BasicPublishOptions::default(), + b"before", + BasicProperties::default(), + ) + .await; + let res = if let Ok(res) = res { + res.await.map(|_| ()) + } else { + res.map(|_| ()) + }; + match res { + Ok(()) => { + println!("GOT OK"); + published += 1; + } + Err(err) => { + println!("GOT ERROR"); + match err { + Error::InvalidChannelState(ChannelState::Reconnecting, Some(notifier)) => { + notifier.await + } + err => { + if !err.is_amqp_soft_error() { + panic!("{}", err); + } + errors += 1; + } + } + } + } + println!("Published {} with {} errors", published, errors); + } + }); +} diff --git a/src/id_sequence.rs b/src/id_sequence.rs index 89294b55..d8d069d3 100644 --- a/src/id_sequence.rs +++ b/src/id_sequence.rs @@ -29,7 +29,7 @@ impl< // self.id is actually the next (so that first call to next returns 0 // if we're 0 (or 1 if 0 is not allowed), either we haven't started yet, or last number we yielded (current one) is // the max. - if self.id == self.first() { + if self.id <= self.first() { self.max } else { Some(self.id - self.one)