Skip to content

Commit

Permalink
Make Context::request return IntoFuture builder
Browse files Browse the repository at this point in the history
  • Loading branch information
n1ghtmare committed Feb 23, 2023
1 parent 4410ab5 commit 20b2c1a
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 17 deletions.
79 changes: 62 additions & 17 deletions async-nats/src/jetstream/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use serde_json::{self, json};
use std::borrow::Borrow;
use std::future::IntoFuture;
use std::io::{self, ErrorKind};
use std::marker::PhantomData;
use std::pin::Pin;
use std::str::from_utf8;
use std::task::Poll;
Expand Down Expand Up @@ -691,26 +692,12 @@ impl Context {
/// # Ok(())
/// # }
/// ```
pub async fn request<T, V>(&self, subject: String, payload: &T) -> Result<Response<V>, Error>
pub fn request<T, V>(&self, subject: String, payload: T) -> Request<T, V>
where
T: ?Sized + Serialize,
T: Sized + Serialize,
V: DeserializeOwned,
{
let request = serde_json::to_vec(&payload).map(Bytes::from)?;

debug!("JetStream request sent: {:?}", request);

let message = self
.client
.request(format!("{}.{}", self.prefix, subject), request)
.await?;
debug!(
"JetStream request response: {:?}",
from_utf8(&message.payload)
);
let response = serde_json::from_slice(message.payload.as_ref())?;

Ok(response)
Request::new(self.clone(), subject, payload)
}

/// Creates a new object store bucket.
Expand Down Expand Up @@ -1131,3 +1118,61 @@ impl IntoFuture for Publish {
}))
}
}

#[derive(Debug)]
pub struct Request<T: Sized + Serialize, V: DeserializeOwned> {
context: Context,
subject: String,
payload: T,
timeout: Option<Duration>,
response_type: PhantomData<V>,
}

impl<T: Sized + Serialize, V: DeserializeOwned> Request<T, V> {
pub fn new(context: Context, subject: String, payload: T) -> Self {
Self {
context,
subject,
payload,
timeout: None,
response_type: PhantomData,
}
}

pub fn timeout(mut self, timeout: Duration) -> Self {
self.timeout = Some(timeout);
self
}
}

impl<T: Sized + Serialize, V: DeserializeOwned> IntoFuture for Request<T, V> {
type Output = Result<Response<V>, Error>;

type IntoFuture = Pin<Box<dyn Future<Output = Result<Response<V>, Error>> + Send>>;

fn into_future(self) -> Self::IntoFuture {
let payload_result = serde_json::to_vec(&self.payload).map(Bytes::from);

let prefix = self.context.prefix;
let client = self.context.client;
let subject = self.subject;
let timeout = self.timeout;

Box::pin(std::future::IntoFuture::into_future(async move {
let payload = payload_result?;
debug!("JetStream request sent: {:?}", payload);

let request = client.request(format!("{}.{}", prefix, subject), payload);
let request = request.timeout(timeout);
let message = request.await?;

debug!(
"JetStream request response: {:?}",
from_utf8(&message.payload)
);
let response = serde_json::from_slice(message.payload.as_ref())?;

Ok(response)
}))
}
}
15 changes: 15 additions & 0 deletions async-nats/tests/jetstream_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,21 @@ mod jetstream {
assert!(matches!(response, Response::Err { .. }));
}

#[tokio::test]
async fn request_timeout() {
let server = nats_server::run_server("tests/configs/jetstream.conf");
let client = async_nats::connect(server.client_url()).await.unwrap();
let context = async_nats::jetstream::new(client);

let response: Response<AccountInfo> = context
.request("INFO".to_string(), &())
.timeout(Duration::from_secs(1))
.await
.unwrap();

assert!(matches!(response, Response::Ok { .. }));
}

#[tokio::test]
async fn create_stream() {
let server = nats_server::run_server("tests/configs/jetstream.conf");
Expand Down

0 comments on commit 20b2c1a

Please sign in to comment.