-
Notifications
You must be signed in to change notification settings - Fork 172
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: implement futures::Sink<PublishMessage>
on async_nats::Client
#1267
Conversation
async_nats::Client::sender
for advanced use casesasync_nats::client::Publisher
byte sink for sending multiple messages
Test failure looks unrelated to the PR |
We agreed to implement this, however not directly in the client, but in our extension crate, which will become available very soon. Please stay tuned, we will share more info soon. |
Signed-off-by: Roman Volosatovs <rvolosatovs@riseup.net>
This can be thought of as another end of the `Subscriber` or a partially-applied `publish{_with_headers,_with_reply,_with_reply_and_headers}` Signed-off-by: Roman Volosatovs <rvolosatovs@riseup.net>
5354f82
to
3176eb8
Compare
Signed-off-by: Roman Volosatovs <rvolosatovs@riseup.net>
3176eb8
to
f28ce23
Compare
Updated the PR following an offline discussion. @caspervonb @Jarema please take a look! |
async_nats::client::Publisher
byte sink for sending multiple messagesfutures::Sink<PublishMessage>
on async_nats::Client
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good to me, two minute name bike shedding with @Jarema
next_subscription_id: Arc<AtomicU64>, | ||
subscription_capacity: usize, | ||
inbox_prefix: Arc<str>, | ||
request_timeout: Option<Duration>, | ||
max_payload: Arc<AtomicUsize>, | ||
} | ||
|
||
impl Sink<PublishMessage> for Client { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thought this would've been Message directly but it has some extra fields.
Maybe? cc @Jarema
impl Sink<PublishMessage> for Client { | |
impl Sink<Publish> for Client { |
We don't want different sinks for pub vs request right?
Also follow up, do we want a sink on jet stream context as-well?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, those additional fields are making it less straightforward...
I think I prefer PublishMessage
, but I'm not entirely sure yet.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good, but I would rename the respond
to reply
for consistency with Message
(I know our internal name is not consistent, but I prefer consistency in public api if I have to choose one :))
async-nats/src/lib.rs
Outdated
pub struct PublishMessage { | ||
pub subject: Subject, | ||
pub payload: Bytes, | ||
pub respond: Option<Subject>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would rename it to reply
, to be consistent with crate::Message
next_subscription_id: Arc<AtomicU64>, | ||
subscription_capacity: usize, | ||
inbox_prefix: Arc<str>, | ||
request_timeout: Option<Duration>, | ||
max_payload: Arc<AtomicUsize>, | ||
} | ||
|
||
impl Sink<PublishMessage> for Client { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, those additional fields are making it less straightforward...
I think I prefer PublishMessage
, but I'm not entirely sure yet.
Signed-off-by: Roman Volosatovs <rvolosatovs@riseup.net>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM!
closes #1266
An example real-world use case for this looks like this following:
https://github.com/rvolosatovs/wrpc/blob/85b043b84bd70fa2994cde8ced129d05b6739284/crates/transport-nats-next/src/lib.rs#L351-L412
This in an
AsyncWrite
implementation, which chunks bytes according to NATS payload limits and correctly handles polling and async wake up (using thePublisher
introduced in this PR)