Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Multiplex requests over a single subscription #1069

Merged
merged 14 commits into from
Aug 14, 2023
111 changes: 76 additions & 35 deletions async-nats/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,15 @@ use super::{header::HeaderMap, status::StatusCode, Command, Message, Subscriber}
use crate::error::Error;
use bytes::Bytes;
use futures::future::TryFutureExt;
use futures::stream::StreamExt;
use futures::StreamExt;
use once_cell::sync::Lazy;
use regex::Regex;
use std::fmt::Display;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Duration;
use thiserror::Error;
use tokio::sync::mpsc;
use tokio::sync::{mpsc, oneshot};
use tracing::trace;

static VERSION_RE: Lazy<Regex> =
Expand Down Expand Up @@ -71,7 +71,7 @@ impl Client {
info,
state,
sender,
next_subscription_id: Arc::new(AtomicU64::new(0)),
next_subscription_id: Arc::new(AtomicU64::new(1)),
subscription_capacity: capacity,
inbox_prefix,
request_timeout,
Expand Down Expand Up @@ -335,42 +335,83 @@ impl Client {
subject: String,
request: Request,
) -> Result<Message, RequestError> {
let inbox = request.inbox.unwrap_or_else(|| self.new_inbox());
let timeout = request.timeout.unwrap_or(self.request_timeout);
let mut sub = self.subscribe(inbox.clone()).await?;
let payload: Bytes = request.payload.unwrap_or_else(Bytes::new);
match request.headers {
Some(headers) => {
self.publish_with_reply_and_headers(subject, inbox, headers, payload)
.await?
if let Some(inbox) = request.inbox {
let timeout = request.timeout.unwrap_or(self.request_timeout);
let mut sub = self.subscribe(inbox.clone()).await?;
let payload: Bytes = request.payload.unwrap_or_else(Bytes::new);
match request.headers {
Some(headers) => {
self.publish_with_reply_and_headers(subject, inbox, headers, payload)
.await?
}
None => self.publish_with_reply(subject, inbox, payload).await?,
}
None => self.publish_with_reply(subject, inbox, payload).await?,
}
self.flush()
.await
.map_err(|err| RequestError::with_source(RequestErrorKind::Other, err))?;
let request = match timeout {
Some(timeout) => {
tokio::time::timeout(timeout, sub.next())
.map_err(|err| RequestError::with_source(RequestErrorKind::TimedOut, err))
.await?
self.flush()
.await
.map_err(|err| RequestError::with_source(RequestErrorKind::Other, err))?;
let request = match timeout {
Some(timeout) => {
tokio::time::timeout(timeout, sub.next())
.map_err(|err| RequestError::with_source(RequestErrorKind::TimedOut, err))
.await?
}
None => sub.next().await,
};
match request {
Some(message) => {
if message.status == Some(StatusCode::NO_RESPONDERS) {
return Err(RequestError::with_source(
RequestErrorKind::NoResponders,
"no responders",
));
}
Ok(message)
}
None => Err(RequestError::with_source(
RequestErrorKind::Other,
"broken pipe",
)),
}
None => sub.next().await,
};
match request {
Some(message) => {
if message.status == Some(StatusCode::NO_RESPONDERS) {
return Err(RequestError::with_source(
RequestErrorKind::NoResponders,
"no responders",
));
} else {
let (sender, receiver) = oneshot::channel();

let payload = request.payload.unwrap_or_else(Bytes::new);
let respond = self.new_inbox();
let headers = request.headers;

self.sender
.send(Command::Request {
subject,
payload,
respond,
headers,
sender,
})
.map_err(|err| RequestError::with_source(RequestErrorKind::Other, err))
.await?;

let timeout = request.timeout.unwrap_or(self.request_timeout);
let request = match timeout {
Some(timeout) => {
tokio::time::timeout(timeout, receiver)
.map_err(|err| RequestError::with_source(RequestErrorKind::TimedOut, err))
.await?
}
None => receiver.await,
};

match request {
Ok(message) => {
if message.status == Some(StatusCode::NO_RESPONDERS) {
return Err(RequestError::with_source(
RequestErrorKind::NoResponders,
"no responders",
));
}
Ok(message)
}
Ok(message)
Err(err) => Err(RequestError::with_source(RequestErrorKind::Other, err)),
}
None => Err(RequestError::with_source(
RequestErrorKind::Other,
"broken pipe",
)),
}
}

Expand Down
99 changes: 99 additions & 0 deletions async-nats/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ pub type Error = Box<dyn std::error::Error + Send + Sync + 'static>;
const VERSION: &str = env!("CARGO_PKG_VERSION");
const LANG: &str = "rust";
const MAX_PENDING_PINGS: usize = 2;
const MULTIPLEXER_SID: u64 = 0;

/// A re-export of the `rustls` crate used in this crate,
/// for use in cases where manual client configurations
Expand Down Expand Up @@ -267,6 +268,13 @@ pub(crate) enum Command {
respond: Option<String>,
headers: Option<HeaderMap>,
},
Request {
subject: String,
payload: Bytes,
respond: String,
headers: Option<HeaderMap>,
sender: oneshot::Sender<Message>,
},
Subscribe {
sid: u64,
subject: String,
Expand Down Expand Up @@ -315,11 +323,19 @@ struct Subscription {
max: Option<u64>,
}

#[derive(Debug)]
struct Multiplexer {
subject: String,
prefix: String,
senders: HashMap<String, oneshot::Sender<Message>>,
}

/// A connection handler which facilitates communication from channels to a single shared connection.
pub(crate) struct ConnectionHandler {
connection: Connection,
connector: Connector,
subscriptions: HashMap<u64, Subscription>,
multiplexer: Option<Multiplexer>,
pending_pings: usize,
info_sender: tokio::sync::watch::Sender<ServerInfo>,
ping_interval: Interval,
Expand All @@ -344,6 +360,7 @@ impl ConnectionHandler {
connection,
connector,
subscriptions: HashMap::new(),
multiplexer: None,
pending_pings: 0,
info_sender,
ping_interval,
Expand Down Expand Up @@ -484,6 +501,25 @@ impl ConnectionHandler {
self.handle_flush().await?;
}
}
} else if let Some(multiplexer) = self.multiplexer.as_mut() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would check if sid is as we expect, just to avoid any edge cases.

let maybe_token = subject.strip_prefix(&multiplexer.prefix).to_owned();

if let Some(token) = maybe_token {
if let Some(sender) = multiplexer.senders.remove(token) {
let message = Message {
subject,
reply,
payload,
headers,
status,
description,
length,
};

// TODO don't unwrap
sender.send(message).unwrap();
caspervonb marked this conversation as resolved.
Show resolved Hide resolved
}
}
}
}
// TODO: we should probably update advertised server list here too.
Expand Down Expand Up @@ -591,6 +627,57 @@ impl ConnectionHandler {
error!("Sending Subscribe failed with {:?}", err);
}
}
Command::Request {
subject,
payload,
respond,
headers,
sender,
} => {
// FIXME unwrap or err
let (prefix, token) = respond.rsplit_once('.').unwrap();

let multiplexer = if let Some(multiplexer) = self.multiplexer.as_mut() {
multiplexer
} else {
let subject = format!("{}.*", prefix);

if let Err(err) = self
.connection
.write_op(&ClientOp::Subscribe {
sid: MULTIPLEXER_SID,
subject: subject.clone(),
queue_group: None,
})
.await
{
error!("Sending Subscribe failed with {:?}", err);
}

self.multiplexer.insert(Multiplexer {
subject,
prefix: format!("{}.", prefix),
senders: HashMap::new(),
})
};

multiplexer.senders.insert(token.to_owned(), sender);

let pub_op = ClientOp::Publish {
subject,
payload,
respond: Some(respond),
headers,
};

while let Err(err) = self.connection.write_op(&pub_op).await {
self.handle_disconnect().await?;
error!("Sending Publish failed with {:?}", err);
}

self.connection.flush().await?;
}

Command::Publish {
subject,
payload,
Expand Down Expand Up @@ -645,6 +732,18 @@ impl ConnectionHandler {
.await
.unwrap();
}

if let Some(multiplexer) = &self.multiplexer {
self.connection
.write_op(&ClientOp::Subscribe {
sid: MULTIPLEXER_SID,
subject: multiplexer.subject.to_owned(),
queue_group: None,
})
.await
.unwrap();
}

self.connector.events_tx.try_send(Event::Connected).ok();

Ok(())
Expand Down