From 26e1fdc909bc4faac84040ffdea46e3029f3507a Mon Sep 17 00:00:00 2001 From: Tomasz Pietrek Date: Wed, 14 Jun 2023 11:44:18 +0200 Subject: [PATCH] Add custom auth callback Signed-off-by: Tomasz Pietrek --- .config/nats.dic | 1 + async-nats/src/auth.rs | 21 ++++++++++++------ async-nats/src/connector.rs | 25 ++++++++++++++++++++- async-nats/src/lib.rs | 2 ++ async-nats/src/options.rs | 38 ++++++++++++++++++++++++++++++-- async-nats/tests/client_tests.rs | 15 +++++++++++++ 6 files changed, 92 insertions(+), 10 deletions(-) diff --git a/.config/nats.dic b/.config/nats.dic index 6522a5376..bfe3705cc 100644 --- a/.config/nats.dic +++ b/.config/nats.dic @@ -135,3 +135,4 @@ RequestErrorKind rustls Acker EndpointSchema +auth diff --git a/async-nats/src/auth.rs b/async-nats/src/auth.rs index 95a49566e..e04e72455 100644 --- a/async-nats/src/auth.rs +++ b/async-nats/src/auth.rs @@ -1,11 +1,18 @@ use crate::{options::CallbackArg1, AuthError}; #[derive(Default)] -pub(crate) struct Auth { - pub(crate) jwt: Option, - pub(crate) nkey: Option, - pub(crate) signature: Option>>, - pub(crate) username: Option, - pub(crate) password: Option, - pub(crate) token: Option, +pub struct Auth { + pub jwt: Option, + pub nkey: Option, + pub(crate) signature_callback: Option>>, + pub signature: Option, + pub username: Option, + pub password: Option, + pub token: Option, +} + +impl Auth { + pub fn new() -> Auth { + Auth::default() + } } diff --git a/async-nats/src/connector.rs b/async-nats/src/connector.rs index e0c3b0e80..6a285d979 100644 --- a/async-nats/src/connector.rs +++ b/async-nats/src/connector.rs @@ -14,7 +14,9 @@ use crate::auth::Auth; use crate::connection::Connection; use crate::connection::State; +use crate::options::CallbackArg1; use crate::tls; +use crate::AuthError; use crate::ClientError; use crate::ClientOp; use crate::ConnectError; @@ -59,6 +61,7 @@ pub(crate) struct ConnectorOptions { pub(crate) retain_servers_order: bool, pub(crate) read_buffer_capacity: u16, pub(crate) reconnect_delay_callback: Box Duration + Send + Sync + 'static>, + pub(crate) auth_callback: Option, Result>>, } /// Maintains a list of servers and establishes connections. @@ -199,7 +202,7 @@ impl Connector { } if let Some(jwt) = self.options.auth.jwt.as_ref() { - if let Some(sign_fn) = self.options.auth.signature.as_ref() { + if let Some(sign_fn) = self.options.auth.signature_callback.as_ref() { match sign_fn.call(server_info.nonce.clone()).await { Ok(sig) => { connect_info.user_jwt = Some(jwt.clone()); @@ -214,6 +217,26 @@ impl Connector { } } + if let Some(callback) = self.options.auth_callback.as_ref() { + let auth = callback + .call(server_info.nonce.as_bytes().to_vec()) + .await + .map_err(|err| { + ConnectError::with_source( + crate::ConnectErrorKind::Authentication, + err, + ) + })?; + connect_info.user = auth.username; + connect_info.pass = auth.password; + connect_info.user_jwt = auth.jwt; + connect_info.signature = auth + .signature + .map(|signature| URL_SAFE_NO_PAD.encode(signature)); + connect_info.auth_token = auth.token; + connect_info.nkey = auth.nkey; + } + connection .write_op(&ClientOp::Connect(connect_info)) .await?; diff --git a/async-nats/src/lib.rs b/async-nats/src/lib.rs index be7b7303d..0b4157e68 100644 --- a/async-nats/src/lib.rs +++ b/async-nats/src/lib.rs @@ -149,6 +149,7 @@ pub mod connection; mod connector; mod options; +pub use auth::Auth; pub use client::{Client, PublishError, Request, RequestError, RequestErrorKind, SubscribeError}; pub use options::{AuthError, ConnectOptions}; @@ -670,6 +671,7 @@ pub async fn connect_with_options( retain_servers_order: options.retain_servers_order, read_buffer_capacity: options.read_buffer_capacity, reconnect_delay_callback: options.reconnect_delay_callback, + auth_callback: options.auth_callback, }, events_tx, state_tx, diff --git a/async-nats/src/options.rs b/async-nats/src/options.rs index f621edc2d..2741896dc 100644 --- a/async-nats/src/options.rs +++ b/async-nats/src/options.rs @@ -61,6 +61,7 @@ pub struct ConnectOptions { pub(crate) retain_servers_order: bool, pub(crate) read_buffer_capacity: u16, pub(crate) reconnect_delay_callback: Box Duration + Send + Sync + 'static>, + pub(crate) auth_callback: Option, Result>>, } impl fmt::Debug for ConnectOptions { @@ -120,6 +121,7 @@ impl Default for ConnectOptions { connector::reconnect_delay_callback_default(attempts) }), auth: Default::default(), + auth_callback: None, } } } @@ -175,6 +177,38 @@ impl ConnectOptions { crate::connect_with_options(addrs, self).await } + /// Creates a builder with a custom auth callback to be used when authenticating against the NATS Server. + /// Requires an asynchronous function that accepts nonce and returns [Auth]. + /// It will overwrite all other auth methods used. + /// + /// + /// # Example + /// ```no_run + /// # #[tokio::main] + /// # async fn main() -> Result<(), async_nats::ConnectError> { + /// async_nats::ConnectOptions::with_auth_callback(move |_| async move { + /// let mut auth = async_nats::Auth::new(); + /// auth.username = Some("derek".to_string()); + /// auth.password = Some("s3cr3t".to_string()); + /// Ok(auth) + /// }) + /// .connect("demo.nats.io") + /// .await?; + /// # Ok(()) + /// # } + /// ``` + pub fn with_auth_callback(callback: F) -> Self + where + F: Fn(Vec) -> Fut + Send + Sync + 'static, + Fut: Future> + 'static + Send + Sync, + { + let mut options = ConnectOptions::new(); + options.auth_callback = Some(CallbackArg1::, Result>(Box::new( + move |nonce| Box::pin(callback(nonce)), + ))); + options + } + /// Authenticate against NATS Server with the provided token. /// /// # Examples @@ -359,7 +393,7 @@ impl ConnectOptions { })); self.auth.jwt = Some(jwt); - self.auth.signature = Some(jwt_sign_callback); + self.auth.signature_callback = Some(jwt_sign_callback); self } @@ -866,7 +900,7 @@ impl ConnectOptions { } } -type AsyncCallbackArg1 = +pub(crate) type AsyncCallbackArg1 = Box Pin + Send + Sync + 'static>> + Send + Sync>; pub(crate) struct CallbackArg1(AsyncCallbackArg1); diff --git a/async-nats/tests/client_tests.rs b/async-nats/tests/client_tests.rs index d8a28122b..5505e21d0 100644 --- a/async-nats/tests/client_tests.rs +++ b/async-nats/tests/client_tests.rs @@ -852,4 +852,19 @@ mod client { tokio::time::sleep(std::time::Duration::from_secs(3)).await; } } + + #[tokio::test] + async fn custom_auth_callback() { + let server = nats_server::run_server("tests/configs/user_pass.conf"); + + ConnectOptions::with_auth_callback(move |_| async move { + let mut auth = async_nats::Auth::new(); + auth.username = Some("derek".to_string()); + auth.password = Some("s3cr3t".to_string()); + Ok(auth) + }) + .connect(server.client_url()) + .await + .unwrap(); + } }