Skip to content
This repository was archived by the owner on Oct 18, 2023. It is now read-only.

sqld: Fix replica proxy auth #709

Merged
merged 1 commit into from
Sep 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion sqld/src/auth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ impl Authenticated {
level
)))
}
None => return Err(Status::invalid_argument("unable to convert to ascii")),
None => return Err(Status::invalid_argument("x-proxy-authorization not set")),
};

Ok(auth)
Expand Down
6 changes: 4 additions & 2 deletions sqld/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,7 @@ where
let idle_shutdown_kicker = self.setup_shutdown();

let snapshot_callback = self.make_snapshot_callback();
let auth = self.user_api_config.get_auth()?.into();
let auth = self.user_api_config.get_auth().map(Arc::new)?;
let extensions = self.db_config.validate_extensions()?;

match self.rpc_client_config {
Expand All @@ -350,6 +350,7 @@ where
extensions,
db_config: self.db_config.clone(),
base_path: self.path.clone(),
auth: auth.clone(),
};
let (namespaces, proxy_service, replication_service) = replica.configure().await?;
let services = Services {
Expand Down Expand Up @@ -504,6 +505,7 @@ struct Replica<C> {
extensions: Arc<[PathBuf]>,
db_config: DbConfig,
base_path: Arc<Path>,
auth: Arc<Auth>,
}

impl<C: Connector> Replica<C> {
Expand All @@ -528,7 +530,7 @@ impl<C: Connector> Replica<C> {
let factory = ReplicaNamespaceMaker::new(conf);
let namespaces = NamespaceStore::new(factory, true);
let replication_service = ReplicationLogProxyService::new(channel.clone(), uri.clone());
let proxy_service = ReplicaProxyService::new(channel, uri);
let proxy_service = ReplicaProxyService::new(channel, uri, self.auth.clone());

Ok((namespaces, proxy_service, replication_service))
}
Expand Down
31 changes: 25 additions & 6 deletions sqld/src/rpc/replica_proxy.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
use std::sync::Arc;

use hyper::Uri;
use tonic::transport::Channel;
use tonic::{transport::Channel, Request, Status};

use crate::auth::Auth;

use super::proxy::rpc::{
self, proxy_client::ProxyClient, proxy_server::Proxy, Ack, DescribeRequest, DescribeResult,
Expand All @@ -8,38 +12,53 @@ use super::proxy::rpc::{

pub struct ReplicaProxyService {
client: ProxyClient<Channel>,
auth: Arc<Auth>,
}

impl ReplicaProxyService {
pub fn new(channel: Channel, uri: Uri) -> Self {
pub fn new(channel: Channel, uri: Uri, auth: Arc<Auth>) -> Self {
let client = ProxyClient::with_origin(channel, uri);
Self { client }
Self { client, auth }
}

fn do_auth<T>(&self, req: &mut Request<T>) -> Result<(), Status> {
let authenticated = self.auth.authenticate_grpc(req, false)?;

authenticated.upgrade_grpc_request(req);

Ok(())
}
}

#[tonic::async_trait]
impl Proxy for ReplicaProxyService {
async fn execute(
&self,
req: tonic::Request<rpc::ProgramReq>,
mut req: tonic::Request<rpc::ProgramReq>,
) -> Result<tonic::Response<ExecuteResults>, tonic::Status> {
self.do_auth(&mut req)?;

let mut client = self.client.clone();
client.execute(req).await
}

//TODO: also handle cleanup on peer disconnect
async fn disconnect(
&self,
msg: tonic::Request<DisconnectMessage>,
mut msg: tonic::Request<DisconnectMessage>,
) -> Result<tonic::Response<Ack>, tonic::Status> {
self.do_auth(&mut msg)?;

let mut client = self.client.clone();
client.disconnect(msg).await
}

async fn describe(
&self,
req: tonic::Request<DescribeRequest>,
mut req: tonic::Request<DescribeRequest>,
) -> Result<tonic::Response<DescribeResult>, tonic::Status> {
self.do_auth(&mut req)?;

let mut client = self.client.clone();
client.describe(req).await
}
Expand Down