From 03552b0340efec04f674bf73fce50bd8da896c0b Mon Sep 17 00:00:00 2001 From: Lucas Kent Date: Thu, 2 May 2024 09:39:04 +1000 Subject: [PATCH] KafkaSinkCluster: authorize_scram_over_mtls --- Cargo.lock | 25 ++ .../benches/windsock/kafka/bench.rs | 1 + shotover-proxy/tests/kafka_int_tests/mod.rs | 28 +++ .../docker-compose.yaml | 70 ++++++ .../topology-single.yaml | 26 +++ shotover/Cargo.toml | 4 + .../src/transforms/kafka/sink_cluster/mod.rs | 155 +++++++++++-- .../src/transforms/kafka/sink_cluster/node.rs | 213 +++++++++++++++--- .../kafka/sink_cluster/scram_over_mtls.rs | 132 +++++++++++ 9 files changed, 604 insertions(+), 50 deletions(-) create mode 100644 shotover-proxy/tests/test-configs/kafka/cluster-sasl-scram-over-mtls/docker-compose.yaml create mode 100644 shotover-proxy/tests/test-configs/kafka/cluster-sasl-scram-over-mtls/topology-single.yaml create mode 100644 shotover/src/transforms/kafka/sink_cluster/scram_over_mtls.rs diff --git a/Cargo.lock b/Cargo.lock index 0df1760de..146e4d5c5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4277,6 +4277,19 @@ dependencies = [ "winapi-util", ] +[[package]] +name = "sasl" +version = "0.5.1" +source = "git+https://gitlab.com/rukai/xmpp-rs?branch=sasl_scram_extensions#a7e8b2ad648d7ce606e9c4de0133739f050bfa45" +dependencies = [ + "base64 0.22.1", + "getrandom 0.2.14", + "hmac", + "pbkdf2 0.12.2", + "sha-1", + "sha2", +] + [[package]] name = "schannel" version = "0.1.23" @@ -4498,6 +4511,17 @@ dependencies = [ "unsafe-libyaml", ] +[[package]] +name = "sha-1" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f5058ada175748e33390e40e872bd0fe59a19f265d0158daa551c5a88a76009c" +dependencies = [ + "cfg-if", + "cpufeatures", + "digest", +] + [[package]] name = "sha1" version = "0.10.6" @@ -4606,6 +4630,7 @@ dependencies = [ "rustls 0.23.5", "rustls-pemfile 2.1.2", "rustls-pki-types", + "sasl", "serde", "serde_json", "serde_yaml", diff --git 
a/shotover-proxy/benches/windsock/kafka/bench.rs b/shotover-proxy/benches/windsock/kafka/bench.rs index d81d8b58b..7d93e84a2 100644 --- a/shotover-proxy/benches/windsock/kafka/bench.rs +++ b/shotover-proxy/benches/windsock/kafka/bench.rs @@ -97,6 +97,7 @@ impl KafkaBench { rack: "rack1".into(), broker_id: 0, }], + authorize_scram_over_mtls: None, tls: None, }), }); diff --git a/shotover-proxy/tests/kafka_int_tests/mod.rs b/shotover-proxy/tests/kafka_int_tests/mod.rs index 17f3547a1..b0815dc9e 100644 --- a/shotover-proxy/tests/kafka_int_tests/mod.rs +++ b/shotover-proxy/tests/kafka_int_tests/mod.rs @@ -346,6 +346,34 @@ async fn cluster_sasl_scram_single_shotover(#[case] driver: KafkaDriver) { .expect("Shotover did not shutdown within 10s"); } +#[rstest] +//#[cfg_attr(feature = "kafka-cpp-driver-tests", case::cpp(KafkaDriver::Cpp))] // CPP driver does not support scram +#[case::java(KafkaDriver::Java)] +#[tokio::test(flavor = "multi_thread")] // multi_thread is needed since java driver will block when consuming, causing shotover logs to not appear +async fn cluster_sasl_scram_over_mtls_single_shotover(#[case] driver: KafkaDriver) { + test_helpers::cert::generate_kafka_test_certs(); + + let _docker_compose = + docker_compose("tests/test-configs/kafka/cluster-sasl-scram-over-mtls/docker-compose.yaml"); + + let shotover = shotover_process( + "tests/test-configs/kafka/cluster-sasl-scram-over-mtls/topology-single.yaml", + ) + .start() + .await; + + let connection_builder = + KafkaConnectionBuilder::new(driver, "127.0.0.1:9192").use_sasl_scram("user", "password"); + test_cases::standard_test_suite(connection_builder).await; + + tokio::time::timeout( + Duration::from_secs(10), + shotover.shutdown_and_then_consume_events(&[]), + ) + .await + .expect("Shotover did not shutdown within 10s"); +} + #[cfg(feature = "kafka-cpp-driver-tests")] // temporarily needed to avoid a warning #[rstest] #[cfg_attr(feature = "kafka-cpp-driver-tests", case::cpp(KafkaDriver::Cpp))] diff --git 
a/shotover-proxy/tests/test-configs/kafka/cluster-sasl-scram-over-mtls/docker-compose.yaml b/shotover-proxy/tests/test-configs/kafka/cluster-sasl-scram-over-mtls/docker-compose.yaml new file mode 100644 index 000000000..808cc9276 --- /dev/null +++ b/shotover-proxy/tests/test-configs/kafka/cluster-sasl-scram-over-mtls/docker-compose.yaml @@ -0,0 +1,70 @@ +networks: + cluster_subnet: + name: cluster_subnet + driver: bridge + ipam: + driver: default + config: + - subnet: 172.16.1.0/24 + gateway: 172.16.1.1 + +services: + kafka0: + image: &image 'bitnami/kafka:3.6.1-debian-11-r24' + networks: + cluster_subnet: + ipv4_address: 172.16.1.2 + environment: &environment + KAFKA_CFG_NODE_ID: 0 + KAFKA_CFG_PROCESS_ROLES: "controller,broker" + KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: "0@kafka0:9093,1@kafka1:9093,2@kafka2:9093" + KAFKA_CFG_LISTENERS: "BROKER://:9092,CONTROLLER://:9093,SHOTOVER_MTLS://:9094" + KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: "CONTROLLER:SSL,BROKER:SASL_SSL,SHOTOVER_MTLS:SSL" + KAFKA_CFG_ADVERTISED_LISTENERS: "BROKER://172.16.1.2:9092,SHOTOVER_MTLS://172.16.1.2:9094" + KAFKA_CFG_DELEGATION_TOKEN_MASTER_KEY: THE_MASTER_KEY + KAFKA_CLIENT_USERS: "user" + KAFKA_CLIENT_PASSWORDS: "password" + KAFKA_CFG_CONTROLLER_LISTENER_NAMES: "CONTROLLER" + KAFKA_CFG_SASL_MECHANISM_CONTROLLER_PROTOCOL: "PLAIN" + KAFKA_CFG_INTER_BROKER_LISTENER_NAME: "BROKER" + KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOL: "SCRAM-SHA-256" + KAFKA_INTER_BROKER_USER: "controller_user" + KAFKA_INTER_BROKER_PASSWORD: "controller_password" + KAFKA_CERTIFICATE_PASSWORD: "password" + KAFKA_KRAFT_CLUSTER_ID: "abcdefghijklmnopqrstuv" + # TODO: Does this enable MTLS auth for the SCRAM connection as well? + # If so, we need to be careful that kafka doesn't elevate our token'd SCRAM credentials to super user. + # I don't yet know how it would actually behave.
+ # If so, a possible solution could be separate certificates for the token'd connections + KAFKA_TLS_CLIENT_AUTH: required + KAFKA_CFG_AUTHORIZER_CLASS_NAME: "org.apache.kafka.metadata.authorizer.StandardAuthorizer" + # Give the following super user access: + # * the user named `user` + # * any clients connected via a TLS certificate of `O=ShotoverTestCertificate,CN=Generic-Cert` + KAFKA_CFG_SUPER_USERS: "User:user;User:O=ShotoverTestCertificate,CN=Generic-Cert" + volumes: &volumes + - type: tmpfs + target: /bitnami/kafka + - type: bind + source: "../tls/certs" + target: "/opt/bitnami/kafka/config/certs" + kafka1: + image: *image + networks: + cluster_subnet: + ipv4_address: 172.16.1.3 + environment: + <<: *environment + KAFKA_CFG_ADVERTISED_LISTENERS: "BROKER://172.16.1.3:9092,SHOTOVER_MTLS://172.16.1.3:9094" + KAFKA_CFG_NODE_ID: 1 + volumes: *volumes + kafka2: + image: *image + networks: + cluster_subnet: + ipv4_address: 172.16.1.4 + environment: + <<: *environment + KAFKA_CFG_ADVERTISED_LISTENERS: "BROKER://172.16.1.4:9092,SHOTOVER_MTLS://172.16.1.4:9094" + KAFKA_CFG_NODE_ID: 2 + volumes: *volumes diff --git a/shotover-proxy/tests/test-configs/kafka/cluster-sasl-scram-over-mtls/topology-single.yaml b/shotover-proxy/tests/test-configs/kafka/cluster-sasl-scram-over-mtls/topology-single.yaml new file mode 100644 index 000000000..9c4665a80 --- /dev/null +++ b/shotover-proxy/tests/test-configs/kafka/cluster-sasl-scram-over-mtls/topology-single.yaml @@ -0,0 +1,26 @@ +--- +sources: + - Kafka: + name: "kafka" + listen_addr: "127.0.0.1:9192" + chain: + - DebugPrinter + - KafkaSinkCluster: + shotover_nodes: + - address: "127.0.0.1:9192" + rack: "rack0" + broker_id: 0 + first_contact_points: ["172.16.1.2:9092"] + authorize_scram_over_mtls: + mtls_port_contact_points: ["172.16.1.2:9094"] + tls: + certificate_authority_path: "tests/test-configs/kafka/tls/certs/localhost_CA.crt" + certificate_path: "tests/test-configs/kafka/tls/certs/localhost.crt" + private_key_path:
"tests/test-configs/kafka/tls/certs/localhost.key" + verify_hostname: true + connect_timeout_ms: 3000 + tls: + certificate_authority_path: "tests/test-configs/kafka/tls/certs/localhost_CA.crt" + certificate_path: "tests/test-configs/kafka/tls/certs/localhost.crt" + private_key_path: "tests/test-configs/kafka/tls/certs/localhost.key" + verify_hostname: true diff --git a/shotover/Cargo.toml b/shotover/Cargo.toml index 3be72d920..dba8c62a1 100644 --- a/shotover/Cargo.toml +++ b/shotover/Cargo.toml @@ -33,6 +33,8 @@ kafka = [ "dep:dashmap", "dep:xxhash-rust", "dep:string", + "dep:base64", + "dep:sasl", ] redis = [ "dep:redis-protocol", @@ -119,6 +121,8 @@ xxhash-rust = { version = "0.8.6", features = ["xxh3"], optional = true } dashmap = { version = "5.4.0", optional = true } atoi = { version = "2.0.0", optional = true } fnv = "1.0.7" +#sasl = { version = "0.5.1", optional = true, default-features = false, features = ["scram"] } +sasl = { version = "0.5.1", optional = true, default-features = false, features = ["scram"], git = "https://gitlab.com/rukai/xmpp-rs", branch = "sasl_scram_extensions" } [dev-dependencies] criterion = { version = "0.5.0", features = ["async_tokio"] } diff --git a/shotover/src/transforms/kafka/sink_cluster/mod.rs b/shotover/src/transforms/kafka/sink_cluster/mod.rs index b500a4612..5d9b6c220 100644 --- a/shotover/src/transforms/kafka/sink_cluster/mod.rs +++ b/shotover/src/transforms/kafka/sink_cluster/mod.rs @@ -17,13 +17,18 @@ use kafka_protocol::messages::metadata_response::MetadataResponseBroker; use kafka_protocol::messages::{ ApiKey, BrokerId, FetchRequest, FindCoordinatorRequest, FindCoordinatorResponse, GroupId, HeartbeatRequest, JoinGroupRequest, MetadataRequest, MetadataResponse, OffsetFetchRequest, - RequestHeader, SaslHandshakeRequest, SyncGroupRequest, TopicName, + RequestHeader, SaslAuthenticateRequest, SaslAuthenticateResponse, SaslHandshakeRequest, + SyncGroupRequest, TopicName, }; use kafka_protocol::protocol::{Builder, 
StrBytes}; use node::{ConnectionFactory, KafkaAddress, KafkaNode}; use rand::rngs::SmallRng; use rand::seq::{IteratorRandom, SliceRandom}; use rand::SeedableRng; +use scram_over_mtls::{ + create_delegation_token_for_user, AuthorizeScramOverMtls, AuthorizeScramOverMtlsBuilder, + AuthorizeScramOverMtlsConfig, OriginalScramState, +}; use serde::{Deserialize, Serialize}; use std::collections::{HashMap, VecDeque}; use std::hash::Hasher; @@ -34,6 +39,9 @@ use tokio::sync::RwLock; use uuid::Uuid; mod node; +mod scram_over_mtls; + +const SASL_SCRAM_MECHANISMS: [&str; 2] = ["SCRAM-SHA-256", "SCRAM-SHA-512"]; #[derive(thiserror::Error, Debug)] enum FindCoordinatorError { @@ -51,6 +59,7 @@ pub struct KafkaSinkClusterConfig { pub connect_timeout_ms: u64, pub read_timeout: Option, pub tls: Option, + pub authorize_scram_over_mtls: Option, } #[derive(Serialize, Deserialize, Debug, Clone)] @@ -99,6 +108,10 @@ impl TransformConfig for KafkaSinkClusterConfig { Ok(Box::new(KafkaSinkClusterBuilder::new( self.first_contact_points.clone(), + self.authorize_scram_over_mtls + .as_ref() + .map(AuthorizeScramOverMtlsConfig::get_builder) + .transpose()?, shotover_nodes, transform_context.chain_name, self.connect_timeout_ms, @@ -127,12 +140,15 @@ pub struct KafkaSinkClusterBuilder { topic_by_name: Arc>, topic_by_id: Arc>, nodes_shared: Arc>>, + authorize_scram_over_mtls: Option, + tls: Option, } impl KafkaSinkClusterBuilder { pub fn new( first_contact_points: Vec, + authorize_scram_over_mtls: Option, shotover_nodes: Vec, _chain_name: String, connect_timeout_ms: u64, @@ -143,6 +159,7 @@ impl KafkaSinkClusterBuilder { KafkaSinkClusterBuilder { first_contact_points, + authorize_scram_over_mtls, shotover_nodes, connect_timeout: Duration::from_millis(connect_timeout_ms), read_timeout: receive_timeout, @@ -181,6 +198,10 @@ impl TransformBuilder for KafkaSinkClusterBuilder { find_coordinator_requests: Default::default(), temp_responses_buffer: Default::default(), sasl_mechanism: None, + 
authorize_scram_over_mtls: self + .authorize_scram_over_mtls + .as_ref() + .map(|x| x.build(self.connect_timeout, self.read_timeout)), }) } @@ -235,7 +256,8 @@ pub struct KafkaSinkCluster { find_coordinator_requests: MessageIdMap, /// A temporary buffer used when receiving responses, only held onto in order to avoid reallocating. temp_responses_buffer: Vec, - sasl_mechanism: Option, + sasl_mechanism: Option, + authorize_scram_over_mtls: Option, } /// State of a Request/Response is maintained by this enum. @@ -330,7 +352,11 @@ impl KafkaSinkCluster { let address = &self.nodes.choose(&mut self.rng).unwrap().kafka_address; self.control_connection = Some( self.connection_factory - .create_connection(address) + .create_connection( + address, + &self.authorize_scram_over_mtls, + &self.sasl_mechanism, + ) .await .context("Failed to create control connection")?, ); @@ -379,14 +405,22 @@ impl KafkaSinkCluster { })) => { mechanism.as_str(); - self.sasl_mechanism = Some(mechanism.clone()); + self.sasl_mechanism = Some(mechanism.as_str().to_owned()); self.connection_factory.add_auth_request(request.clone()); handshake_request_count += 1; } Some(Frame::Kafka(KafkaFrame::Request { - body: RequestBody::SaslAuthenticate(_), + body: + RequestBody::SaslAuthenticate(SaslAuthenticateRequest { + auth_bytes, .. + }), .. 
})) => { + if let Some(scram_over_mtls) = &mut self.authorize_scram_over_mtls { + if let Some(username) = get_username_from_scram_request(auth_bytes) { + scram_over_mtls.username = username; + } + } self.connection_factory.add_auth_request(request.clone()); handshake_request_count += 1; } @@ -397,7 +431,26 @@ handshake_request_count += 1; } _ => { - // The client is no longer performing authentication + // The client is no longer performing authentication, so consider auth completed + + if let Some(scram_over_mtls) = &self.authorize_scram_over_mtls { + // When performing SCRAM over mTLS, we need this security check to ensure that the + // client cannot access delegation tokens that it has not successfully authenticated for. + // + // If the client were to send a request directly after the SCRAM requests, + // without waiting for responses to those scram requests first, + // this error would be triggered even if the SCRAM requests were successful. + // However that would be a violation of the SCRAM protocol as the client is supposed to check + // the server's signature contained in the server's final message in order to authenticate the server. + // So I don't think this problem is worth solving. + if !matches!( + scram_over_mtls.original_scram_state, + OriginalScramState::AuthSuccess + ) { + return Err(anyhow!("Client attempted to send requests before a succesful auth was completed or after an unsuccesful auth")); + } + } + + self.auth_complete = true; break; } } @@ -876,7 +929,11 @@ .iter_mut() .find(|x| x.broker_id == destination) .unwrap() - .get_connection(&self.connection_factory) + .get_connection( + &self.connection_factory, + &self.authorize_scram_over_mtls, + &self.sasl_mechanism, + ) .await? .send(requests.requests)?; } @@ -1034,20 +1091,30 @@ body: ResponseBody::SaslHandshake(handshake), ..
})) => { - // always remove scram from supported mechanisms - const SASL_SCRAM_MECHANISMS: [&str; 2] = ["SCRAM-SHA-256", "SCRAM-SHA-512"]; - handshake - .mechanisms - .retain(|x| !SASL_SCRAM_MECHANISMS.contains(&x.as_str())); - - // declare unsupported if the client requested SCRAM - if let Some(sasl_mechanism) = &self.sasl_mechanism { - if SASL_SCRAM_MECHANISMS.contains(&sasl_mechanism.as_str()) { - handshake.error_code = 33; // UNSUPPORTED_SASL_MECHANISM + // If authorize_scram_over_mtls is disabled there is no way that scram can work through KafkaSinkCluster + // since it is specifically designed such that replay attacks won't work. + // So when authorize_scram_over_mtls is disabled report to the user that SCRAM is not enabled. + if self.authorize_scram_over_mtls.is_none() { + // remove scram from supported mechanisms + handshake + .mechanisms + .retain(|x| !SASL_SCRAM_MECHANISMS.contains(&x.as_str())); + + // declare unsupported if the client requested SCRAM + if let Some(sasl_mechanism) = &self.sasl_mechanism { + if SASL_SCRAM_MECHANISMS.contains(&sasl_mechanism.as_str()) { + handshake.error_code = 33; // UNSUPPORTED_SASL_MECHANISM + } } - } - response.invalidate_cache(); + response.invalidate_cache(); + } + } + Some(Frame::Kafka(KafkaFrame::Response { + body: ResponseBody::SaslAuthenticate(authenticate), + .. + })) => { + self.process_sasl_authenticate(authenticate).await?; } Some(Frame::Kafka(KafkaFrame::Response { body: ResponseBody::Metadata(metadata), @@ -1081,6 +1148,44 @@ Ok(()) } + async fn process_sasl_authenticate( + &mut self, + authenticate: &mut SaslAuthenticateResponse, + ) -> Result<()> { + if let Some(sasl_mechanism) = &self.sasl_mechanism { + if SASL_SCRAM_MECHANISMS.contains(&sasl_mechanism.as_str()) { + if let Some(scram_over_mtls) = &mut self.authorize_scram_over_mtls { + // Create the token as soon as we have the username we need, + // to minimize the wait required before we can actually use the token.
+ match scram_over_mtls.original_scram_state { + OriginalScramState::WaitingOnServerFirst => { + scram_over_mtls.delegation_token = create_delegation_token_for_user( + scram_over_mtls, + scram_over_mtls.username.clone(), + ) + .await?; + + scram_over_mtls.original_scram_state = if authenticate.error_code == 0 { + OriginalScramState::WaitingOnServerFinal + } else { + OriginalScramState::AuthFailed + }; + } + OriginalScramState::WaitingOnServerFinal => { + scram_over_mtls.original_scram_state = if authenticate.error_code == 0 { + OriginalScramState::AuthSuccess + } else { + OriginalScramState::AuthFailed + }; + } + _ => {} + } + } + } + } + Ok(()) + } + fn route_to_first_contact_node(&mut self, request: Message) { let destination = if let Some(first_contact_node) = &self.first_contact_node { self.nodes @@ -1415,3 +1520,15 @@ struct FindCoordinator { key: StrBytes, key_type: i8, } + +fn get_username_from_scram_request(auth_request: &[u8]) -> Option { + for s in std::str::from_utf8(auth_request).ok()?.split(',') { + let mut iter = s.splitn(2, '='); + if let (Some(key), Some(value)) = (iter.next(), iter.next()) { + if key == "n" { + return Some(value.to_owned()); + } + } + } + None +} diff --git a/shotover/src/transforms/kafka/sink_cluster/node.rs b/shotover/src/transforms/kafka/sink_cluster/node.rs index 913b167af..8c4e82055 100644 --- a/shotover/src/transforms/kafka/sink_cluster/node.rs +++ b/shotover/src/transforms/kafka/sink_cluster/node.rs @@ -1,12 +1,21 @@ +use super::scram_over_mtls::AuthorizeScramOverMtls; use crate::codec::{kafka::KafkaCodecBuilder, CodecBuilder, Direction}; use crate::connection::SinkConnection; -use crate::frame::kafka::{KafkaFrame, ResponseBody}; +use crate::frame::kafka::{KafkaFrame, RequestBody, ResponseBody}; use crate::frame::Frame; use crate::message::Message; use crate::tls::TlsConnector; +use crate::transforms::kafka::sink_cluster::SASL_SCRAM_MECHANISMS; use anyhow::{anyhow, Context, Result}; -use kafka_protocol::messages::BrokerId; 
-use kafka_protocol::protocol::StrBytes; +use base64::engine::general_purpose; +use base64::Engine; +use bytes::Bytes; +use kafka_protocol::messages::{ApiKey, BrokerId, RequestHeader, SaslAuthenticateRequest}; +use kafka_protocol::protocol::{Builder, StrBytes}; +use sasl::client::mechanisms::Scram; +use sasl::client::Mechanism; +use sasl::common::scram::Sha256; +use sasl::common::ChannelBinding; use std::sync::Arc; use std::time::Duration; use tokio::sync::Notify; @@ -39,7 +48,30 @@ impl ConnectionFactory { self.auth_requests.push(message); } - pub async fn create_connection(&self, kafka_address: &KafkaAddress) -> Result { + pub async fn create_connection_unauthed( + &self, + kafka_address: &KafkaAddress, + ) -> Result { + let codec = KafkaCodecBuilder::new(Direction::Sink, "KafkaSinkCluster".to_owned()); + let address = (kafka_address.host.to_string(), kafka_address.port as u16); + + SinkConnection::new( + address, + codec, + &self.tls, + self.connect_timeout, + self.force_run_chain.clone(), + self.read_timeout, + ) + .await + } + + pub async fn create_connection( + &self, + kafka_address: &KafkaAddress, + authorize_scram_over_mtls: &Option, + sasl_mechanism: &Option, + ) -> Result { let codec = KafkaCodecBuilder::new(Direction::Sink, "KafkaSinkCluster".to_owned()); let address = (kafka_address.host.to_string(), kafka_address.port as u16); let mut connection = SinkConnection::new( @@ -53,39 +85,152 @@ impl ConnectionFactory { .await?; if !self.auth_requests.is_empty() { - connection.send(self.auth_requests.clone())?; - let mut received_count = 0; - let mut received = vec![]; - while received_count < self.auth_requests.len() { - received = connection.recv().await?; - received_count += received.len(); - } - - // Check that the authenticate response was a success - let mut response = received.pop().unwrap(); - if let Some(Frame::Kafka(KafkaFrame::Response { - body: ResponseBody::SaslAuthenticate(auth_response), - .. 
- })) = response.frame() - { - if auth_response.error_code != 0 { - return Err(anyhow!( - "Replayed auth failed, error code: {}, {}", - auth_response.error_code, - auth_response - .error_message - .as_ref() - .map(|x| x.to_string()) - .unwrap_or_default() - )); + if let Some(scram_over_mtls) = authorize_scram_over_mtls { + if let Some(sasl_mechanism) = sasl_mechanism { + if SASL_SCRAM_MECHANISMS.contains(&sasl_mechanism.as_str()) { + self.perform_tokenauth_scram_exchange(scram_over_mtls, &mut connection) + .await?; + } else { + self.replay_sasl(&mut connection).await?; + } + } else { + self.replay_sasl(&mut connection).await?; } } else { - return Err(anyhow!("Unexpected response to replayed auth {response:?}")); + self.replay_sasl(&mut connection).await?; } } Ok(connection) } + + /// authorize_scram_over_mtls creates new connections via delegation tokens. + /// Kafka implements delegation tokens as just a special case of SCRAM. + /// In particular kafka utilizes scram's concept of extensions to send `tokenauth=true` to the server, + /// indicating that the user and password are actually a delegation token's token_id and HMAC. + /// + /// This method implements a full SCRAM authentication exchange with the kafka delegation token extension. + /// The `client-first`, `server-first` etc messages are as named in the SCRAM RFC: + /// https://datatracker.ietf.org/doc/html/rfc5802 + async fn perform_tokenauth_scram_exchange( + &self, + scram_over_mtls: &AuthorizeScramOverMtls, + connection: &mut SinkConnection, + ) -> Result<()> { + // TODO: This sleep is currently load bearing... + // Need to delay progression until token has propagated. 
+ tokio::time::sleep(std::time::Duration::from_secs(4)).await; + + let mut auth_requests = self.auth_requests.clone(); + + // send/receive SaslHandshake + connection.send(vec![auth_requests.remove(0)])?; + let mut handshake_response = connection.recv().await?.pop().unwrap(); + if let Some(Frame::Kafka(KafkaFrame::Response { + body: ResponseBody::SaslHandshake(handshake_response), + .. + })) = handshake_response.frame() + { + if handshake_response.error_code != 0 { + return Err(anyhow!( + "SaslHandshake response reported an error, error code: {}, server supported mechanisms: {:?}", + handshake_response.error_code, + handshake_response.mechanisms, + )); + } + } else { + return Err(anyhow!( + "Unexpected response to SaslHandshake request: {handshake_response:?}" + )); + } + + // SCRAM client-first + let hmac = general_purpose::STANDARD.encode(&scram_over_mtls.delegation_token.hmac); + let mut scram = Scram::::new( + scram_over_mtls.delegation_token.token_id.clone(), + hmac, + ChannelBinding::None, + "tokenauth=true".to_owned(), + String::new(), + ) + .map_err(|x| anyhow!("{x:?}"))?; + connection.send(vec![Self::create_auth_request(scram.initial())?])?; + + // SCRAM server-first + let first_scram_response = connection.recv().await?.pop().unwrap(); + let first_scram_response = Self::process_auth_response(first_scram_response) + .context("first response to delegation token SCRAM reported an error")?; + + // SCRAM client-final + let final_scram_request = scram.response(&first_scram_response)?; + connection.send(vec![Self::create_auth_request(final_scram_request)?])?; + + // SCRAM server-final + let final_scram_response = connection.recv().await?.pop().unwrap(); + let final_scram_response = Self::process_auth_response(final_scram_response) + .context("final response to delegation token SCRAM reported an error")?; + scram + .success(&final_scram_response) + .context("Server gave invalid final response to delegation token SCRAM") + } + + /// For SASL plain it is sufficient 
to replay the requests made in a successful exchange. + /// This method performs such a replay on the passed connection. + async fn replay_sasl(&self, connection: &mut SinkConnection) -> Result<()> { + connection.send(self.auth_requests.clone())?; + let mut received_count = 0; + let mut received = vec![]; + while received_count < self.auth_requests.len() { + connection.recv_into(&mut received).await?; + received_count += received.len(); + } + + // Check that the authenticate response was a success + Self::process_auth_response(received.pop().unwrap()) + .map(|_| ()) + .context("Unexpected response to replayed SASL requests") + } + + fn create_auth_request(bytes: Vec) -> Result { + Ok(Message::from_frame(Frame::Kafka(KafkaFrame::Request { + header: RequestHeader::builder() + .request_api_key(ApiKey::SaslAuthenticateKey as i16) + .request_api_version(2) + .build() + .unwrap(), + body: RequestBody::SaslAuthenticate( + SaslAuthenticateRequest::builder() + .auth_bytes(bytes.into()) + .build()?, + ), + }))) + } + + fn process_auth_response(mut response: Message) -> Result { + if let Some(Frame::Kafka(KafkaFrame::Response { + body: ResponseBody::SaslAuthenticate(auth_response), + ..
+ })) = response.frame() + { + if auth_response.error_code != 0 { + Err(anyhow!( + "SaslAuthenticate response reported an error, error code: {}, {}", + auth_response.error_code, + auth_response + .error_message + .as_ref() + .map(|x| x.as_str()) + .unwrap_or_default() + )) + } else { + Ok(auth_response.auth_bytes.clone()) + } + } else { + Err(anyhow!( + "Unexpected response to SaslAuthenticate request: {response:?}" + )) + } + } } #[derive(Clone, PartialEq, Debug)] @@ -148,11 +293,17 @@ impl KafkaNode { pub async fn get_connection( &mut self, connection_factory: &ConnectionFactory, + authorize_scram_over_mtls: &Option, + sasl_mechanism: &Option, ) -> Result<&mut SinkConnection> { if self.connection.is_none() { self.connection = Some( connection_factory - .create_connection(&self.kafka_address) + .create_connection( + &self.kafka_address, + authorize_scram_over_mtls, + sasl_mechanism, + ) .await .context("Failed to create a new connection")?, ); diff --git a/shotover/src/transforms/kafka/sink_cluster/scram_over_mtls.rs b/shotover/src/transforms/kafka/sink_cluster/scram_over_mtls.rs new file mode 100644 index 000000000..b4c90d9fc --- /dev/null +++ b/shotover/src/transforms/kafka/sink_cluster/scram_over_mtls.rs @@ -0,0 +1,132 @@ +use super::node::{ConnectionFactory, KafkaAddress}; +use crate::{ + frame::{ + kafka::{KafkaFrame, RequestBody, ResponseBody}, + Frame, + }, + message::Message, + tls::{TlsConnector, TlsConnectorConfig}, +}; +use anyhow::{anyhow, Result}; +use kafka_protocol::{ + messages::{ApiKey, CreateDelegationTokenRequest, RequestHeader}, + protocol::{Builder, StrBytes}, +}; +use serde::{Deserialize, Serialize}; +use std::{sync::Arc, time::Duration}; +use tokio::sync::Notify; + +#[derive(Serialize, Deserialize, Debug, Clone)] +#[serde(deny_unknown_fields)] +pub struct AuthorizeScramOverMtlsConfig { + pub mtls_port_contact_points: Vec, + pub tls: TlsConnectorConfig, +} + +impl AuthorizeScramOverMtlsConfig { + pub fn get_builder(&self) -> Result { + 
Ok(AuthorizeScramOverMtlsBuilder { + mtls_port_contact_points: self.mtls_port_contact_points.clone(), + tls: TlsConnector::new(self.tls.clone())?, + }) + } +} + +pub struct AuthorizeScramOverMtlsBuilder { + pub mtls_port_contact_points: Vec, + pub tls: TlsConnector, +} + +impl AuthorizeScramOverMtlsBuilder { + pub fn build( + &self, + connect_timeout: Duration, + read_timeout: Option, + ) -> AuthorizeScramOverMtls { + let mtls_connection_factory = ConnectionFactory::new( + Some(self.tls.clone()), + connect_timeout, + read_timeout, + Arc::new(Notify::new()), + ); + AuthorizeScramOverMtls { + mtls_port_contact_points: self.mtls_port_contact_points.clone(), + mtls_connection_factory, + username: String::new(), + delegation_token: DelegationToken { + token_id: String::new(), + hmac: vec![], + }, + original_scram_state: OriginalScramState::WaitingOnServerFirst, + } + } +} + +pub struct AuthorizeScramOverMtls { + /// The destination to create mtls_connection from + pub mtls_port_contact_points: Vec, + /// connection factory to create mTLS connections from + pub mtls_connection_factory: ConnectionFactory, + /// Tracks the state of the original scram connections responses created from the clients actual requests + pub original_scram_state: OriginalScramState, + /// The username used by the original SCRAM auth + pub username: String, + /// The delegation token generated from the username used in the original scram auth + pub delegation_token: DelegationToken, +} + +pub enum OriginalScramState { + WaitingOnServerFirst, + WaitingOnServerFinal, + AuthFailed, + AuthSuccess, +} + +pub async fn create_delegation_token_for_user( + scram_over_mtls: &AuthorizeScramOverMtls, + username: String, +) -> Result { + let address = KafkaAddress::from_str(&scram_over_mtls.mtls_port_contact_points[0])?; + let mut connection = scram_over_mtls + .mtls_connection_factory + // Must be unauthed since mTLS is its own auth. 
+ .create_connection_unauthed(&address) + .await?; + + connection.send(vec![Message::from_frame(Frame::Kafka( + KafkaFrame::Request { + header: RequestHeader::builder() + .request_api_key(ApiKey::CreateDelegationTokenKey as i16) + .request_api_version(3) + .build() + .unwrap(), + body: RequestBody::CreateDelegationToken( + CreateDelegationTokenRequest::builder() + .owner_principal_type(Some(StrBytes::from_static_str("User"))) + .owner_principal_name(Some(StrBytes::from_string(username))) + .build() + .unwrap(), + ), + }, + ))])?; + let mut response = connection.recv().await?.pop().unwrap(); + if let Some(Frame::Kafka(KafkaFrame::Response { + body: ResponseBody::CreateDelegationToken(response), + .. + })) = response.frame() + { + Ok(DelegationToken { + token_id: response.token_id.as_str().to_owned(), + hmac: response.hmac.to_vec(), + }) + } else { + Err(anyhow!( + "Unexpected response to CreateDelegationToken {response:?}" + )) + } +} + +pub struct DelegationToken { + pub token_id: String, + pub hmac: Vec, +}