Skip to content

Commit

Permalink
Add requests multiplexing
Browse files Browse the repository at this point in the history
  • Loading branch information
caspervonb authored Aug 14, 2023
1 parent 69ee1bf commit 474cb14
Show file tree
Hide file tree
Showing 2 changed files with 179 additions and 35 deletions.
111 changes: 76 additions & 35 deletions async-nats/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,15 @@ use super::{header::HeaderMap, status::StatusCode, Command, Message, Subscriber}
use crate::error::Error;
use bytes::Bytes;
use futures::future::TryFutureExt;
use futures::stream::StreamExt;
use futures::StreamExt;
use once_cell::sync::Lazy;
use regex::Regex;
use std::fmt::Display;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Duration;
use thiserror::Error;
use tokio::sync::mpsc;
use tokio::sync::{mpsc, oneshot};
use tracing::trace;

static VERSION_RE: Lazy<Regex> =
Expand Down Expand Up @@ -71,7 +71,7 @@ impl Client {
info,
state,
sender,
next_subscription_id: Arc::new(AtomicU64::new(0)),
next_subscription_id: Arc::new(AtomicU64::new(1)),
subscription_capacity: capacity,
inbox_prefix,
request_timeout,
Expand Down Expand Up @@ -335,42 +335,83 @@ impl Client {
subject: String,
request: Request,
) -> Result<Message, RequestError> {
let inbox = request.inbox.unwrap_or_else(|| self.new_inbox());
let timeout = request.timeout.unwrap_or(self.request_timeout);
let mut sub = self.subscribe(inbox.clone()).await?;
let payload: Bytes = request.payload.unwrap_or_else(Bytes::new);
match request.headers {
Some(headers) => {
self.publish_with_reply_and_headers(subject, inbox, headers, payload)
.await?
if let Some(inbox) = request.inbox {
let timeout = request.timeout.unwrap_or(self.request_timeout);
let mut sub = self.subscribe(inbox.clone()).await?;
let payload: Bytes = request.payload.unwrap_or_else(Bytes::new);
match request.headers {
Some(headers) => {
self.publish_with_reply_and_headers(subject, inbox, headers, payload)
.await?
}
None => self.publish_with_reply(subject, inbox, payload).await?,
}
None => self.publish_with_reply(subject, inbox, payload).await?,
}
self.flush()
.await
.map_err(|err| RequestError::with_source(RequestErrorKind::Other, err))?;
let request = match timeout {
Some(timeout) => {
tokio::time::timeout(timeout, sub.next())
.map_err(|err| RequestError::with_source(RequestErrorKind::TimedOut, err))
.await?
self.flush()
.await
.map_err(|err| RequestError::with_source(RequestErrorKind::Other, err))?;
let request = match timeout {
Some(timeout) => {
tokio::time::timeout(timeout, sub.next())
.map_err(|err| RequestError::with_source(RequestErrorKind::TimedOut, err))
.await?
}
None => sub.next().await,
};
match request {
Some(message) => {
if message.status == Some(StatusCode::NO_RESPONDERS) {
return Err(RequestError::with_source(
RequestErrorKind::NoResponders,
"no responders",
));
}
Ok(message)
}
None => Err(RequestError::with_source(
RequestErrorKind::Other,
"broken pipe",
)),
}
None => sub.next().await,
};
match request {
Some(message) => {
if message.status == Some(StatusCode::NO_RESPONDERS) {
return Err(RequestError::with_source(
RequestErrorKind::NoResponders,
"no responders",
));
} else {
let (sender, receiver) = oneshot::channel();

let payload = request.payload.unwrap_or_else(Bytes::new);
let respond = self.new_inbox();
let headers = request.headers;

self.sender
.send(Command::Request {
subject,
payload,
respond,
headers,
sender,
})
.map_err(|err| RequestError::with_source(RequestErrorKind::Other, err))
.await?;

let timeout = request.timeout.unwrap_or(self.request_timeout);
let request = match timeout {
Some(timeout) => {
tokio::time::timeout(timeout, receiver)
.map_err(|err| RequestError::with_source(RequestErrorKind::TimedOut, err))
.await?
}
None => receiver.await,
};

match request {
Ok(message) => {
if message.status == Some(StatusCode::NO_RESPONDERS) {
return Err(RequestError::with_source(
RequestErrorKind::NoResponders,
"no responders",
));
}
Ok(message)
}
Ok(message)
Err(err) => Err(RequestError::with_source(RequestErrorKind::Other, err)),
}
None => Err(RequestError::with_source(
RequestErrorKind::Other,
"broken pipe",
)),
}
}

Expand Down
103 changes: 103 additions & 0 deletions async-nats/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ pub type Error = Box<dyn std::error::Error + Send + Sync + 'static>;
const VERSION: &str = env!("CARGO_PKG_VERSION");
const LANG: &str = "rust";
const MAX_PENDING_PINGS: usize = 2;
const MULTIPLEXER_SID: u64 = 0;

/// A re-export of the `rustls` crate used in this crate,
/// for use in cases where manual client configurations
Expand Down Expand Up @@ -267,6 +268,13 @@ pub(crate) enum Command {
respond: Option<String>,
headers: Option<HeaderMap>,
},
Request {
subject: String,
payload: Bytes,
respond: String,
headers: Option<HeaderMap>,
sender: oneshot::Sender<Message>,
},
Subscribe {
sid: u64,
subject: String,
Expand Down Expand Up @@ -315,11 +323,19 @@ struct Subscription {
max: Option<u64>,
}

#[derive(Debug)]
struct Multiplexer {
subject: String,
prefix: String,
senders: HashMap<String, oneshot::Sender<Message>>,
}

/// A connection handler which facilitates communication from channels to a single shared connection.
pub(crate) struct ConnectionHandler {
connection: Connection,
connector: Connector,
subscriptions: HashMap<u64, Subscription>,
multiplexer: Option<Multiplexer>,
pending_pings: usize,
info_sender: tokio::sync::watch::Sender<ServerInfo>,
ping_interval: Interval,
Expand All @@ -344,6 +360,7 @@ impl ConnectionHandler {
connection,
connector,
subscriptions: HashMap::new(),
multiplexer: None,
pending_pings: 0,
info_sender,
ping_interval,
Expand Down Expand Up @@ -484,6 +501,28 @@ impl ConnectionHandler {
self.handle_flush().await?;
}
}
} else if sid == MULTIPLEXER_SID {
if let Some(multiplexer) = self.multiplexer.as_mut() {
let maybe_token = subject.strip_prefix(&multiplexer.prefix).to_owned();

if let Some(token) = maybe_token {
if let Some(sender) = multiplexer.senders.remove(token) {
let message = Message {
subject,
reply,
payload,
headers,
status,
description,
length,
};

sender.send(message).map_err(|_| {
io::Error::new(io::ErrorKind::Other, "request receiver closed")
})?;
}
}
}
}
}
// TODO: we should probably update advertised server list here too.
Expand Down Expand Up @@ -591,6 +630,58 @@ impl ConnectionHandler {
error!("Sending Subscribe failed with {:?}", err);
}
}
Command::Request {
subject,
payload,
respond,
headers,
sender,
} => {
let (prefix, token) = respond.rsplit_once('.').ok_or_else(|| {
io::Error::new(io::ErrorKind::Other, "malformed request subject")
})?;

let multiplexer = if let Some(multiplexer) = self.multiplexer.as_mut() {
multiplexer
} else {
let subject = format!("{}.*", prefix);

if let Err(err) = self
.connection
.write_op(&ClientOp::Subscribe {
sid: MULTIPLEXER_SID,
subject: subject.clone(),
queue_group: None,
})
.await
{
error!("Sending Subscribe failed with {:?}", err);
}

self.multiplexer.insert(Multiplexer {
subject,
prefix: format!("{}.", prefix),
senders: HashMap::new(),
})
};

multiplexer.senders.insert(token.to_owned(), sender);

let pub_op = ClientOp::Publish {
subject,
payload,
respond: Some(respond),
headers,
};

while let Err(err) = self.connection.write_op(&pub_op).await {
self.handle_disconnect().await?;
error!("Sending Publish failed with {:?}", err);
}

self.connection.flush().await?;
}

Command::Publish {
subject,
payload,
Expand Down Expand Up @@ -645,6 +736,18 @@ impl ConnectionHandler {
.await
.unwrap();
}

if let Some(multiplexer) = &self.multiplexer {
self.connection
.write_op(&ClientOp::Subscribe {
sid: MULTIPLEXER_SID,
subject: multiplexer.subject.to_owned(),
queue_group: None,
})
.await
.unwrap();
}

self.connector.events_tx.try_send(Event::Connected).ok();

Ok(())
Expand Down

0 comments on commit 474cb14

Please sign in to comment.