Skip to content

Commit

Permalink
Make Context::publish return IntoFuture builder
Browse files Browse the repository at this point in the history
  • Loading branch information
caspervonb authored Feb 21, 2023
1 parent e1a30bc commit 4410ab5
Show file tree
Hide file tree
Showing 2 changed files with 136 additions and 169 deletions.
115 changes: 50 additions & 65 deletions async-nats/src/jetstream/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,13 +115,8 @@ impl Context {
/// # Ok(())
/// # }
/// ```
pub async fn publish(
&self,
subject: String,
payload: Bytes,
) -> Result<PublishAckFuture, Error> {
self.send_publish(subject, Publish::build().payload(payload))
.await
pub fn publish(&self, subject: String, payload: Bytes) -> Publish {
Publish::new(self.clone(), subject, payload)
}

/// Publish a message with headers to a given subject associated with a stream and returns an acknowledgment from
Expand Down Expand Up @@ -149,62 +144,9 @@ impl Context {
headers: crate::header::HeaderMap,
payload: Bytes,
) -> Result<PublishAckFuture, Error> {
self.send_publish(subject, Publish::build().payload(payload).headers(headers))
.await
}

/// Publish a message built by [Publish] and returns an acknowledgment future.
///
/// If the stream does not exist, `no responders` error will be returned.
///
/// # Examples
///
/// ```no_run
/// # use async_nats::jetstream::context::Publish;
/// # #[tokio::main]
/// # async fn main() -> Result<(), async_nats::Error> {
/// let client = async_nats::connect("localhost:4222").await?;
/// let jetstream = async_nats::jetstream::new(client);
///
/// let ack =
/// jetstream.send_publish("events".to_string(),
/// Publish::build().payload("data".into()).message_id("uuid")
/// ).await?;
/// # Ok(())
/// # }
/// ```
pub async fn send_publish(
&self,
subject: String,
publish: Publish,
) -> Result<PublishAckFuture, Error> {
let inbox = self.client.new_inbox();
let response = self.client.subscribe(inbox.clone()).await?;
tokio::time::timeout(self.timeout, async {
if let Some(headers) = publish.headers {
self.client
.publish_with_reply_and_headers(
subject,
inbox.clone(),
headers,
publish.payload,
)
.await
} else {
self.client
.publish_with_reply(subject, inbox.clone(), publish.payload)
.await
}
})
.map_err(|_| {
std::io::Error::new(ErrorKind::TimedOut, "JetStream publish request timed out")
})
.await??;
let ack_future = self.publish(subject, payload).headers(headers).await?;

Ok(PublishAckFuture {
timeout: self.timeout,
subscription: response,
})
Ok(ack_future)
}

/// Query the server for account information
Expand Down Expand Up @@ -1081,15 +1023,23 @@ impl futures::Stream for Streams {
}
}
/// Used for building customized `publish` message.
#[derive(Default, Clone, Debug)]
#[derive(Clone, Debug)]
pub struct Publish {
context: Context,
subject: String,
payload: Bytes,
headers: Option<header::HeaderMap>,
}

impl Publish {
/// Creates a new custom Publish struct to be used with.
pub fn build() -> Self {
Default::default()
pub(crate) fn new(context: Context, subject: String, payload: Bytes) -> Self {
Publish {
context,
subject,
payload,
headers: None,
}
}

/// Sets the payload for the message.
Expand All @@ -1102,6 +1052,7 @@ impl Publish {
self.headers = Some(headers);
self
}

/// A shorthand to add a single header.
pub fn header<N: IntoHeaderName, V: IntoHeaderValue>(mut self, name: N, value: V) -> Self {
self.headers
Expand Down Expand Up @@ -1146,3 +1097,37 @@ impl Publish {
)
}
}

impl IntoFuture for Publish {
type Output = Result<PublishAckFuture, Error>;
type IntoFuture = Pin<Box<dyn Future<Output = Result<PublishAckFuture, Error>> + Send>>;

fn into_future(self) -> Self::IntoFuture {
Box::pin(std::future::IntoFuture::into_future(async move {
let inbox = self.context.client.new_inbox();
let subscription = self.context.client.subscribe(inbox.clone()).await?;
let mut publish = self
.context
.client
.publish(self.subject, self.payload)
.reply(inbox);

if let Some(headers) = self.headers {
publish = publish.headers(headers);
}

let timeout = self.context.timeout;

tokio::time::timeout(timeout, publish.into_future())
.map_err(|_| {
std::io::Error::new(ErrorKind::TimedOut, "JetStream publish request timed out")
})
.await??;

Ok(PublishAckFuture {
timeout,
subscription,
})
}))
}
}
Loading

0 comments on commit 4410ab5

Please sign in to comment.