Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Websockets support #1330

Merged
merged 12 commits into from
Oct 30, 2024
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ jobs:

- name: Install msrv Rust on ubuntu-latest
id: install-rust
uses: dtolnay/rust-toolchain@1.70.0
uses: dtolnay/rust-toolchain@1.79.0
- name: Cache the build artifacts
uses: Swatinem/rust-cache@v2
with:
Expand Down
8 changes: 5 additions & 3 deletions async-nats/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ name = "async-nats"
authors = ["Tomasz Pietrek <tomasz@nats.io>", "Casper Beyer <caspervonb@pm.me>"]
version = "0.37.0"
edition = "2021"
rust = "1.74.0"
rust = "1.79.0"
description = "A async Rust NATS client"
license = "Apache-2.0"
documentation = "https://docs.rs/async-nats"
Expand Down Expand Up @@ -41,6 +41,8 @@ ring = { version = "0.17", optional = true }
rand = "0.8"
webpki = { package = "rustls-webpki", version = "0.102" }
portable-atomic = "1"
tokio-websockets = { version = "0.10", features = ["client", "rand", "rustls-native-roots"] }
pin-project = "1.0"

[dev-dependencies]
ring = "0.17"
Expand All @@ -62,8 +64,8 @@ num = "0.4.1"
default = ["server_2_10", "ring"]
# Enables Service API for the client.
service = []
aws-lc-rs = ["dep:aws-lc-rs", "tokio-rustls/aws-lc-rs"]
ring = ["dep:ring", "tokio-rustls/ring"]
aws-lc-rs = ["dep:aws-lc-rs", "tokio-rustls/aws-lc-rs", "tokio-websockets/aws-lc-rs"]
ring = ["dep:ring", "tokio-rustls/ring", "tokio-websockets/ring"]
fips = ["aws-lc-rs", "tokio-rustls/fips"]
# All experimental features are part of this feature flag.
experimental = ["service"]
Expand Down
103 changes: 102 additions & 1 deletion async-nats/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,10 @@ use std::sync::Arc;
use std::task::{Context, Poll};

use bytes::{Buf, Bytes, BytesMut};
use tokio::io::{self, AsyncRead, AsyncReadExt, AsyncWrite};
use futures::{SinkExt, StreamExt};
use pin_project::pin_project;
use tokio::io::{self, AsyncRead, AsyncReadExt, AsyncWrite, ReadBuf};
use tokio_websockets::WebSocketStream;

use crate::header::{HeaderMap, HeaderName, IntoHeaderValue};
use crate::status::StatusCode;
Expand Down Expand Up @@ -683,6 +686,104 @@ impl Connection {
}
}

#[pin_project]
pub(crate) struct WebSocketAdapter<T> {
#[pin]
pub(crate) inner: WebSocketStream<T>,
pub(crate) read_buf: BytesMut,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why does it needs its own buffer?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

WS libraries returns whole WebSockets messages. Those can be way bigger than the ReadBuf. Is there a way to avoid that buffer easily? :)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My bad, had to read the docs, it's a Stream, not a "Stream".

}

impl<T> WebSocketAdapter<T> {
pub(crate) fn new(inner: WebSocketStream<T>) -> Self {
Self {
inner,
read_buf: BytesMut::new(),
}
}
}

impl<T> AsyncRead for WebSocketAdapter<T>
where
T: AsyncRead + AsyncWrite + Unpin,
{
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<std::io::Result<()>> {
let mut this = self.project();

loop {
// If we have data in the read buffer, let's move it to the output buffer.
if !this.read_buf.is_empty() {
let len = std::cmp::min(buf.remaining(), this.read_buf.len());
buf.put_slice(&this.read_buf.split_to(len));
return Poll::Ready(Ok(()));
}

match this.inner.poll_next_unpin(cx) {
Poll::Ready(Some(Ok(message))) => {
this.read_buf.extend_from_slice(message.as_payload());
}
Poll::Ready(Some(Err(e))) => {
return Poll::Ready(Err(std::io::Error::new(std::io::ErrorKind::Other, e)));
}
Poll::Ready(None) => {
return Poll::Ready(Err(std::io::Error::new(
std::io::ErrorKind::UnexpectedEof,
"WebSocket closed",
)));
}
Poll::Pending => {
return Poll::Pending;
}
}
}
}
}

impl<T> AsyncWrite for WebSocketAdapter<T>
where
T: AsyncRead + AsyncWrite + Unpin,
{
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<std::io::Result<usize>> {
let mut this = self.project();

let data = buf.to_vec();
match this.inner.poll_ready_unpin(cx) {
Poll::Ready(Ok(())) => match this
.inner
.start_send_unpin(tokio_websockets::Message::binary(data))
{
Ok(()) => Poll::Ready(Ok(buf.len())),
Err(e) => Poll::Ready(Err(std::io::Error::new(std::io::ErrorKind::Other, e))),
},
Poll::Ready(Err(e)) => {
Poll::Ready(Err(std::io::Error::new(std::io::ErrorKind::Other, e)))
}
Poll::Pending => Poll::Pending,
}
}

fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
self.project()
.inner
.poll_flush_unpin(cx)
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))
}

fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
self.project()
.inner
.poll_close_unpin(cx)
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))
}
}

#[cfg(test)]
mod read_op {
use std::sync::Arc;
Expand Down
92 changes: 75 additions & 17 deletions async-nats/src/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use crate::auth::Auth;
use crate::client::Statistics;
use crate::connection::Connection;
use crate::connection::State;
use crate::connection::WebSocketAdapter;
use crate::options::CallbackArg1;
use crate::tls;
use crate::AuthError;
Expand Down Expand Up @@ -168,7 +169,11 @@ impl Connector {
.map_err(|err| ConnectError::with_source(crate::ConnectErrorKind::Dns, err))?;
for socket_addr in socket_addrs {
match self
.try_connect_to(&socket_addr, server_addr.tls_required(), server_addr.host())
.try_connect_to(
&socket_addr,
server_addr.tls_required(),
server_addr.clone(),
)
.await
{
Ok((server_info, mut connection)) => {
Expand Down Expand Up @@ -321,22 +326,74 @@ impl Connector {
&self,
socket_addr: &SocketAddr,
tls_required: bool,
tls_host: &str,
server_addr: ServerAddr,
) -> Result<(ServerInfo, Connection), ConnectError> {
let tcp_stream = tokio::time::timeout(
self.options.connection_timeout,
TcpStream::connect(socket_addr),
)
.await
.map_err(|_| ConnectError::new(crate::ConnectErrorKind::TimedOut))??;

tcp_stream.set_nodelay(true)?;
let mut connection = match server_addr.scheme() {
"ws" => {
let ws = tokio::time::timeout(
self.options.connection_timeout,
tokio_websockets::client::Builder::new()
.uri(format!("{}://{}", server_addr.scheme(), socket_addr).as_str())
.map_err(|err| {
ConnectError::with_source(crate::ConnectErrorKind::ServerParse, err)
})?
.connect(),
)
.await
.map_err(|_| ConnectError::new(crate::ConnectErrorKind::TimedOut))?
.map_err(|err| ConnectError::with_source(crate::ConnectErrorKind::Io, err))?;

let mut connection = Connection::new(
Box::new(tcp_stream),
self.options.read_buffer_capacity.into(),
self.connect_stats.clone(),
);
let con = WebSocketAdapter::new(ws.0);
Connection::new(Box::new(con), 0, self.connect_stats.clone())
}
"wss" => {
let domain = webpki::types::ServerName::try_from(server_addr.host())
.map_err(|err| ConnectError::with_source(crate::ConnectErrorKind::Tls, err))?;
let tls_config =
Arc::new(tls::config_tls(&self.options).await.map_err(|err| {
ConnectError::with_source(crate::ConnectErrorKind::Tls, err)
})?);
let tls_connector = tokio_rustls::TlsConnector::from(tls_config);
let ws = tokio::time::timeout(
self.options.connection_timeout,
tokio_websockets::client::Builder::new()
.connector(&tokio_websockets::Connector::Rustls(tls_connector))
.uri(
format!(
"{}://{}:{}",
server_addr.scheme(),
domain.to_str(),
server_addr.port()
)
.as_str(),
)
.map_err(|err| {
ConnectError::with_source(crate::ConnectErrorKind::ServerParse, err)
})?
.connect(),
)
.await
.map_err(|_| ConnectError::new(crate::ConnectErrorKind::TimedOut))?
.map_err(|err| ConnectError::with_source(crate::ConnectErrorKind::Io, err))?;
let con = WebSocketAdapter::new(ws.0);
Connection::new(Box::new(con), 0, self.connect_stats.clone())
}
_ => {
let tcp_stream = tokio::time::timeout(
self.options.connection_timeout,
TcpStream::connect(socket_addr),
)
.await
.map_err(|_| ConnectError::new(crate::ConnectErrorKind::TimedOut))??;
tcp_stream.set_nodelay(true)?;

Connection::new(
Box::new(tcp_stream),
self.options.read_buffer_capacity.into(),
self.connect_stats.clone(),
)
}
};

let tls_connection = |connection: Connection| async {
let tls_config = Arc::new(
Expand All @@ -346,7 +403,7 @@ impl Connector {
);
let tls_connector = tokio_rustls::TlsConnector::from(tls_config);

let domain = webpki::types::ServerName::try_from(tls_host)
let domain = webpki::types::ServerName::try_from(server_addr.host())
.map_err(|err| ConnectError::with_source(crate::ConnectErrorKind::Tls, err))?;

let tls_stream = tls_connector
Expand All @@ -363,7 +420,7 @@ impl Connector {
// If `tls_first` was set, establish TLS connection before getting INFO.
// There is no point in checking if tls is required, because
// the connection has to be be upgraded to TLS anyway as it's different flow.
if self.options.tls_first {
if self.options.tls_first && !server_addr.is_websocket() {
connection = tls_connection(connection).await?;
}

Expand All @@ -386,6 +443,7 @@ impl Connector {

// If `tls_first` was not set, establish TLS connection if it is required.
if !self.options.tls_first
&& !server_addr.is_websocket()
&& (self.options.tls_required || info.tls_required || tls_required)
{
connection = tls_connection(connection).await?;
Expand Down
14 changes: 13 additions & 1 deletion async-nats/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1455,7 +1455,11 @@ impl FromStr for ServerAddr {
impl ServerAddr {
/// Check if the URL is a valid NATS server address.
pub fn from_url(url: Url) -> io::Result<Self> {
if url.scheme() != "nats" && url.scheme() != "tls" {
if url.scheme() != "nats"
&& url.scheme() != "tls"
&& url.scheme() != "ws"
&& url.scheme() != "wss"
{
return Err(std::io::Error::new(
ErrorKind::InvalidInput,
format!("invalid scheme for NATS server URL: {}", url.scheme()),
Expand All @@ -1480,6 +1484,10 @@ impl ServerAddr {
self.0.username() != ""
}

pub fn scheme(&self) -> &str {
self.0.scheme()
}

/// Returns the host.
pub fn host(&self) -> &str {
match self.0.host() {
Expand All @@ -1493,6 +1501,10 @@ impl ServerAddr {
}
}

pub fn is_websocket(&self) -> bool {
self.0.scheme() == "ws" || self.0.scheme() == "wss"
}

/// Returns the port.
pub fn port(&self) -> u16 {
self.0.port().unwrap_or(4222)
Expand Down
5 changes: 5 additions & 0 deletions async-nats/tests/configs/ws.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
jetstream {}
websocket {
port: 8444
no_tls: true
}
15 changes: 15 additions & 0 deletions async-nats/tests/configs/ws_tls.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
authorization {
user: derek
password: porkchop
timeout: 1
}

websocket {
tls {

cert_file: "./tests/configs/certs/server-cert.pem"
key_file: "./tests/configs/certs/server-key.pem"
ca_file: "./tests/configs/certs/rootCA.pem"
}
port: 8445
}
Loading
Loading