Skip to content

Commit

Permalink
Improve consumer Stream implementations
Browse files Browse the repository at this point in the history
  • Loading branch information
paolobarbolini committed Sep 20, 2023
1 parent 98fd93a commit 9b63d5e
Show file tree
Hide file tree
Showing 2 changed files with 242 additions and 287 deletions.
252 changes: 117 additions & 135 deletions async-nats/src/jetstream/consumer/pull.rs
Original file line number Diff line number Diff line change
Expand Up @@ -443,57 +443,48 @@ impl<'a> futures::Stream for Sequence<'a> {
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
match self.next.as_mut() {
None => {
let context = self.context.clone();
let subject = self.subject.clone();
let request = self.request.clone();
let pending_messages = self.pending_messages;
let this = self.as_mut().get_mut();

let next = this.next.get_or_insert_with(|| {
let context = this.context.clone();
let subject = this.subject.clone();
let request = this.request.clone();
let pending_messages = this.pending_messages;

Box::pin(async move {
let inbox = context.client.new_inbox();
let subscriber = context
.client
.subscribe(inbox.clone())
.await
.map_err(|err| MessagesError::with_source(MessagesErrorKind::Pull, err))?;

context
.client
.publish_with_reply(subject, inbox, request)
.await
.map_err(|err| MessagesError::with_source(MessagesErrorKind::Pull, err))?;

// TODO(tp): Add timeout config and defaults.
Ok(Batch {
pending_messages,
subscriber,
context,
terminated: false,
timeout: Some(Box::pin(tokio::time::sleep(Duration::from_secs(60)))),
})
})
});

let next = self.next.insert(Box::pin(async move {
let inbox = context.client.new_inbox();
let subscriber = context
.client
.subscribe(inbox.clone())
.await
.map_err(|err| MessagesError::with_source(MessagesErrorKind::Pull, err))?;
match next.as_mut().poll(cx) {
Poll::Ready(result) => {
self.next = None;

context
.client
.publish_with_reply(subject, inbox, request)
.await
.map_err(|err| MessagesError::with_source(MessagesErrorKind::Pull, err))?;

// TODO(tp): Add timeout config and defaults.
Ok(Batch {
pending_messages,
subscriber,
context,
terminated: false,
timeout: Some(Box::pin(tokio::time::sleep(Duration::from_secs(60)))),
})
}));

match next.as_mut().poll(cx) {
Poll::Ready(result) => {
self.next = None;
Poll::Ready(Some(result.map_err(|err| {
MessagesError::with_source(MessagesErrorKind::Pull, err)
})))
}
Poll::Pending => Poll::Pending,
}
Poll::Ready(Some(result.map_err(|err| {
MessagesError::with_source(MessagesErrorKind::Pull, err)
})))
}

Some(next) => match next.as_mut().poll(cx) {
Poll::Ready(result) => {
self.next = None;
Poll::Ready(Some(result.map_err(|err| {
MessagesError::with_source(MessagesErrorKind::Pull, err)
})))
}
Poll::Pending => Poll::Pending,
},
Poll::Pending => Poll::Pending,
}
}
}
Expand Down Expand Up @@ -752,34 +743,32 @@ impl<'a> futures::Stream for Ordered<'a> {
// Poll messages
if let Some(stream) = self.stream.as_mut() {
match stream.poll_next_unpin(cx) {
Poll::Ready(message) => match message {
Some(message) => {
// Do we bail out on all errors?
// Or we want to handle some? (like consumer deleted?)
let message = message?;
let info = message.info().map_err(|err| {
OrderedError::with_source(OrderedErrorKind::Other, err)
})?;
trace!("consumer sequence: {:?}, stream sequence {:?}, consumer sequence in message: {:?} stream sequence in message: {:?}",
Poll::Ready(Some(message)) => {
// Do we bail out on all errors?
// Or we want to handle some? (like consumer deleted?)
let message = message?;
let info = message
.info()
.map_err(|err| OrderedError::with_source(OrderedErrorKind::Other, err))?;
trace!("consumer sequence: {:?}, stream sequence {:?}, consumer sequence in message: {:?} stream sequence in message: {:?}",
self.consumer_sequence,
self.stream_sequence,
info.consumer_sequence,
info.stream_sequence);
if info.consumer_sequence != self.consumer_sequence + 1 {
debug!(
"ordered consumer mismatch. current {}, info: {}",
self.consumer_sequence, info.consumer_sequence
);
recreate = true;
self.consumer_sequence = 0;
} else {
self.stream_sequence = info.stream_sequence;
self.consumer_sequence = info.consumer_sequence;
return Poll::Ready(Some(Ok(message)));
}
if info.consumer_sequence != self.consumer_sequence + 1 {
debug!(
"ordered consumer mismatch. current {}, info: {}",
self.consumer_sequence, info.consumer_sequence
);
recreate = true;
self.consumer_sequence = 0;
} else {
self.stream_sequence = info.stream_sequence;
self.consumer_sequence = info.consumer_sequence;
return Poll::Ready(Some(Ok(message)));
}
None => return Poll::Ready(None),
},
}
Poll::Ready(None) => return Poll::Ready(None),
Poll::Pending => (),
}
}
Expand All @@ -793,30 +782,35 @@ impl<'a> futures::Stream for Ordered<'a> {
let consumer_name = self.consumer_name.clone();
let sequence = self.consumer_sequence;
async move {
recreate_consumer_stream(context, config, stream_name, consumer_name, sequence)
.await
recreate_consumer_stream(
context,
config,
&stream_name,
&consumer_name,
sequence,
)
.await
}
}))
}
// check for recreation future
if let Some(result) = self.create_stream.as_mut() {
match result.poll_unpin(cx) {
Poll::Ready(result) => match result {
Ok(stream) => {
self.create_stream = None;
self.stream = Some(stream);
return self.poll_next(cx);
}
Err(err) => {
return Poll::Ready(Some(Err(OrderedError::with_source(
OrderedErrorKind::Recreate,
err,
))))
}
},
Poll::Ready(Ok(stream)) => {
self.create_stream = None;
self.stream = Some(stream);
return self.poll_next(cx);
}
Poll::Ready(Err(err)) => {
return Poll::Ready(Some(Err(OrderedError::with_source(
OrderedErrorKind::Recreate,
err,
))))
}
Poll::Pending => (),
}
}

Poll::Pending
}
}
Expand Down Expand Up @@ -1034,18 +1028,15 @@ impl futures::Stream for Stream {
if !self.batch_config.idle_heartbeat.is_zero() {
trace!("checking idle hearbeats");
let timeout = self.batch_config.idle_heartbeat.saturating_mul(2);
match self
let heartbeat_timeout = self
.heartbeat_timeout
.get_or_insert_with(|| Box::pin(tokio::time::sleep(timeout)))
.poll_unpin(cx)
{
Poll::Ready(_) => {
self.heartbeat_timeout = None;
return Poll::Ready(Some(Err(MessagesError::new(
MessagesErrorKind::MissingHeartbeat,
))));
}
Poll::Pending => (),
.get_or_insert_with(|| Box::pin(tokio::time::sleep(timeout)));

if heartbeat_timeout.poll_unpin(cx).is_ready() {
self.heartbeat_timeout = None;
return Poll::Ready(Some(Err(MessagesError::new(
MessagesErrorKind::MissingHeartbeat,
))));
}
}

Expand All @@ -1062,30 +1053,26 @@ impl futures::Stream for Stream {
}

match self.request_result_rx.poll_recv(cx) {
Poll::Ready(resp) => match resp {
Some(resp) => match resp {
Ok(reset) => {
trace!("request response: {:?}", reset);
debug!("request sent, setting pending messages");
if reset {
self.pending_messages = self.batch_config.batch;
self.pending_bytes = self.batch_config.max_bytes;
} else {
self.pending_messages += self.batch_config.batch;
self.pending_bytes += self.batch_config.max_bytes;
}
self.pending_request = false;
continue;
}
Err(err) => {
return Poll::Ready(Some(Err(MessagesError::with_source(
MessagesErrorKind::Pull,
err,
))))
}
},
None => return Poll::Ready(None),
},
Poll::Ready(Some(Ok(reset))) => {
trace!("request response: {:?}", reset);
debug!("request sent, setting pending messages");
if reset {
self.pending_messages = self.batch_config.batch;
self.pending_bytes = self.batch_config.max_bytes;
} else {
self.pending_messages += self.batch_config.batch;
self.pending_bytes += self.batch_config.max_bytes;
}
self.pending_request = false;
continue;
}
Poll::Ready(Some(Err(err))) => {
return Poll::Ready(Some(Err(MessagesError::with_source(
MessagesErrorKind::Pull,
err,
))))
}
Poll::Ready(None) => return Poll::Ready(None),
Poll::Pending => {
trace!("pending result");
}
Expand All @@ -1095,6 +1082,7 @@ impl futures::Stream for Stream {
match self.subscriber.receiver.poll_recv(cx) {
Poll::Ready(maybe_message) => {
self.heartbeat_timeout = None;

match maybe_message {
Some(message) => match message.status.unwrap_or(StatusCode::OK) {
StatusCode::TIMEOUT | StatusCode::REQUEST_TERMINATED => {
Expand Down Expand Up @@ -2207,23 +2195,17 @@ pub type ConsumerRecreateError = Error<ConsumerRecreateErrorKind>;
async fn recreate_consumer_stream(
context: Context,
config: Config,
stream_name: String,
consumer_name: String,
stream_name: &str,
consumer_name: &str,
sequence: u64,
) -> Result<Stream, ConsumerRecreateError> {
// TODO(jarema): retry whole operation few times?
let stream = context
.get_stream(stream_name.clone())
.await
.map_err(|err| {
ConsumerRecreateError::with_source(ConsumerRecreateErrorKind::GetStream, err)
})?;
stream
.delete_consumer(&consumer_name)
.await
.map_err(|err| {
ConsumerRecreateError::with_source(ConsumerRecreateErrorKind::Recreate, err)
})?;
let stream = context.get_stream(stream_name).await.map_err(|err| {
ConsumerRecreateError::with_source(ConsumerRecreateErrorKind::GetStream, err)
})?;
stream.delete_consumer(consumer_name).await.map_err(|err| {
ConsumerRecreateError::with_source(ConsumerRecreateErrorKind::Recreate, err)
})?;

let deliver_policy = {
if sequence == 0 {
Expand Down
Loading

0 comments on commit 9b63d5e

Please sign in to comment.