From 7436edaebbaeb0a66f34a255ad80750e2e885a8c Mon Sep 17 00:00:00 2001 From: Casper Beyer Date: Wed, 2 Aug 2023 11:59:29 +0800 Subject: [PATCH 01/14] wip --- async-nats/src/client.rs | 35 +++++++++-------- async-nats/src/lib.rs | 85 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 104 insertions(+), 16 deletions(-) diff --git a/async-nats/src/client.rs b/async-nats/src/client.rs index 882f54e40..f5b1fd215 100644 --- a/async-nats/src/client.rs +++ b/async-nats/src/client.rs @@ -26,7 +26,7 @@ use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; use std::time::Duration; use thiserror::Error; -use tokio::sync::mpsc; +use tokio::sync::{mpsc, oneshot}; use tracing::trace; static VERSION_RE: Lazy = @@ -335,28 +335,31 @@ impl Client { subject: String, request: Request, ) -> Result { - let inbox = request.inbox.unwrap_or_else(|| self.new_inbox()); + let (sender, receiver) = oneshot::channel(); + + let payload = request.payload.unwrap_or_else(|| Bytes::new()); + let headers = request.headers; + + self.sender + .send(Command::Request { + subject, + payload, + headers, + sender, + }) + .map_err(|err| RequestError::with_source(RequestErrorKind::Other, err)) + .await?; + let timeout = request.timeout.unwrap_or(self.request_timeout); - let mut sub = self.subscribe(inbox.clone()).await?; - let payload: Bytes = request.payload.unwrap_or_else(Bytes::new); - match request.headers { - Some(headers) => { - self.publish_with_reply_and_headers(subject, inbox, headers, payload) - .await? - } - None => self.publish_with_reply(subject, inbox, payload).await?, - } - self.flush() - .await - .map_err(|err| RequestError::with_source(RequestErrorKind::Other, err))?; let request = match timeout { Some(timeout) => { - tokio::time::timeout(timeout, sub.next()) + tokio::time::timeout(timeout, receiver) .map_err(|err| RequestError::with_source(RequestErrorKind::TimedOut, err)) .await? } - None => sub.next().await, + None => receiver.await, }; + match request { Some(message) => { if message.status == Some(StatusCode::NO_RESPONDERS) { diff --git a/async-nats/src/lib.rs b/async-nats/src/lib.rs index 200f88f7e..1452e8c58 100644 --- a/async-nats/src/lib.rs +++ b/async-nats/src/lib.rs @@ -267,6 +267,12 @@ pub(crate) enum Command { respond: Option, headers: Option, }, + Request { + subject: String, + payload: Bytes, + headers: Option, + sender: oneshot::Sender, + }, Subscribe { sid: u64, subject: String, @@ -315,11 +321,19 @@ struct Subscription { max: Option, } +#[derive(Debug)] +struct Multiplexer { + sid: u64, + prefix: String, + senders: HashMap>, +} + /// A connection handler which facilitates communication from channels to a single shared connection. pub(crate) struct ConnectionHandler { connection: Connection, connector: Connector, subscriptions: HashMap, + multiplexer: Option, pending_pings: usize, info_sender: tokio::sync::watch::Sender, ping_interval: Interval, @@ -344,6 +358,7 @@ impl ConnectionHandler { connection, connector, subscriptions: HashMap::new(), + multiplexer: None, pending_pings: 0, info_sender, ping_interval, @@ -484,6 +499,24 @@ impl ConnectionHandler { self.handle_flush().await?; } } + } else if let Some(multiplexer) = self.multiplexer.as_mut() { + let maybe_token = subject.strip_prefix(&multiplexer.prefix).to_owned(); + + if let Some(token) = maybe_token { + if let Some(sender) = multiplexer.senders.remove(token) { + let message = Message { + subject, + reply, + payload, + headers, + status, + description, + length, + }; + + sender.send(message); + } + } } } // TODO: we should probably update advertised server list here too. @@ -591,6 +624,58 @@ impl ConnectionHandler { error!("Sending Subscribe failed with {:?}", err); } } + Command::Request { + subject, + payload, + headers, + sender, + } => { + let multiplexer = if let Some(multiplexer) = self.multiplexer.as_mut() { + multiplexer + } else { + // TODO what is the sid? + let sid = 0; + + let inbox = nuid::next(); + let prefix = format!("{}.", inbox); + let subject = format!("{}.*", inbox); + + if let Err(err) = self + .connection + .write_op(&ClientOp::Subscribe { + sid, + subject, + queue_group: None, + }) + .await + { + error!("Sending Subscribe failed with {:?}", err); + } + + self.multiplexer.insert(Multiplexer { + sid, + prefix, + senders: HashMap::new(), + }) + }; + + let token = nuid::next(); + let respond = Some(format!("{}{}", multiplexer.prefix, token)); + multiplexer.senders.insert(token, sender); + + let pub_op = ClientOp::Publish { + subject, + payload, + respond, + headers, + }; + + while let Err(err) = self.connection.write_op(&pub_op).await { + self.handle_disconnect().await?; + error!("Sending Publish failed with {:?}", err); + } + } + Command::Publish { subject, payload, From 49e5b5a7ffde3c59bb16ed913ebd60243ddc4711 Mon Sep 17 00:00:00 2001 From: Casper Beyer Date: Wed, 2 Aug 2023 12:03:50 +0800 Subject: [PATCH 02/14] fixup --- async-nats/src/client.rs | 7 ++----- async-nats/src/lib.rs | 5 ++++- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/async-nats/src/client.rs b/async-nats/src/client.rs index f5b1fd215..5c8625164 100644 --- a/async-nats/src/client.rs +++ b/async-nats/src/client.rs @@ -361,7 +361,7 @@ impl Client { }; match request { - Some(message) => { + Ok(message) => { if message.status == Some(StatusCode::NO_RESPONDERS) { return Err(RequestError::with_source( RequestErrorKind::NoResponders, @@ -370,10 +370,7 @@ impl Client { } Ok(message) } - None => Err(RequestError::with_source( - RequestErrorKind::Other, - "broken pipe", - )), + Err(err) => Err(RequestError::with_source(RequestErrorKind::Other, err)), } } diff --git a/async-nats/src/lib.rs b/async-nats/src/lib.rs index 1452e8c58..431574473 100644 --- a/async-nats/src/lib.rs +++ b/async-nats/src/lib.rs @@ -514,7 +514,8 @@ impl ConnectionHandler { length, }; - sender.send(message); + // TODO don't unwrap + sender.send(message).unwrap(); } } } @@ -674,6 +675,8 @@ impl ConnectionHandler { self.handle_disconnect().await?; error!("Sending Publish failed with {:?}", err); } + + // TODO(caspervonb) we can flush directly here } Command::Publish { From 28f256a64eae9882bc7d46780c7fd4a16e5b8ed5 Mon Sep 17 00:00:00 2001 From: Casper Beyer Date: Wed, 2 Aug 2023 13:39:31 +0800 Subject: [PATCH 03/14] Leave sid 0 for requests --- async-nats/src/client.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/async-nats/src/client.rs b/async-nats/src/client.rs index 5c8625164..862ee5019 100644 --- a/async-nats/src/client.rs +++ b/async-nats/src/client.rs @@ -71,7 +71,7 @@ impl Client { info, state, sender, - next_subscription_id: Arc::new(AtomicU64::new(0)), + next_subscription_id: Arc::new(AtomicU64::new(1)), subscription_capacity: capacity, inbox_prefix, request_timeout, From 76567f8e5397fd706e89b5ccc449a0efa1d64576 Mon Sep 17 00:00:00 2001 From: Casper Beyer Date: Wed, 2 Aug 2023 23:18:17 +0800 Subject: [PATCH 04/14] Use inbox as defined by client --- async-nats/src/client.rs | 2 ++ async-nats/src/lib.rs | 18 +++++++++--------- 2 files changed, 11 insertions(+), 9 deletions(-) diff --git a/async-nats/src/client.rs b/async-nats/src/client.rs index 862ee5019..ef9f99815 100644 --- a/async-nats/src/client.rs +++ b/async-nats/src/client.rs @@ -338,12 +338,14 @@ impl Client { let (sender, receiver) = oneshot::channel(); let payload = request.payload.unwrap_or_else(|| Bytes::new()); + let respond = self.new_inbox(); let headers = request.headers; self.sender .send(Command::Request { subject, payload, + respond, headers, sender, }) diff --git a/async-nats/src/lib.rs b/async-nats/src/lib.rs index 431574473..2ae77a030 100644 --- a/async-nats/src/lib.rs +++ b/async-nats/src/lib.rs @@ -270,6 +270,7 @@ pub(crate) enum Command { Request { subject: String, payload: Bytes, + respond: String, headers: Option, sender: oneshot::Sender, }, @@ -628,18 +629,19 @@ impl ConnectionHandler { Command::Request { subject, payload, + respond, headers, sender, } => { + // FIXME unwrap or err + let (prefix, token) = respond.rsplit_once('.').unwrap(); + let multiplexer = if let Some(multiplexer) = self.multiplexer.as_mut() { multiplexer } else { // TODO what is the sid? let sid = 0; - - let inbox = nuid::next(); - let prefix = format!("{}.", inbox); - let subject = format!("{}.*", inbox); + let subject = format!("{}.*", prefix); if let Err(err) = self .connection @@ -655,19 +657,17 @@ impl ConnectionHandler { self.multiplexer.insert(Multiplexer { sid, - prefix, + prefix: prefix.to_owned(), senders: HashMap::new(), }) }; - let token = nuid::next(); - let respond = Some(format!("{}{}", multiplexer.prefix, token)); - multiplexer.senders.insert(token, sender); + multiplexer.senders.insert(token.to_owned(), sender); let pub_op = ClientOp::Publish { subject, payload, - respond, + respond: Some(respond), headers, }; From bac00e0cbaa271239c89129cd95f23882d93194a Mon Sep 17 00:00:00 2001 From: Casper Beyer Date: Thu, 3 Aug 2023 09:39:43 +0800 Subject: [PATCH 05/14] Fix prefix --- async-nats/src/lib.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/async-nats/src/lib.rs b/async-nats/src/lib.rs index 2ae77a030..639ca7583 100644 --- a/async-nats/src/lib.rs +++ b/async-nats/src/lib.rs @@ -657,7 +657,7 @@ impl ConnectionHandler { self.multiplexer.insert(Multiplexer { sid, - prefix: prefix.to_owned(), + prefix: format!("{}.", prefix), senders: HashMap::new(), }) }; @@ -676,7 +676,7 @@ impl ConnectionHandler { error!("Sending Publish failed with {:?}", err); } - // TODO(caspervonb) we can flush directly here + self.connection.flush().await?; } Command::Publish { From 16d19a338080a27793b0273778c4f7a86ba4af15 Mon Sep 17 00:00:00 2001 From: Casper Beyer Date: Sat, 5 Aug 2023 10:41:46 +0800 Subject: [PATCH 06/14] Fix lint --- async-nats/src/client.rs | 1 - async-nats/src/lib.rs | 1 - 2 files changed, 2 deletions(-) diff --git a/async-nats/src/client.rs b/async-nats/src/client.rs index ef9f99815..1cfd5f5eb 100644 --- a/async-nats/src/client.rs +++ b/async-nats/src/client.rs @@ -18,7 +18,6 @@ use super::{header::HeaderMap, status::StatusCode, Command, Message, Subscriber} use crate::error::Error; use bytes::Bytes; use futures::future::TryFutureExt; -use futures::stream::StreamExt; use once_cell::sync::Lazy; use regex::Regex; use std::fmt::Display; diff --git a/async-nats/src/lib.rs b/async-nats/src/lib.rs index 639ca7583..6ab0c587e 100644 --- a/async-nats/src/lib.rs +++ b/async-nats/src/lib.rs @@ -324,7 +324,6 @@ struct Subscription { #[derive(Debug)] struct Multiplexer { - sid: u64, prefix: String, senders: HashMap>, } From abe3e66f5b85411783785d3fa71d2a6cb6f41e5f Mon Sep 17 00:00:00 2001 From: Casper Beyer Date: Sat, 5 Aug 2023 10:55:02 +0800 Subject: [PATCH 07/14] Remove sid --- async-nats/src/lib.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/async-nats/src/lib.rs b/async-nats/src/lib.rs index 6ab0c587e..81f3f75c3 100644 --- a/async-nats/src/lib.rs +++ b/async-nats/src/lib.rs @@ -638,14 +638,12 @@ impl ConnectionHandler { let multiplexer = if let Some(multiplexer) = self.multiplexer.as_mut() { multiplexer } else { - // TODO what is the sid? - let sid = 0; let subject = format!("{}.*", prefix); if let Err(err) = self .connection .write_op(&ClientOp::Subscribe { - sid, + sid: 0, subject, queue_group: None, }) @@ -655,7 +653,6 @@ impl ConnectionHandler { } self.multiplexer.insert(Multiplexer { - sid, prefix: format!("{}.", prefix), senders: HashMap::new(), }) From 623c0638340f5e4d15b5d870967277aff0ace0c4 Mon Sep 17 00:00:00 2001 From: Casper Beyer Date: Sat, 5 Aug 2023 12:17:43 +0800 Subject: [PATCH 08/14] Keep non-muxed implementation for custom inboxes --- async-nats/src/client.rs | 110 ++++++++++++++++++++++++++------------- 1 file changed, 75 insertions(+), 35 deletions(-) diff --git a/async-nats/src/client.rs b/async-nats/src/client.rs index 1cfd5f5eb..e8d2ed527 100644 --- a/async-nats/src/client.rs +++ b/async-nats/src/client.rs @@ -18,6 +18,7 @@ use super::{header::HeaderMap, status::StatusCode, Command, Message, Subscriber} use crate::error::Error; use bytes::Bytes; use futures::future::TryFutureExt; +use futures::StreamExt; use once_cell::sync::Lazy; use regex::Regex; use std::fmt::Display; @@ -334,44 +335,83 @@ impl Client { subject: String, request: Request, ) -> Result { - let (sender, receiver) = oneshot::channel(); - - let payload = request.payload.unwrap_or_else(|| Bytes::new()); - let respond = self.new_inbox(); - let headers = request.headers; - - self.sender - .send(Command::Request { - subject, - payload, - respond, - headers, - sender, - }) - .map_err(|err| RequestError::with_source(RequestErrorKind::Other, err)) - .await?; - - let timeout = request.timeout.unwrap_or(self.request_timeout); - let request = match timeout { - Some(timeout) => { - tokio::time::timeout(timeout, receiver) - .map_err(|err| RequestError::with_source(RequestErrorKind::TimedOut, err)) - .await? + if let Some(inbox) = request.inbox { + let timeout = request.timeout.unwrap_or(self.request_timeout); + let mut sub = self.subscribe(inbox.clone()).await?; + let payload: Bytes = request.payload.unwrap_or_else(Bytes::new); + match request.headers { + Some(headers) => { + self.publish_with_reply_and_headers(subject, inbox, headers, payload) + .await? + } + None => self.publish_with_reply(subject, inbox, payload).await?, } - None => receiver.await, - }; - - match request { - Ok(message) => { - if message.status == Some(StatusCode::NO_RESPONDERS) { - return Err(RequestError::with_source( - RequestErrorKind::NoResponders, - "no responders", - )); + self.flush() + .await + .map_err(|err| RequestError::with_source(RequestErrorKind::Other, err))?; + let request = match timeout { + Some(timeout) => { + tokio::time::timeout(timeout, sub.next()) + .map_err(|err| RequestError::with_source(RequestErrorKind::TimedOut, err)) + .await? + } + None => sub.next().await, + }; + match request { + Some(message) => { + if message.status == Some(StatusCode::NO_RESPONDERS) { + return Err(RequestError::with_source( + RequestErrorKind::NoResponders, + "no responders", + )); + } + Ok(message) + } + None => Err(RequestError::with_source( + RequestErrorKind::Other, + "broken pipe", + )), + } + } else { + let (sender, receiver) = oneshot::channel(); + + let payload = request.payload.unwrap_or_else(|| Bytes::new()); + let respond = self.new_inbox(); + let headers = request.headers; + + self.sender + .send(Command::Request { + subject, + payload, + respond, + headers, + sender, + }) + .map_err(|err| RequestError::with_source(RequestErrorKind::Other, err)) + .await?; + + let timeout = request.timeout.unwrap_or(self.request_timeout); + let request = match timeout { + Some(timeout) => { + tokio::time::timeout(timeout, receiver) + .map_err(|err| RequestError::with_source(RequestErrorKind::TimedOut, err)) + .await? + } + None => receiver.await, + }; + + match request { + Ok(message) => { + if message.status == Some(StatusCode::NO_RESPONDERS) { + return Err(RequestError::with_source( + RequestErrorKind::NoResponders, + "no responders", + )); + } + Ok(message) } - Ok(message) + Err(err) => Err(RequestError::with_source(RequestErrorKind::Other, err)), } - Err(err) => Err(RequestError::with_source(RequestErrorKind::Other, err)), } } From d52f049fd0faacf9b9ab7a212c25f3f4b4e8ec80 Mon Sep 17 00:00:00 2001 From: Casper Beyer Date: Sat, 5 Aug 2023 12:20:41 +0800 Subject: [PATCH 09/14] Re-create multiplexer subscription on disconnect --- async-nats/src/lib.rs | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/async-nats/src/lib.rs b/async-nats/src/lib.rs index 81f3f75c3..87acfef88 100644 --- a/async-nats/src/lib.rs +++ b/async-nats/src/lib.rs @@ -324,6 +324,7 @@ struct Subscription { #[derive(Debug)] struct Multiplexer { + subject: String, prefix: String, senders: HashMap>, } @@ -653,6 +654,7 @@ impl ConnectionHandler { } self.multiplexer.insert(Multiplexer { + subject, prefix: format!("{}.", prefix), senders: HashMap::new(), }) @@ -729,6 +731,18 @@ impl ConnectionHandler { .await .unwrap(); } + + if let Some(multiplexer) = &self.multiplexer { + self.connection + .write_op(&ClientOp::Subscribe { + sid: 0, + subject: multiplexer.subject.to_owned(), + queue_group: None, + }) + .await + .unwrap(); + } + self.connector.events_tx.try_send(Event::Connected).ok(); Ok(()) From a372a06a909fe4ecc8a6e60a122fd2db27e2e1f1 Mon Sep 17 00:00:00 2001 From: Casper Beyer Date: Sat, 5 Aug 2023 12:21:49 +0800 Subject: [PATCH 10/14] Fix lint --- async-nats/src/client.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/async-nats/src/client.rs b/async-nats/src/client.rs index e8d2ed527..9791c246f 100644 --- a/async-nats/src/client.rs +++ b/async-nats/src/client.rs @@ -375,7 +375,7 @@ impl Client { } else { let (sender, receiver) = oneshot::channel(); - let payload = request.payload.unwrap_or_else(|| Bytes::new()); + let payload = request.payload.unwrap_or_else(Bytes::new); let respond = self.new_inbox(); let headers = request.headers; From 62d768e1e2a828da3b22ce104c2cd8417b0e0ca7 Mon Sep 17 00:00:00 2001 From: Casper Beyer Date: Sat, 5 Aug 2023 12:27:21 +0800 Subject: [PATCH 11/14] Fix borrow --- async-nats/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/async-nats/src/lib.rs b/async-nats/src/lib.rs index 87acfef88..46381601f 100644 --- a/async-nats/src/lib.rs +++ b/async-nats/src/lib.rs @@ -645,7 +645,7 @@ impl ConnectionHandler { .connection .write_op(&ClientOp::Subscribe { sid: 0, - subject, + subject: subject.clone(), queue_group: None, }) .await From 2be8253372d12972721d739e1f833224c5a12fd4 Mon Sep 17 00:00:00 2001 From: Casper Beyer Date: Sat, 5 Aug 2023 12:59:57 +0800 Subject: [PATCH 12/14] Use a constant for multiplexer sid --- async-nats/src/lib.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/async-nats/src/lib.rs b/async-nats/src/lib.rs index 46381601f..9556bd6b9 100644 --- a/async-nats/src/lib.rs +++ b/async-nats/src/lib.rs @@ -152,6 +152,7 @@ pub type Error = Box; const VERSION: &str = env!("CARGO_PKG_VERSION"); const LANG: &str = "rust"; const MAX_PENDING_PINGS: usize = 2; +const MULTIPLEXER_SID: u64 = 0; /// A re-export of the `rustls` crate used in this crate, /// for use in cases where manual client configurations @@ -644,7 +645,7 @@ impl ConnectionHandler { if let Err(err) = self .connection .write_op(&ClientOp::Subscribe { - sid: 0, + sid: MULTIPLEXER_SID, subject: subject.clone(), queue_group: None, }) @@ -735,7 +736,7 @@ impl ConnectionHandler { if let Some(multiplexer) = &self.multiplexer { self.connection .write_op(&ClientOp::Subscribe { - sid: 0, + sid: MULTIPLEXER_SID, subject: multiplexer.subject.to_owned(), queue_group: None, }) From 618cf14f3c1fb98551f8c4fd2ea81082edbcf017 Mon Sep 17 00:00:00 2001 From: Casper Beyer Date: Wed, 9 Aug 2023 19:03:17 +0800 Subject: [PATCH 13/14] Fixup TODOs --- async-nats/src/lib.rs | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/async-nats/src/lib.rs b/async-nats/src/lib.rs index 9556bd6b9..727e32547 100644 --- a/async-nats/src/lib.rs +++ b/async-nats/src/lib.rs @@ -516,8 +516,9 @@ impl ConnectionHandler { length, }; - // TODO don't unwrap - sender.send(message).unwrap(); + sender.send(message).map_err(|_| { + io::Error::new(io::ErrorKind::Other, "request receiver closed") + })?; } } } @@ -634,8 +635,9 @@ impl ConnectionHandler { headers, sender, } => { - // FIXME unwrap or err - let (prefix, token) = respond.rsplit_once('.').unwrap(); + let (prefix, token) = respond.rsplit_once('.').ok_or_else(|| { + io::Error::new(io::ErrorKind::Other, "malformed request subject") + })?; let multiplexer = if let Some(multiplexer) = self.multiplexer.as_mut() { multiplexer From a0b7a58410e9c68713ac1793d037b54a7f09d31a Mon Sep 17 00:00:00 2001 From: Casper Beyer Date: Mon, 14 Aug 2023 21:33:09 +0800 Subject: [PATCH 14/14] Guard against sid != MULTIPLEXER_SID --- async-nats/src/lib.rs | 38 ++++++++++++++++++++------------------ 1 file changed, 20 insertions(+), 18 deletions(-) diff --git a/async-nats/src/lib.rs b/async-nats/src/lib.rs index 727e32547..4e3d14e36 100644 --- a/async-nats/src/lib.rs +++ b/async-nats/src/lib.rs @@ -501,24 +501,26 @@ impl ConnectionHandler { self.handle_flush().await?; } } - } else if let Some(multiplexer) = self.multiplexer.as_mut() { - let maybe_token = subject.strip_prefix(&multiplexer.prefix).to_owned(); - - if let Some(token) = maybe_token { - if let Some(sender) = multiplexer.senders.remove(token) { - let message = Message { - subject, - reply, - payload, - headers, - status, - description, - length, - }; - - sender.send(message).map_err(|_| { - io::Error::new(io::ErrorKind::Other, "request receiver closed") - })?; + } else if sid == MULTIPLEXER_SID { + if let Some(multiplexer) = self.multiplexer.as_mut() { + let maybe_token = subject.strip_prefix(&multiplexer.prefix).to_owned(); + + if let Some(token) = maybe_token { + if let Some(sender) = multiplexer.senders.remove(token) { + let message = Message { + subject, + reply, + payload, + headers, + status, + description, + length, + }; + + sender.send(message).map_err(|_| { + io::Error::new(io::ErrorKind::Other, "request receiver closed") + })?; + } } } }