Skip to content

Commit

Permalink
lib: remove calls to .flush()
Browse files Browse the repository at this point in the history
  • Loading branch information
paolobarbolini committed Jul 30, 2023
1 parent 11376a2 commit 712ef12
Show file tree
Hide file tree
Showing 4 changed files with 2 additions and 13 deletions.
3 changes: 0 additions & 3 deletions async-nats/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -346,9 +346,6 @@ impl Client {
}
None => self.publish_with_reply(subject, inbox, payload).await?,
}
self.flush()
.await
.map_err(|err| RequestError::with_source(RequestErrorKind::Other, err))?;
let request = match timeout {
Some(timeout) => {
tokio::time::timeout(timeout, sub.next())
Expand Down
8 changes: 0 additions & 8 deletions async-nats/src/jetstream/consumer/pull.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,11 +153,6 @@ impl Consumer<Config> {
.publish_with_reply(subject, inbox, payload.into())
.await
.map_err(|err| BatchRequestError::with_source(BatchRequestErrorKind::Publish, err))?;
self.context
.client
.flush()
.await
.map_err(|err| BatchRequestError::with_source(BatchRequestErrorKind::Flush, err))?;
debug!("batch request sent");
Ok(())
}
Expand Down Expand Up @@ -924,9 +919,6 @@ impl Stream {
.publish_with_reply(subject.clone(), inbox.clone(), request.clone())
.await
.map(|_| pending_reset);
if let Err(err) = consumer.context.client.flush().await {
debug!("flush failed: {err:?}");
}
// TODO: add tracing instead of ignoring this.
request_result_tx
.send(result.map(|_| pending_reset).map_err(|err| {
Expand Down
1 change: 0 additions & 1 deletion async-nats/src/jetstream/consumer/push.rs
Original file line number Diff line number Diff line change
Expand Up @@ -684,7 +684,6 @@ impl<'a> futures::Stream for Ordered<'a> {
.publish(subject, Bytes::from_static(b""))
.await
.ok();
client.flush().await.ok();
});
}
continue;
Expand Down
3 changes: 2 additions & 1 deletion async-nats/tests/kv_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,7 @@ mod kv {
.await
.unwrap();

let context = async_nats::jetstream::new(client);
let context = async_nats::jetstream::new(client.clone());

let kv = context
.create_key_value(async_nats::jetstream::kv::Config {
Expand All @@ -352,6 +352,7 @@ mod kv {
// check if we get only updated values. This should not pop up in watcher.
kv.put("foo", 22.to_string().into()).await.unwrap();
let mut watch = kv.watch("foo").await.unwrap().enumerate();
client.flush().await.unwrap();

tokio::task::spawn({
let kv = kv.clone();
Expand Down

0 comments on commit 712ef12

Please sign in to comment.