Skip to content

Commit

Permalink
Adding some client tests, updating doc-comments, fixing bug where I f…
Browse files Browse the repository at this point in the history
…orgot to set handler.is_draining
  • Loading branch information
jsudano authored and jsudano committed Nov 14, 2024
1 parent 149161d commit bcc995b
Show file tree
Hide file tree
Showing 3 changed files with 168 additions and 11 deletions.
20 changes: 18 additions & 2 deletions async-nats/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -632,10 +632,26 @@ impl Client {
}

/// Drains all subscriptions, stops any new messages from being published, and flushes any remaining
/// messages, then closes the connection
/// messages, then closes the connection. Once completed, any associated streams associated with the
/// client will be closed, and further client commands will fail
///
/// # Examples
/// TODO
///
/// ```no_run
/// # #[tokio::main]
/// # async fn main() -> Result<(), async_nats::Error> {
/// let client = async_nats::connect("demo.nats.io").await?;
/// let mut subscription = client.subscribe("events.>").await?;
///
/// client.drain().await?;
///
/// # // existing subscriptions are closed and further commands will fail
/// assert!(subscription.next().await.is_none());
/// client.subscribe().await.expect_err("Expected further commands to fail");
///
/// # Ok(())
/// # }
/// ```
pub async fn drain(&self) -> Result<(), DrainError> {
// Drain all subscriptions
self.sender.send(Command::Drain { sid: None }).await?;
Expand Down
21 changes: 12 additions & 9 deletions async-nats/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -539,15 +539,16 @@ impl ConnectionHandler {
}

// Before handling any commands, drop any subscriptions which are draining
// Note: safe to assume drain has completed, as we would have flushed all outgoing
// UNSUB messages in the previous call to this fn, and we would have processed and delivered
// any remaining messages to the subscription in the loop above.
// Note: safe to assume subscription drain has completed at this point, as we would have flushed
// all outgoing UNSUB messages in the previous call to this fn, and we would have processed and
// delivered any remaining messages to the subscription in the loop above.
self.handler.subscriptions.retain(|_, s| !s.is_draining);

if self.handler.is_draining {
// The entire connection is draining. This means we flushed outgoing messages and all subs
// were drained by the above retain and we should exit instead of processing any further
// messages
// The entire connection is draining. This means we flushed outgoing messages in the previous
// call to this fn, we handled any remaining messages from the server in the loop above, and
// all subs were drained, so drain is complete and we should exit instead of processing any
// further messages
return Poll::Ready(ExitReason::Closed);
}

Expand Down Expand Up @@ -806,6 +807,8 @@ impl ConnectionHandler {
drain_sub(&sid, sub);
}
} else {
// sid isn't set, so drain the whole client
self.is_draining = true;
for (sid, sub) in self.subscriptions.iter_mut() {
drain_sub(sid, sub);
}
Expand Down Expand Up @@ -1291,9 +1294,9 @@ impl Subscriber {
Ok(())
}

/// Unsubscribes from subscription immediately leaves the stream open for the configured drain period
/// to allow any in-flight messages on the subscription to be delivered. The stream will be closed
/// at the end of the drain period
/// Unsubscribes immediately but leaves the stream open to allow any in-flight messages on the
/// subscription to be delivered. The stream will be closed after any remaining messages are
/// delivered
///
/// # Examples
/// ```
Expand Down
138 changes: 138 additions & 0 deletions async-nats/tests/client_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1042,4 +1042,142 @@ mod client {

assert_eq!(client.timeout(), None);
}

#[tokio::test]
async fn drain_subscription_basic() {
use std::error::Error;
let server = nats_server::run_basic_server();
let client = async_nats::connect(server.client_url()).await.unwrap();

let mut sub = client.subscribe("test").await.unwrap();

// publish some data
client.publish("test", "data".into()).await.unwrap();
client.flush().await.unwrap();

// confirm we receive that data
assert!(sub.next().await.is_some());

// now drain the subscription
let result = sub.drain().await;
match result {
Ok(()) => println!("ok"),
Err(err) => {
println!("error: {}", err);
println!("source: {:?}", err.source())
}
}

// assert the stream is closed after draining
assert!(sub.next().await.is_none());

// confirm we can still reconnect and send messages on a new subscription
let mut sub2 = client.subscribe("test2").await.unwrap();
client.publish("test2", "data".into()).await.unwrap();
client.flush().await.unwrap();
assert!(sub2.next().await.is_some());
}

#[tokio::test]
async fn drain_subscription_unsub_after() {
let server = nats_server::run_basic_server();
let client = async_nats::connect(server.client_url()).await.unwrap();

let mut sub = client.subscribe("test").await.unwrap();

sub.unsubscribe_after(120)
.await
.expect("Expected to send unsub_after");

// publish some data
client.publish("test", "data".into()).await.unwrap();
client.publish("test", "data".into()).await.unwrap();
client.flush().await.unwrap();

// Send the drain command
sub.drain().await.expect("Expected to drain the sub");

// we should receive all published data then close immediately
assert!(sub.next().await.is_some());
assert!(sub.next().await.is_some());
assert!(sub.next().await.is_none());
}

#[tokio::test]
async fn drain_subscription_active() {
let server = nats_server::run_basic_server();
let client = async_nats::connect(server.client_url()).await.unwrap();

// spawn a task to constantly write to the subscription
let constant_writer = tokio::spawn({
let client = client.clone();
async move {
loop {
client.publish("test", "data".into()).await.unwrap();
client.flush().await.unwrap();
}
}
});

let mut sub = client.subscribe("test").await.unwrap();

// confirm we receive some data
assert!(sub.next().await.is_some());

// now drain the subscription
sub.drain().await.unwrap();

// yield to the runtime to ensure constant_writer gets a chance to publish a message or two to the subject
tokio::time::sleep(Duration::from_millis(1)).await;

// assert the subscription stream is closed after draining
let sleep_fut = async move { while let Some(_) = sub.next().await {} };
tokio::time::timeout(Duration::from_secs(10), sleep_fut)
.await
.expect("Expected stream to drain within 10s");

// assert constant_writer doesn't fail to write after the only sub is drained (i.e. client operations still work fine)
assert!(!constant_writer.is_finished());

// confirm we can still reconnect and receive messages on the same subject on a new subscription
let mut sub2 = client.subscribe("test").await.unwrap();
assert!(sub2.next().await.is_some());
}

#[tokio::test]
async fn drain_client_basic() {
let server = nats_server::run_basic_server();
let client = async_nats::connect(server.client_url()).await.unwrap();

let mut sub = client.subscribe("test").await.unwrap();

// publish some data
client.publish("test", "data".into()).await.unwrap();
client.flush().await.unwrap();

// confirm we receive that data
assert!(sub.next().await.is_some());

// now drain the client
client.drain().await.unwrap();

// assert the sub's stream is closed after draining
assert!(sub.next().await.is_none());

// we should not be able to perform any more operations on a drained client
client
.subscribe("test2")
.await
.expect_err("Expected client to be drained");

client
.publish("test", "data".into())
.await
.expect_err("Expected client to be drained");

// we should be able to connect with a new client
let _client2 = async_nats::connect(server.client_url())
.await
.expect("Expected to be able to create a new client");
}
}

0 comments on commit bcc995b

Please sign in to comment.