Skip to content

Commit

Permalink
feat: implement Sink<PublishMessage> for Client
Browse files Browse the repository at this point in the history
Signed-off-by: Roman Volosatovs <rvolosatovs@riseup.net>
  • Loading branch information
rvolosatovs committed Aug 16, 2024
1 parent 23fffd8 commit f28ce23
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 113 deletions.
137 changes: 32 additions & 105 deletions async-nats/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ use core::pin::Pin;
use core::task::{Context, Poll};

use crate::connection::State;
use crate::subject::{Subject, ToSubject};
use crate::ServerInfo;
use crate::subject::ToSubject;
use crate::{PublishMessage, ServerInfo};

use super::{header::HeaderMap, status::StatusCode, Command, Message, Subscriber};
use crate::error::Error;
Expand All @@ -40,7 +40,6 @@ static VERSION_RE: Lazy<Regex> =

/// An error returned from the [`Client::publish`], [`Client::publish_with_headers`],
/// [`Client::publish_with_reply`] or [`Client::publish_with_reply_and_headers`] functions.
/// This error is also used as [`Publisher::Error`]
pub type PublishError = Error<PublishErrorKind>;

impl From<tokio::sync::mpsc::error::SendError<Command>> for PublishError {
Expand Down Expand Up @@ -70,61 +69,44 @@ impl Display for PublishErrorKind {
}
}

/// [`Sink<Bytes>`] created by [`Client::publish_sink`], [`Client::publish_with_reply_sink`],
/// [`Client::publish_with_headers_sink`] or [`Client::publish_with_reply_and_headers_sink`]
/// Client is a `Cloneable` handle to NATS connection.
/// Client should not be created directly. Instead, one of two methods can be used:
/// [crate::connect] and [crate::ConnectOptions::connect]
#[derive(Clone, Debug)]
pub struct Publisher {
subject: Subject,
respond: Option<Subject>,
headers: Option<HeaderMap>,
sender: PollSender<Command>,
pub struct Client {
info: tokio::sync::watch::Receiver<ServerInfo>,
pub(crate) state: tokio::sync::watch::Receiver<State>,
pub(crate) sender: mpsc::Sender<Command>,
poll_sender: PollSender<Command>,
next_subscription_id: Arc<AtomicU64>,
subscription_capacity: usize,
inbox_prefix: Arc<str>,
request_timeout: Option<Duration>,
max_payload: Arc<AtomicUsize>,
}

impl Sink<Bytes> for Publisher {
impl Sink<PublishMessage> for Client {
type Error = PublishError;

fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.sender.poll_ready_unpin(cx).map_err(Into::into)
self.poll_sender.poll_ready_unpin(cx).map_err(Into::into)
}

fn start_send(mut self: Pin<&mut Self>, payload: Bytes) -> Result<(), Self::Error> {
let headers = self.headers.clone();
let respond = self.respond.clone();
let subject = self.subject.clone();
self.sender
.start_send_unpin(Command::Publish {
subject,
payload,
respond,
headers,
})
fn start_send(mut self: Pin<&mut Self>, msg: PublishMessage) -> Result<(), Self::Error> {
self.poll_sender
.start_send_unpin(Command::Publish(msg))
.map_err(Into::into)
}

fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.sender.poll_flush_unpin(cx).map_err(Into::into)
self.poll_sender.poll_flush_unpin(cx).map_err(Into::into)
}

fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.sender.poll_close_unpin(cx).map_err(Into::into)
self.poll_sender.poll_close_unpin(cx).map_err(Into::into)
}
}

/// Client is a `Cloneable` handle to NATS connection.
/// Client should not be created directly. Instead, one of two methods can be used:
/// [crate::connect] and [crate::ConnectOptions::connect]
#[derive(Clone, Debug)]
pub struct Client {
info: tokio::sync::watch::Receiver<ServerInfo>,
pub(crate) state: tokio::sync::watch::Receiver<State>,
pub(crate) sender: mpsc::Sender<Command>,
next_subscription_id: Arc<AtomicU64>,
subscription_capacity: usize,
inbox_prefix: Arc<str>,
request_timeout: Option<Duration>,
max_payload: Arc<AtomicUsize>,
}

impl Client {
pub(crate) fn new(
info: tokio::sync::watch::Receiver<ServerInfo>,
Expand All @@ -135,10 +117,12 @@ impl Client {
request_timeout: Option<Duration>,
max_payload: Arc<AtomicUsize>,
) -> Client {
let poll_sender = PollSender::new(sender.clone());
Client {
info,
state,
sender,
poll_sender,
next_subscription_id: Arc::new(AtomicU64::new(1)),
subscription_capacity: capacity,
inbox_prefix: inbox_prefix.into(),
Expand Down Expand Up @@ -242,26 +226,16 @@ impl Client {
}

self.sender
.send(Command::Publish {
.send(Command::Publish(PublishMessage {
subject,
payload,
respond: None,
headers: None,
})
}))
.await?;
Ok(())
}

/// Returns a [Publisher], which can be used to send multiple [Messages](Message) to a given subject.
pub fn publish_sink<S: ToSubject>(&self, subject: S) -> Publisher {
Publisher {
subject: subject.to_subject(),
respond: None,
headers: None,
sender: PollSender::new(self.sender.clone()),
}
}

/// Publish a [Message] with headers to a given subject.
///
/// # Examples
Expand Down Expand Up @@ -290,30 +264,16 @@ impl Client {
let subject = subject.to_subject();

self.sender
.send(Command::Publish {
.send(Command::Publish(PublishMessage {
subject,
payload,
respond: None,
headers: Some(headers),
})
}))
.await?;
Ok(())
}

/// Returns a [Publisher], which can be used to send multiple [Messages](Message) with headers to a given subject.
pub fn publish_with_headers_sink<S: ToSubject>(
&self,
subject: S,
headers: HeaderMap,
) -> Publisher {
Publisher {
subject: subject.to_subject(),
respond: None,
headers: Some(headers),
sender: PollSender::new(self.sender.clone()),
}
}

/// Publish a [Message] to a given subject, with specified response subject
/// to which the subscriber can respond.
/// This method does not await for the response.
Expand All @@ -340,32 +300,16 @@ impl Client {
let reply = reply.to_subject();

self.sender
.send(Command::Publish {
.send(Command::Publish(PublishMessage {
subject,
payload,
respond: Some(reply),
headers: None,
})
}))
.await?;
Ok(())
}

/// Returns a [Publisher], which can be used to send multiple [Messages](Message) to a given subject,
/// with specified response subject to which the subscriber can respond.
/// [Publisher] does not await for the response.
pub fn publish_with_reply_sink<S: ToSubject, R: ToSubject>(
&self,
subject: S,
reply: R,
) -> Publisher {
Publisher {
subject: subject.to_subject(),
respond: Some(reply.to_subject()),
headers: None,
sender: PollSender::new(self.sender.clone()),
}
}

/// Publish a [Message] to a given subject with headers and specified response subject
/// to which the subscriber can respond.
/// This method does not await for the response.
Expand Down Expand Up @@ -395,33 +339,16 @@ impl Client {
let reply = reply.to_subject();

self.sender
.send(Command::Publish {
.send(Command::Publish(PublishMessage {
subject,
payload,
respond: Some(reply),
headers: Some(headers),
})
}))
.await?;
Ok(())
}

/// Returns a [Publisher], which can be used to send multiple [Messages](Message) to a given subject,
/// with headers and specified response subject to which the subscriber can respond.
/// [Publisher] does not await for the response.
pub fn publish_with_reply_and_headers_sink<S: ToSubject, R: ToSubject>(
&self,
subject: S,
reply: R,
headers: HeaderMap,
) -> Publisher {
Publisher {
subject: subject.to_subject(),
respond: Some(reply.to_subject()),
headers: Some(headers),
sender: PollSender::new(self.sender.clone()),
}
}

/// Sends the request with headers.
///
/// # Examples
Expand Down
20 changes: 12 additions & 8 deletions async-nats/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -342,15 +342,19 @@ pub(crate) enum ServerOp {
},
}

/// `PublishMessage` represents a message being published
#[derive(Debug)]
pub struct PublishMessage {
pub subject: Subject,
pub payload: Bytes,
pub respond: Option<Subject>,
pub headers: Option<HeaderMap>,
}

/// `Command` represents all commands that a [`Client`] can handle
#[derive(Debug)]
pub(crate) enum Command {
Publish {
subject: Subject,
payload: Bytes,
respond: Option<Subject>,
headers: Option<HeaderMap>,
},
Publish(PublishMessage),
Request {
subject: Subject,
payload: Bytes,
Expand Down Expand Up @@ -823,12 +827,12 @@ impl ConnectionHandler {
self.connection.enqueue_write_op(&pub_op);
}

Command::Publish {
Command::Publish(PublishMessage {
subject,
payload,
respond,
headers,
} => {
}) => {
self.connection.enqueue_write_op(&ClientOp::Publish {
subject,
payload,
Expand Down

0 comments on commit f28ce23

Please sign in to comment.