diff --git a/sqld/src/auth.rs b/sqld/src/auth.rs index e190c17b..6d2bf50d 100644 --- a/sqld/src/auth.rs +++ b/sqld/src/auth.rs @@ -191,7 +191,7 @@ impl Authenticated { level ))) } - None => return Err(Status::invalid_argument("unable to convert to ascii")), + None => return Err(Status::invalid_argument("x-proxy-authorization not set")), }; Ok(auth) diff --git a/sqld/src/lib.rs b/sqld/src/lib.rs index 916da99c..7538d69e 100644 --- a/sqld/src/lib.rs +++ b/sqld/src/lib.rs @@ -339,7 +339,7 @@ where let idle_shutdown_kicker = self.setup_shutdown(); let snapshot_callback = self.make_snapshot_callback(); - let auth = self.user_api_config.get_auth()?.into(); + let auth = self.user_api_config.get_auth().map(Arc::new)?; let extensions = self.db_config.validate_extensions()?; match self.rpc_client_config { @@ -350,6 +350,7 @@ where extensions, db_config: self.db_config.clone(), base_path: self.path.clone(), + auth: auth.clone(), }; let (namespaces, proxy_service, replication_service) = replica.configure().await?; let services = Services { @@ -504,6 +505,7 @@ struct Replica { extensions: Arc<[PathBuf]>, db_config: DbConfig, base_path: Arc, + auth: Arc, } impl Replica { @@ -528,7 +530,7 @@ impl Replica { let factory = ReplicaNamespaceMaker::new(conf); let namespaces = NamespaceStore::new(factory, true); let replication_service = ReplicationLogProxyService::new(channel.clone(), uri.clone()); - let proxy_service = ReplicaProxyService::new(channel, uri); + let proxy_service = ReplicaProxyService::new(channel, uri, self.auth.clone()); Ok((namespaces, proxy_service, replication_service)) } diff --git a/sqld/src/rpc/replica_proxy.rs b/sqld/src/rpc/replica_proxy.rs index 59bed6db..c4aa7179 100644 --- a/sqld/src/rpc/replica_proxy.rs +++ b/sqld/src/rpc/replica_proxy.rs @@ -1,5 +1,9 @@ +use std::sync::Arc; + use hyper::Uri; -use tonic::transport::Channel; +use tonic::{transport::Channel, Request, Status}; + +use crate::auth::Auth; use super::proxy::rpc::{ self, proxy_client::ProxyClient, proxy_server::Proxy, Ack, DescribeRequest, DescribeResult, @@ -8,12 +12,21 @@ use super::proxy::rpc::{ pub struct ReplicaProxyService { client: ProxyClient, + auth: Arc, } impl ReplicaProxyService { - pub fn new(channel: Channel, uri: Uri) -> Self { + pub fn new(channel: Channel, uri: Uri, auth: Arc) -> Self { let client = ProxyClient::with_origin(channel, uri); - Self { client } + Self { client, auth } + } + + fn do_auth(&self, req: &mut Request) -> Result<(), Status> { + let authenticated = self.auth.authenticate_grpc(req, false)?; + + authenticated.upgrade_grpc_request(req); + + Ok(()) } } @@ -21,8 +34,10 @@ impl ReplicaProxyService { impl Proxy for ReplicaProxyService { async fn execute( &self, - req: tonic::Request, + mut req: tonic::Request, ) -> Result, tonic::Status> { + self.do_auth(&mut req)?; + let mut client = self.client.clone(); client.execute(req).await } @@ -30,16 +45,20 @@ impl Proxy for ReplicaProxyService { //TODO: also handle cleanup on peer disconnect async fn disconnect( &self, - msg: tonic::Request, + mut msg: tonic::Request, ) -> Result, tonic::Status> { + self.do_auth(&mut msg)?; + let mut client = self.client.clone(); client.disconnect(msg).await } async fn describe( &self, - req: tonic::Request, + mut req: tonic::Request, ) -> Result, tonic::Status> { + self.do_auth(&mut req)?; + let mut client = self.client.clone(); client.describe(req).await }