Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize jetstream::consumer::pull::Consumer::stream method. #529

Merged
merged 15 commits into from
Jun 27, 2022
3 changes: 1 addition & 2 deletions async-nats/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::str;
use std::str::FromStr;
use std::str::{self, FromStr};

use subslice::SubsliceExt;
use tokio::io::{AsyncRead, AsyncWriteExt};
Expand Down
133 changes: 125 additions & 8 deletions async-nats/src/jetstream/consumer/pull.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@

use bytes::Bytes;
use futures::future::BoxFuture;
use futures::stream::{self, TryStreamExt};
use std::{task::Poll, time::Duration};

use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -53,19 +52,29 @@ impl Consumer<Config> {
/// ..Default::default()
/// }).await?;
///
/// let mut messages = consumer.stream()?.take(100);
/// let mut messages = consumer.stream().await?.take(100);
/// while let Some(Ok(message)) = messages.next().await {
/// println!("got message {:?}", message);
/// message.ack().await?;
/// }
/// Ok(())
/// # }
/// ```
pub fn stream(&self) -> Result<Stream, Error> {
let sequence = self.sequence(10)?;
let try_flatten = sequence.try_flatten();

Ok(try_flatten)
pub async fn stream(&self) -> Result<Stream<'_>, Error> {
Stream::stream(
BatchConfig {
batch: 100,
expires: Some(Duration::from_secs(30).as_nanos().try_into().unwrap()),
no_wait: false,
max_bytes: 0,
idle_heartbeat: Duration::default(),
},
self,
)
.await
}
pub async fn stream_with_config(&self, config: BatchConfig) -> Result<Stream<'_>, Error> {
Copy link
Collaborator

@caspervonb caspervonb Jun 27, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we know we can easily run into issues with this, maybe we wanna hold off on the with_config variant til after this week's release?

Suggested change
pub async fn stream_with_config(&self, config: BatchConfig) -> Result<Stream<'_>, Error> {
pub async fn stream_with_config(&self, config: BatchConfig) -> Result<Stream<'_>, Error> {

Copy link
Member Author

@Jarema Jarema Jun 27, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@caspervonb I will follow up today with:

  1. make this private
  2. expose only batch, max bytes and heartbeats.

It's not released yet, so I would not bother with removing it, as it requires removing tests that will be needed the same day (pull with heartbeat).

Stream::stream(config, self).await
}

pub(crate) async fn request_batch<I: Into<BatchConfig>>(
Expand Down Expand Up @@ -246,7 +255,115 @@ impl<'a> futures::Stream for Sequence<'a> {
}
}

pub type Stream<'a> = stream::TryFlatten<Sequence<'a>>;
pub struct Stream<'a> {
pending_messages: usize,
subscriber: Subscriber,
context: Context,
inbox: String,
subject: String,
batch_config: BatchConfig,
request: Option<BoxFuture<'a, Result<(), Error>>>,
}

impl<'a> Stream<'a> {
async fn stream(
batch_config: BatchConfig,
consumer: &Consumer<Config>,
) -> Result<Stream<'a>, Error> {
let inbox = consumer.context.client.new_inbox();
let subscription = consumer.context.client.subscribe(inbox.clone()).await?;
let subject = format!(
"{}.CONSUMER.MSG.NEXT.{}.{}",
consumer.context.prefix, consumer.info.stream_name, consumer.info.name
);

Ok(Stream {
pending_messages: 0,
subscriber: subscription,
context: consumer.context.clone(),
request: None,
inbox,
subject,
batch_config,
})
}
}

impl<'a> futures::Stream for Stream<'a> {
type Item = Result<jetstream::Message, Error>;

fn poll_next(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
loop {
match self.request.as_mut() {
None => {
let context = self.context.clone();
let inbox = self.inbox.clone();
let subject = self.subject.clone();

let next_request_threshold =
self.pending_messages < std::cmp::min(self.batch_config.batch / 2, 100);

if next_request_threshold {
let batch = self.batch_config;
self.pending_messages += batch.batch;
self.request = Some(Box::pin(async move {
let request = serde_json::to_vec(&batch).map(Bytes::from)?;

context
.client
.publish_with_reply(subject, inbox, request)
.await?;
Ok(())
}));
}

if let Some(request) = self.request.as_mut() {
match request.as_mut().poll(cx) {
Poll::Ready(result) => {
self.request = None;
result?;
}
Poll::Pending => {}
}
}
}

Some(request) => match request.as_mut().poll(cx) {
Poll::Ready(result) => {
self.request = None;
result?;
}
Poll::Pending => {}
},
}
match self.subscriber.receiver.poll_recv(cx) {
Poll::Ready(maybe_message) => match maybe_message {
Some(message) => match message.status.unwrap_or(StatusCode::OK) {
StatusCode::TIMEOUT => {
self.pending_messages = 0;
continue;
}
StatusCode::IDLE_HEARBEAT => {}
_ => {
self.pending_messages -= 1;
return Poll::Ready(Some(Ok(jetstream::Message {
context: self.context.clone(),
message,
})));
}
},
None => return Poll::Ready(None),
},
Poll::Pending => {
return std::task::Poll::Pending;
}
}
}
}
}

/// Used for next Pull Request for Pull Consumer
#[derive(Debug, Default, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
Expand Down
2 changes: 1 addition & 1 deletion async-nats/src/jetstream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
//! ..Default::default()
//! }).await?;
//!
//! let mut messages = consumer.stream()?.take(100);
//! let mut messages = consumer.stream().await?.take(100);
//! while let Ok(Some(message)) = messages.try_next().await {
//! println!("message receiver: {:?}", message);
//! message.ack().await?;
Expand Down
131 changes: 123 additions & 8 deletions async-nats/tests/jetstream_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,7 @@ mod jetstream {
}

#[tokio::test]
async fn pull_stream() {
async fn pull_stream_default() {
let server = nats_server::run_server("tests/configs/jetstream.conf");
let client = async_nats::connect(server.client_url()).await.unwrap();
let context = async_nats::jetstream::new(client);
Expand All @@ -417,19 +417,134 @@ mod jetstream {
.unwrap();
let consumer = stream.get_consumer("pull").await.unwrap();

for _ in 0..1000 {
context
.publish("events".to_string(), "dat".into())
.await
.unwrap();
tokio::task::spawn(async move {
for i in 0..1000 {
context
.publish("events".to_string(), format!("i: {}", i).into())
.await
.unwrap();
}
});

let mut iter = consumer.stream().await.unwrap().take(1000);
while let Some(result) = iter.next().await {
result.unwrap().ack().await.unwrap();
}
}

let mut iter = consumer.stream().unwrap().take(1000);
#[tokio::test]
// Test ignored until Server issue around sending Pull Request immediately after getting
// 408 timeout is resolved.
#[ignore]
async fn pull_stream_with_timeout() {
let server = nats_server::run_server("tests/configs/jetstream.conf");
let client = async_nats::connect(server.client_url()).await.unwrap();
let context = async_nats::jetstream::new(client);

context
.create_stream(stream::Config {
name: "events".to_string(),
subjects: vec!["events".to_string()],
..Default::default()
})
.await
.unwrap();

let stream = context.get_stream("events").await.unwrap();
stream
.create_consumer(&Config {
durable_name: Some("pull".to_string()),
..Default::default()
})
.await
.unwrap();
let consumer = stream.get_consumer("pull").await.unwrap();

tokio::task::spawn(async move {
for i in 0..100 {
tokio::time::sleep(Duration::from_millis(50)).await;
let ack = context
.publish(
"events".to_string(),
format!("timeout test message: {}", i).into(),
)
.await
.unwrap();
println!("ack from publish {}: {:?}", i, ack);
}
println!("send all 100 messages to jetstream");
});

let mut iter = consumer
.stream_with_config(consumer::pull::BatchConfig {
batch: 25,
expires: Some(Duration::from_millis(100).as_nanos().try_into().unwrap()),
no_wait: false,
..Default::default()
})
.await
.unwrap()
.take(100);
while let Some(result) = iter.next().await {
assert!(result.is_ok());
println!("MESSAGE: {:?}", result);
result.unwrap().ack().await.unwrap();
}
}

#[tokio::test]
async fn pull_stream_with_hearbeat() {
let server = nats_server::run_server("tests/configs/jetstream.conf");
let client = async_nats::connect(server.client_url()).await.unwrap();
let context = async_nats::jetstream::new(client);

context
.create_stream(stream::Config {
name: "events".to_string(),
subjects: vec!["events".to_string()],
..Default::default()
})
.await
.unwrap();

let stream = context.get_stream("events").await.unwrap();
stream
.create_consumer(&Config {
durable_name: Some("pull".to_string()),
..Default::default()
})
.await
.unwrap();
let consumer = stream.get_consumer("pull").await.unwrap();

tokio::task::spawn(async move {
for i in 0..100 {
tokio::time::sleep(Duration::from_millis(10)).await;
context
.publish(
"events".to_string(),
format!("hearbeat message: {}", i).into(),
)
.await
.unwrap();
}
});

let mut iter = consumer
.stream_with_config(consumer::pull::BatchConfig {
batch: 25,
expires: Some(Duration::from_millis(5000).as_nanos().try_into().unwrap()),
no_wait: false,
max_bytes: 0,
idle_heartbeat: Duration::from_millis(10),
})
.await
.unwrap()
.take(100);
while let Some(result) = iter.next().await {
println!("MESSAGE: {:?}", result);
result.unwrap().ack().await.unwrap();
}
}
#[tokio::test]
async fn pull_fetch() {
let server = nats_server::run_server("tests/configs/jetstream.conf");
Expand Down
2 changes: 2 additions & 0 deletions nats-server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,8 @@ pub fn run_server_with_port(cfg: &str, port: Option<&str>) -> Server {
if !cfg.is_empty() {
cmd.arg("-c").arg(cfg);
}
cmd.arg("--debug");
cmd.arg("--trace");

let child = cmd.spawn().unwrap();

Expand Down