diff --git a/async-nats/src/jetstream/consumer/pull.rs b/async-nats/src/jetstream/consumer/pull.rs index fb7869f9a..8e71bea6c 100644 --- a/async-nats/src/jetstream/consumer/pull.rs +++ b/async-nats/src/jetstream/consumer/pull.rs @@ -385,7 +385,7 @@ impl futures::Stream for Batch { Poll::Ready(maybe_message) => match maybe_message { Some(message) => match message.status.unwrap_or(StatusCode::OK) { StatusCode::TIMEOUT => { - debug!("received timeout. Iterator done."); + debug!("received timeout. Iterator done"); self.terminated = true; Poll::Ready(None) } @@ -393,6 +393,14 @@ impl futures::Stream for Batch { debug!("received heartbeat"); Poll::Pending } + // If this is fetch variant, terminate on no more messages. + // We do not need to check if this is a fetch, not batch, + // as only fetch will send back `NO_MESSAGES` status. + StatusCode::NOT_FOUND => { + debug!("received `NO_MESSAGES`. Iterator done"); + self.terminated = true; + Poll::Ready(None) + } StatusCode::OK => { debug!("received message"); self.pending_messages -= 1; diff --git a/async-nats/tests/jetstream_tests.rs b/async-nats/tests/jetstream_tests.rs index 1e9c54ed6..8caae54f2 100644 --- a/async-nats/tests/jetstream_tests.rs +++ b/async-nats/tests/jetstream_tests.rs @@ -1052,6 +1052,55 @@ mod jetstream { let consumer = stream.get_consumer("pull").await.unwrap(); consumer.fetch().max_messages(10).messages().await.unwrap(); } + + #[tokio::test] + async fn fetch() { + let server = nats_server::run_server("tests/configs/jetstream.conf"); + let client = async_nats::connect(&server.client_url()).await.unwrap(); + let context = async_nats::jetstream::new(client); + + let stream = context + .create_stream(jetstream::stream::Config { + name: "events".into(), + subjects: vec!["events".into()], + ..Default::default() + }) + .await + .unwrap(); + + for _ in 0..20 { + context.publish("events", "data".into()).await.unwrap(); + } + + let consumer = stream + .create_consumer(consumer::pull::Config { + durable_name: Some("pull".into()), + ..Default::default() + }) + .await + .unwrap(); + + let messages = consumer + .fetch() + .max_messages(15) + .messages() + .await + .unwrap() + .count() + .await; + assert_eq!(messages, 15); + + let messages = consumer + .fetch() + .max_messages(15) + .messages() + .await + .unwrap() + .count() + .await; + assert_eq!(messages, 5); + } + #[tokio::test] async fn get_consumer_from_stream() { let server = nats_server::run_server("tests/configs/jetstream.conf");