From 20b2c1a6431c70472cafee2fb2e850ddcc2563b2 Mon Sep 17 00:00:00 2001 From: Dimitar Dimitrov Date: Wed, 22 Feb 2023 18:16:23 +0200 Subject: [PATCH] Make `Context::request` return `IntoFuture` builder --- async-nats/src/jetstream/context.rs | 79 ++++++++++++++++++++++------- async-nats/tests/jetstream_tests.rs | 15 ++++++ 2 files changed, 77 insertions(+), 17 deletions(-) diff --git a/async-nats/src/jetstream/context.rs b/async-nats/src/jetstream/context.rs index 5387f4fa0..432e7d7cf 100644 --- a/async-nats/src/jetstream/context.rs +++ b/async-nats/src/jetstream/context.rs @@ -26,6 +26,7 @@ use serde_json::{self, json}; use std::borrow::Borrow; use std::future::IntoFuture; use std::io::{self, ErrorKind}; +use std::marker::PhantomData; use std::pin::Pin; use std::str::from_utf8; use std::task::Poll; @@ -691,26 +692,12 @@ impl Context { /// # Ok(()) /// # } /// ``` - pub async fn request(&self, subject: String, payload: &T) -> Result, Error> + pub fn request(&self, subject: String, payload: T) -> Request where - T: ?Sized + Serialize, + T: Sized + Serialize, V: DeserializeOwned, { - let request = serde_json::to_vec(&payload).map(Bytes::from)?; - - debug!("JetStream request sent: {:?}", request); - - let message = self - .client - .request(format!("{}.{}", self.prefix, subject), request) - .await?; - debug!( - "JetStream request response: {:?}", - from_utf8(&message.payload) - ); - let response = serde_json::from_slice(message.payload.as_ref())?; - - Ok(response) + Request::new(self.clone(), subject, payload) } /// Creates a new object store bucket. @@ -1131,3 +1118,61 @@ impl IntoFuture for Publish { })) } } + +#[derive(Debug)] +pub struct Request { + context: Context, + subject: String, + payload: T, + timeout: Option, + response_type: PhantomData, +} + +impl Request { + pub fn new(context: Context, subject: String, payload: T) -> Self { + Self { + context, + subject, + payload, + timeout: None, + response_type: PhantomData, + } + } + + pub fn timeout(mut self, timeout: Duration) -> Self { + self.timeout = Some(timeout); + self + } +} + +impl IntoFuture for Request { + type Output = Result, Error>; + + type IntoFuture = Pin, Error>> + Send>>; + + fn into_future(self) -> Self::IntoFuture { + let payload_result = serde_json::to_vec(&self.payload).map(Bytes::from); + + let prefix = self.context.prefix; + let client = self.context.client; + let subject = self.subject; + let timeout = self.timeout; + + Box::pin(std::future::IntoFuture::into_future(async move { + let payload = payload_result?; + debug!("JetStream request sent: {:?}", payload); + + let request = client.request(format!("{}.{}", prefix, subject), payload); + let request = request.timeout(timeout); + let message = request.await?; + + debug!( + "JetStream request response: {:?}", + from_utf8(&message.payload) + ); + let response = serde_json::from_slice(message.payload.as_ref())?; + + Ok(response) + })) + } +} diff --git a/async-nats/tests/jetstream_tests.rs b/async-nats/tests/jetstream_tests.rs index 4ff047d4b..ec4ee696d 100644 --- a/async-nats/tests/jetstream_tests.rs +++ b/async-nats/tests/jetstream_tests.rs @@ -310,6 +310,21 @@ mod jetstream { assert!(matches!(response, Response::Err { .. })); } + #[tokio::test] + async fn request_timeout() { + let server = nats_server::run_server("tests/configs/jetstream.conf"); + let client = async_nats::connect(server.client_url()).await.unwrap(); + let context = async_nats::jetstream::new(client); + + let response: Response = context + .request("INFO".to_string(), &()) + .timeout(Duration::from_secs(1)) + .await + .unwrap(); + + assert!(matches!(response, Response::Ok { .. })); + } + #[tokio::test] async fn create_stream() { let server = nats_server::run_server("tests/configs/jetstream.conf");