From f28ce237e74453ee0502006721e027b85e36dcfb Mon Sep 17 00:00:00 2001 From: Roman Volosatovs Date: Fri, 16 Aug 2024 16:09:29 +0200 Subject: [PATCH] feat: implement `Sink` for `Client` Signed-off-by: Roman Volosatovs --- async-nats/src/client.rs | 137 +++++++++------------------------------ async-nats/src/lib.rs | 20 +++--- 2 files changed, 44 insertions(+), 113 deletions(-) diff --git a/async-nats/src/client.rs b/async-nats/src/client.rs index da946d840..713cdbe18 100644 --- a/async-nats/src/client.rs +++ b/async-nats/src/client.rs @@ -15,8 +15,8 @@ use core::pin::Pin; use core::task::{Context, Poll}; use crate::connection::State; -use crate::subject::{Subject, ToSubject}; -use crate::ServerInfo; +use crate::subject::ToSubject; +use crate::{PublishMessage, ServerInfo}; use super::{header::HeaderMap, status::StatusCode, Command, Message, Subscriber}; use crate::error::Error; @@ -40,7 +40,6 @@ static VERSION_RE: Lazy = /// An error returned from the [`Client::publish`], [`Client::publish_with_headers`], /// [`Client::publish_with_reply`] or [`Client::publish_with_reply_and_headers`] functions. -/// This error is also used as [`Publisher::Error`] pub type PublishError = Error; impl From> for PublishError { @@ -70,61 +69,44 @@ impl Display for PublishErrorKind { } } -/// [`Sink`] created by [`Client::publish_sink`], [`Client::publish_with_reply_sink`], -/// [`Client::publish_with_headers_sink`] or [`Client::publish_with_reply_and_headers_sink`] +/// Client is a `Cloneable` handle to NATS connection. +/// Client should not be created directly. Instead, one of two methods can be used: +/// [crate::connect] and [crate::ConnectOptions::connect] #[derive(Clone, Debug)] -pub struct Publisher { - subject: Subject, - respond: Option, - headers: Option, - sender: PollSender, +pub struct Client { + info: tokio::sync::watch::Receiver, + pub(crate) state: tokio::sync::watch::Receiver, + pub(crate) sender: mpsc::Sender, + poll_sender: PollSender, + next_subscription_id: Arc, + subscription_capacity: usize, + inbox_prefix: Arc, + request_timeout: Option, + max_payload: Arc, } -impl Sink for Publisher { +impl Sink for Client { type Error = PublishError; fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.sender.poll_ready_unpin(cx).map_err(Into::into) + self.poll_sender.poll_ready_unpin(cx).map_err(Into::into) } - fn start_send(mut self: Pin<&mut Self>, payload: Bytes) -> Result<(), Self::Error> { - let headers = self.headers.clone(); - let respond = self.respond.clone(); - let subject = self.subject.clone(); - self.sender - .start_send_unpin(Command::Publish { - subject, - payload, - respond, - headers, - }) + fn start_send(mut self: Pin<&mut Self>, msg: PublishMessage) -> Result<(), Self::Error> { + self.poll_sender + .start_send_unpin(Command::Publish(msg)) .map_err(Into::into) } fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.sender.poll_flush_unpin(cx).map_err(Into::into) + self.poll_sender.poll_flush_unpin(cx).map_err(Into::into) } fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.sender.poll_close_unpin(cx).map_err(Into::into) + self.poll_sender.poll_close_unpin(cx).map_err(Into::into) } } -/// Client is a `Cloneable` handle to NATS connection. -/// Client should not be created directly. Instead, one of two methods can be used: -/// [crate::connect] and [crate::ConnectOptions::connect] -#[derive(Clone, Debug)] -pub struct Client { - info: tokio::sync::watch::Receiver, - pub(crate) state: tokio::sync::watch::Receiver, - pub(crate) sender: mpsc::Sender, - next_subscription_id: Arc, - subscription_capacity: usize, - inbox_prefix: Arc, - request_timeout: Option, - max_payload: Arc, -} - impl Client { pub(crate) fn new( info: tokio::sync::watch::Receiver, @@ -135,10 +117,12 @@ impl Client { request_timeout: Option, max_payload: Arc, ) -> Client { + let poll_sender = PollSender::new(sender.clone()); Client { info, state, sender, + poll_sender, next_subscription_id: Arc::new(AtomicU64::new(1)), subscription_capacity: capacity, inbox_prefix: inbox_prefix.into(), @@ -242,26 +226,16 @@ impl Client { } self.sender - .send(Command::Publish { + .send(Command::Publish(PublishMessage { subject, payload, respond: None, headers: None, - }) + })) .await?; Ok(()) } - /// Returns a [Publisher], which can be used to send multiple [Messages](Message) to a given subject. - pub fn publish_sink(&self, subject: S) -> Publisher { - Publisher { - subject: subject.to_subject(), - respond: None, - headers: None, - sender: PollSender::new(self.sender.clone()), - } - } - /// Publish a [Message] with headers to a given subject. /// /// # Examples @@ -290,30 +264,16 @@ impl Client { let subject = subject.to_subject(); self.sender - .send(Command::Publish { + .send(Command::Publish(PublishMessage { subject, payload, respond: None, headers: Some(headers), - }) + })) .await?; Ok(()) } - /// Returns a [Publisher], which can be used to send multiple [Messages](Message) with headers to a given subject. - pub fn publish_with_headers_sink( - &self, - subject: S, - headers: HeaderMap, - ) -> Publisher { - Publisher { - subject: subject.to_subject(), - respond: None, - headers: Some(headers), - sender: PollSender::new(self.sender.clone()), - } - } - /// Publish a [Message] to a given subject, with specified response subject /// to which the subscriber can respond. /// This method does not await for the response. @@ -340,32 +300,16 @@ impl Client { let reply = reply.to_subject(); self.sender - .send(Command::Publish { + .send(Command::Publish(PublishMessage { subject, payload, respond: Some(reply), headers: None, - }) + })) .await?; Ok(()) } - /// Returns a [Publisher], which can be used to send multiple [Messages](Message) to a given subject, - /// with specified response subject to which the subscriber can respond. - /// [Publisher] does not await for the response. - pub fn publish_with_reply_sink( - &self, - subject: S, - reply: R, - ) -> Publisher { - Publisher { - subject: subject.to_subject(), - respond: Some(reply.to_subject()), - headers: None, - sender: PollSender::new(self.sender.clone()), - } - } - /// Publish a [Message] to a given subject with headers and specified response subject /// to which the subscriber can respond. /// This method does not await for the response. @@ -395,33 +339,16 @@ impl Client { let reply = reply.to_subject(); self.sender - .send(Command::Publish { + .send(Command::Publish(PublishMessage { subject, payload, respond: Some(reply), headers: Some(headers), - }) + })) .await?; Ok(()) } - /// Returns a [Publisher], which can be used to send multiple [Messages](Message) to a given subject, - /// with headers and specified response subject to which the subscriber can respond. - /// [Publisher] does not await for the response. - pub fn publish_with_reply_and_headers_sink( - &self, - subject: S, - reply: R, - headers: HeaderMap, - ) -> Publisher { - Publisher { - subject: subject.to_subject(), - respond: Some(reply.to_subject()), - headers: Some(headers), - sender: PollSender::new(self.sender.clone()), - } - } - /// Sends the request with headers. /// /// # Examples diff --git a/async-nats/src/lib.rs b/async-nats/src/lib.rs index 003721e0a..62b53f2f3 100755 --- a/async-nats/src/lib.rs +++ b/async-nats/src/lib.rs @@ -342,15 +342,19 @@ pub(crate) enum ServerOp { }, } +/// `PublishMessage` represents a message being published +#[derive(Debug)] +pub struct PublishMessage { + pub subject: Subject, + pub payload: Bytes, + pub respond: Option, + pub headers: Option, +} + /// `Command` represents all commands that a [`Client`] can handle #[derive(Debug)] pub(crate) enum Command { - Publish { - subject: Subject, - payload: Bytes, - respond: Option, - headers: Option, - }, + Publish(PublishMessage), Request { subject: Subject, payload: Bytes, @@ -823,12 +827,12 @@ impl ConnectionHandler { self.connection.enqueue_write_op(&pub_op); } - Command::Publish { + Command::Publish(PublishMessage { subject, payload, respond, headers, - } => { + }) => { self.connection.enqueue_write_op(&ClientOp::Publish { subject, payload,