diff --git a/docs/src/transforms.md b/docs/src/transforms.md index 808f113d1..2aaffc998 100644 --- a/docs/src/transforms.md +++ b/docs/src/transforms.md @@ -309,9 +309,6 @@ This is achieved by rewriting the FindCoordinator, Metadata and DescribeCluster # When a timeout occurs the connection to the client is immediately closed. # read_timeout: 60 - # When this field is enabled it allows the use of SASL authentication. If you intend to use SASL this field must be enabled, it is false by default. - sasl_enabled: false - # When this field is provided TLS is used when connecting to the remote address. # Removing this field will disable TLS. #tls: diff --git a/shotover-proxy/benches/windsock/kafka/bench.rs b/shotover-proxy/benches/windsock/kafka/bench.rs index 13f99c86b..145178f68 100644 --- a/shotover-proxy/benches/windsock/kafka/bench.rs +++ b/shotover-proxy/benches/windsock/kafka/bench.rs @@ -98,7 +98,6 @@ impl KafkaBench { broker_id: 0, }], tls: None, - sasl_enabled: Some(false), }), }); common::generate_topology(SourceConfig::Kafka(shotover::sources::kafka::KafkaConfig { diff --git a/shotover-proxy/config/config.yaml b/shotover-proxy/config/config.yaml index 85830d979..025ab0007 100644 --- a/shotover-proxy/config/config.yaml +++ b/shotover-proxy/config/config.yaml @@ -1,5 +1,5 @@ # configure the first `info` to set the log level for dependencies # configure `shotover=info` to set the log level for shotover itself # set `shotover::connection_span=info` to `shotover::connection_span=debug` to attach connection info to most log events, this is disabled by default due to a minor performance hit. -main_log_level: "info, shotover=info, shotover::connection_span=info" +main_log_level: "debug, shotover=debug, shotover::connection_span=debug" observability_interface: "0.0.0.0:9001" diff --git a/shotover-proxy/tests/test-configs/kafka/cluster-sasl/topology-single.yaml b/shotover-proxy/tests/test-configs/kafka/cluster-sasl/topology-single.yaml index 8a59741ac..f2d8f4368 100644 --- a/shotover-proxy/tests/test-configs/kafka/cluster-sasl/topology-single.yaml +++ b/shotover-proxy/tests/test-configs/kafka/cluster-sasl/topology-single.yaml @@ -9,6 +9,5 @@ sources: - address: "127.0.0.1:9192" rack: "rack0" broker_id: 0 - sasl_enabled: true first_contact_points: ["172.16.1.2:9092"] connect_timeout_ms: 3000 diff --git a/shotover-proxy/tests/test-configs/kafka/cluster-sasl/topology1.yaml b/shotover-proxy/tests/test-configs/kafka/cluster-sasl/topology1.yaml index 7a6990015..15fe492e1 100644 --- a/shotover-proxy/tests/test-configs/kafka/cluster-sasl/topology1.yaml +++ b/shotover-proxy/tests/test-configs/kafka/cluster-sasl/topology1.yaml @@ -5,7 +5,6 @@ sources: listen_addr: "127.0.0.1:9191" chain: - KafkaSinkCluster: - sasl_enabled: true shotover_nodes: - address: "127.0.0.1:9191" rack: "rack0" diff --git a/shotover-proxy/tests/test-configs/kafka/cluster-sasl/topology2.yaml b/shotover-proxy/tests/test-configs/kafka/cluster-sasl/topology2.yaml index 1fa948922..3844a8486 100644 --- a/shotover-proxy/tests/test-configs/kafka/cluster-sasl/topology2.yaml +++ b/shotover-proxy/tests/test-configs/kafka/cluster-sasl/topology2.yaml @@ -5,7 +5,6 @@ sources: listen_addr: "127.0.0.1:9192" chain: - KafkaSinkCluster: - sasl_enabled: true shotover_nodes: - address: "127.0.0.1:9191" rack: "rack0" diff --git a/shotover-proxy/tests/test-configs/kafka/cluster-sasl/topology3.yaml b/shotover-proxy/tests/test-configs/kafka/cluster-sasl/topology3.yaml index 1cf09d057..0fa7eb760 100644 --- a/shotover-proxy/tests/test-configs/kafka/cluster-sasl/topology3.yaml +++ b/shotover-proxy/tests/test-configs/kafka/cluster-sasl/topology3.yaml @@ -5,7 +5,6 @@ sources: listen_addr: "127.0.0.1:9193" chain: - KafkaSinkCluster: - sasl_enabled: true shotover_nodes: - address: "127.0.0.1:9191" rack: "rack0" diff --git a/shotover/src/transforms/kafka/sink_cluster/mod.rs b/shotover/src/transforms/kafka/sink_cluster/mod.rs index 7859231ad..138f58245 100644 --- a/shotover/src/transforms/kafka/sink_cluster/mod.rs +++ b/shotover/src/transforms/kafka/sink_cluster/mod.rs @@ -51,7 +51,6 @@ pub struct KafkaSinkClusterConfig { pub connect_timeout_ms: u64, pub read_timeout: Option, pub tls: Option, - pub sasl_enabled: Option, } #[derive(Serialize, Deserialize, Debug, Clone)] @@ -99,7 +98,6 @@ impl TransformConfig for KafkaSinkClusterConfig { self.connect_timeout_ms, self.read_timeout, tls, - self.sasl_enabled.unwrap_or(false), ))) } } @@ -116,7 +114,6 @@ pub struct KafkaSinkClusterBuilder { topic_by_id: Arc>, nodes_shared: Arc>>, tls: Option, - sasl_enabled: bool, } impl KafkaSinkClusterBuilder { @@ -127,7 +124,6 @@ impl KafkaSinkClusterBuilder { connect_timeout_ms: u64, timeout: Option, tls: Option, - sasl_enabled: bool, ) -> KafkaSinkClusterBuilder { let receive_timeout = timeout.map(Duration::from_secs); @@ -148,7 +144,6 @@ impl KafkaSinkClusterBuilder { topic_by_id: Arc::new(DashMap::new()), nodes_shared: Arc::new(RwLock::new(vec![])), tls, - sasl_enabled, } } } @@ -166,7 +161,7 @@ impl TransformBuilder for KafkaSinkClusterBuilder { topic_by_name: self.topic_by_name.clone(), topic_by_id: self.topic_by_id.clone(), rng: SmallRng::from_rng(rand::thread_rng()).unwrap(), - sasl_status: SaslStatus::new(self.sasl_enabled), + sasl_status: SaslStatus::new(), connection_factory: ConnectionFactory::new(self.tls.clone(), self.connect_timeout), first_contact_node: None, fetch_session_id_to_broker: HashMap::new(), @@ -207,28 +202,22 @@ impl AtomicBrokerId { #[derive(Debug)] struct SaslStatus { - enabled: bool, - handshake_complete: bool, + auth_complete: bool, } impl SaslStatus { - fn new(enabled: bool) -> Self { + fn new() -> Self { Self { - enabled, - handshake_complete: false, + auth_complete: false, } } - fn set_handshake_complete(&mut self) { - self.handshake_complete = true; + fn set_auth_complete(&mut self, v: bool) { + self.auth_complete = v; } fn is_handshake_complete(&self) -> bool { - if self.enabled { - self.handshake_complete - } else { - true - } + self.auth_complete } } @@ -371,6 +360,10 @@ impl KafkaSinkCluster { } } + if !self.sasl_status.is_handshake_complete() { + self.requests_contain_non_handshake_message(&mut requests); + } + for group in groups { match self.find_coordinator_of_group(group.clone()).await { Ok(node) => { @@ -625,7 +618,6 @@ impl KafkaSinkCluster { .await?; self.connection_factory.add_auth_message(message.clone()); results.push(rx); - self.sasl_status.set_handshake_complete(); } // route to random node @@ -774,6 +766,29 @@ impl KafkaSinkCluster { Ok(rx.await.unwrap().response.unwrap()) } + fn requests_contain_non_handshake_message(&mut self, requests: &mut [Message]) { + for requests in requests.iter_mut() { + match requests.frame() { + Some(Frame::Kafka(KafkaFrame::Request { + body: RequestBody::ApiVersions(_), + .. + })) => {} + Some(Frame::Kafka(KafkaFrame::Request { + body: RequestBody::SaslHandshake(_), + .. + })) => {} + Some(Frame::Kafka(KafkaFrame::Request { + body: RequestBody::SaslAuthenticate(_), + .. + })) => {} + _ => { + self.sasl_status.set_auth_complete(true); + return; + } + } + } + } + async fn receive_responses( &mut self, find_coordinator_requests: &[FindCoordinator],