diff --git a/docs/src/transforms.md b/docs/src/transforms.md
index 808f113d1..2aaffc998 100644
--- a/docs/src/transforms.md
+++ b/docs/src/transforms.md
@@ -309,9 +309,6 @@ This is achieved by rewriting the FindCoordinator, Metadata and DescribeCluster
     # When a timeout occurs the connection to the client is immediately closed.
     # read_timeout: 60

-    # When this field is enabled it allows the use of SASL authentication. If you intend to use SASL this field must be enabled, it is false by default.
-    sasl_enabled: false
-
     # When this field is provided TLS is used when connecting to the remote address.
     # Removing this field will disable TLS.
     #tls:
diff --git a/shotover-proxy/benches/windsock/kafka/bench.rs b/shotover-proxy/benches/windsock/kafka/bench.rs
index 08e5e931f..d81d8b58b 100644
--- a/shotover-proxy/benches/windsock/kafka/bench.rs
+++ b/shotover-proxy/benches/windsock/kafka/bench.rs
@@ -98,7 +98,6 @@ impl KafkaBench {
                     broker_id: 0,
                 }],
                 tls: None,
-                sasl_enabled: Some(false),
             }),
         });
         common::generate_topology(SourceConfig::Kafka(shotover::sources::kafka::KafkaConfig {
diff --git a/shotover-proxy/tests/test-configs/kafka/cluster-sasl/topology-single.yaml b/shotover-proxy/tests/test-configs/kafka/cluster-sasl/topology-single.yaml
index 8a59741ac..f2d8f4368 100644
--- a/shotover-proxy/tests/test-configs/kafka/cluster-sasl/topology-single.yaml
+++ b/shotover-proxy/tests/test-configs/kafka/cluster-sasl/topology-single.yaml
@@ -9,6 +9,5 @@ sources:
               - address: "127.0.0.1:9192"
                 rack: "rack0"
                 broker_id: 0
-            sasl_enabled: true
             first_contact_points: ["172.16.1.2:9092"]
             connect_timeout_ms: 3000
diff --git a/shotover-proxy/tests/test-configs/kafka/cluster-sasl/topology1.yaml b/shotover-proxy/tests/test-configs/kafka/cluster-sasl/topology1.yaml
index 7a6990015..15fe492e1 100644
--- a/shotover-proxy/tests/test-configs/kafka/cluster-sasl/topology1.yaml
+++ b/shotover-proxy/tests/test-configs/kafka/cluster-sasl/topology1.yaml
@@ -5,7 +5,6 @@ sources:
       listen_addr: "127.0.0.1:9191"
       chain:
         - KafkaSinkCluster:
-            sasl_enabled: true
             shotover_nodes:
               - address: "127.0.0.1:9191"
                 rack: "rack0"
diff --git a/shotover-proxy/tests/test-configs/kafka/cluster-sasl/topology2.yaml b/shotover-proxy/tests/test-configs/kafka/cluster-sasl/topology2.yaml
index 1fa948922..3844a8486 100644
--- a/shotover-proxy/tests/test-configs/kafka/cluster-sasl/topology2.yaml
+++ b/shotover-proxy/tests/test-configs/kafka/cluster-sasl/topology2.yaml
@@ -5,7 +5,6 @@ sources:
       listen_addr: "127.0.0.1:9192"
       chain:
         - KafkaSinkCluster:
-            sasl_enabled: true
             shotover_nodes:
               - address: "127.0.0.1:9191"
                 rack: "rack0"
diff --git a/shotover-proxy/tests/test-configs/kafka/cluster-sasl/topology3.yaml b/shotover-proxy/tests/test-configs/kafka/cluster-sasl/topology3.yaml
index 1cf09d057..0fa7eb760 100644
--- a/shotover-proxy/tests/test-configs/kafka/cluster-sasl/topology3.yaml
+++ b/shotover-proxy/tests/test-configs/kafka/cluster-sasl/topology3.yaml
@@ -5,7 +5,6 @@ sources:
      listen_addr: "127.0.0.1:9193"
      chain:
        - KafkaSinkCluster:
-           sasl_enabled: true
            shotover_nodes:
              - address: "127.0.0.1:9191"
                rack: "rack0"
diff --git a/shotover/src/transforms/kafka/sink_cluster/mod.rs b/shotover/src/transforms/kafka/sink_cluster/mod.rs
index 76a2c9ac1..0016473b6 100644
--- a/shotover/src/transforms/kafka/sink_cluster/mod.rs
+++ b/shotover/src/transforms/kafka/sink_cluster/mod.rs
@@ -49,7 +49,6 @@ pub struct KafkaSinkClusterConfig {
     pub connect_timeout_ms: u64,
     pub read_timeout: Option<u64>,
     pub tls: Option<TlsConnectorConfig>,
-    pub sasl_enabled: Option<bool>,
 }

 #[derive(Serialize, Deserialize, Debug, Clone)]
@@ -97,7 +96,6 @@ impl TransformConfig for KafkaSinkClusterConfig {
             self.connect_timeout_ms,
             self.read_timeout,
             tls,
-            self.sasl_enabled.unwrap_or(false),
         )))
     }
 }
@@ -114,7 +112,6 @@ pub struct KafkaSinkClusterBuilder {
     topic_by_id: Arc<DashMap<Uuid, Topic>>,
     nodes_shared: Arc<RwLock<Vec<KafkaNode>>>,
     tls: Option<TlsConnector>,
-    sasl_enabled: bool,
 }

 impl KafkaSinkClusterBuilder {
@@ -125,7 +122,6 @@ impl KafkaSinkClusterBuilder {
         connect_timeout_ms: u64,
         timeout: Option<u64>,
         tls: Option<TlsConnector>,
-        sasl_enabled: bool,
     ) -> KafkaSinkClusterBuilder {
         let receive_timeout = timeout.map(Duration::from_secs);

@@ -146,7 +142,6 @@ impl KafkaSinkClusterBuilder {
             topic_by_id: Arc::new(DashMap::new()),
             nodes_shared: Arc::new(RwLock::new(vec![])),
             tls,
-            sasl_enabled,
         }
     }
 }
@@ -163,7 +158,7 @@ impl TransformBuilder for KafkaSinkClusterBuilder {
             topic_by_name: self.topic_by_name.clone(),
             topic_by_id: self.topic_by_id.clone(),
             rng: SmallRng::from_rng(rand::thread_rng()).unwrap(),
-            sasl_status: SaslStatus::new(self.sasl_enabled),
+            auth_complete: false,
             connection_factory: ConnectionFactory::new(
                 self.tls.clone(),
                 self.connect_timeout,
@@ -210,33 +205,6 @@ impl AtomicBrokerId {
     }
 }

-#[derive(Debug)]
-struct SaslStatus {
-    enabled: bool,
-    handshake_complete: bool,
-}
-
-impl SaslStatus {
-    fn new(enabled: bool) -> Self {
-        Self {
-            enabled,
-            handshake_complete: false,
-        }
-    }
-
-    fn set_handshake_complete(&mut self) {
-        self.handshake_complete = true;
-    }
-
-    fn is_handshake_complete(&self) -> bool {
-        if self.enabled {
-            self.handshake_complete
-        } else {
-            true
-        }
-    }
-}
-
 pub struct KafkaSinkCluster {
     first_contact_points: Vec<String>,
     shotover_nodes: Vec<ShotoverNode>,
@@ -247,7 +215,7 @@ pub struct KafkaSinkCluster {
     topic_by_name: Arc<DashMap<TopicName, Topic>>,
     topic_by_id: Arc<DashMap<Uuid, Topic>>,
     rng: SmallRng,
-    sasl_status: SaslStatus,
+    auth_complete: bool,
     connection_factory: ConnectionFactory,
     first_contact_node: Option<KafkaAddress>,
     control_connection: Option<SinkConnection>,
@@ -382,6 +350,54 @@ impl KafkaSinkCluster {
     }

     async fn route_requests(&mut self, mut requests: Vec<Message>) -> Result<()> {
+        if !self.auth_complete {
+            let mut handshake_request_count = 0;
+            for request in &mut requests {
+                match request.frame() {
+                    Some(Frame::Kafka(KafkaFrame::Request {
+                        body: RequestBody::SaslHandshake(_),
+                        ..
+                    })) => {
+                        self.connection_factory
+                            .add_handshake_message(request.clone());
+                        handshake_request_count += 1;
+                    }
+                    Some(Frame::Kafka(KafkaFrame::Request {
+                        body: RequestBody::SaslAuthenticate(_),
+                        ..
+                    })) => {
+                        self.connection_factory.add_auth_message(request.clone());
+                        handshake_request_count += 1;
+                    }
+                    Some(Frame::Kafka(KafkaFrame::Request {
+                        body: RequestBody::ApiVersions(_),
+                        ..
+                    })) => {
+                        handshake_request_count += 1;
+                    }
+                    _ => {
+                        // The client is no longer performing authentication
+                        self.auth_complete = true;
+                        break;
+                    }
+                }
+            }
+            // route all handshake messages
+            for _ in 0..handshake_request_count {
+                let request = requests.remove(0);
+                self.route_to_first_contact_node(request.clone());
+            }
+
+            if requests.is_empty() {
+                // all messages received in this batch are handshake messages,
+                // so dont continue with regular message handling
+                return Ok(());
+            } else {
+                // the later messages in this batch are not handshake messages,
+                // so continue onto the regular message handling
+            }
+        }
+
         let mut topics = vec![];
         let mut groups = vec![];
         for request in &mut requests {
@@ -435,9 +451,7 @@
         }

         // request and process metadata if we are missing topics or the controller broker id
-        if (!topics.is_empty() || self.controller_broker.get().is_none())
-            && self.sasl_status.is_handshake_complete()
-        {
+        if !topics.is_empty() || self.controller_broker.get().is_none() {
             let mut metadata = self.get_metadata_of_topics(topics).await?;
             match metadata.frame() {
                 Some(Frame::Kafka(KafkaFrame::Response {
@@ -616,30 +630,6 @@
                     body: RequestBody::CreateTopics(_),
                     ..
                 })) => self.route_to_controller(message),
-
-                Some(Frame::Kafka(KafkaFrame::Request {
-                    body: RequestBody::ApiVersions(_),
-                    ..
-                })) => self.route_to_first_contact_node(message.clone()),
-
-                Some(Frame::Kafka(KafkaFrame::Request {
-                    body: RequestBody::SaslHandshake(_),
-                    ..
-                })) => {
-                    self.route_to_first_contact_node(message.clone());
-                    self.connection_factory
-                        .add_handshake_message(message.clone());
-                }
-
-                Some(Frame::Kafka(KafkaFrame::Request {
-                    body: RequestBody::SaslAuthenticate(_),
-                    ..
-                })) => {
-                    self.route_to_first_contact_node(message.clone());
-                    self.connection_factory.add_auth_message(message.clone());
-                    self.sasl_status.set_handshake_complete();
-                }
-
                 // route to random node
                 _ => {
                     let destination = self.nodes.choose(&mut self.rng).unwrap().broker_id;
@@ -653,25 +643,6 @@
         Ok(())
     }

-    fn route_to_first_contact_node(&mut self, message: Message) {
-        let destination = if let Some(first_contact_node) = &self.first_contact_node {
-            self.nodes
-                .iter_mut()
-                .find(|node| node.kafka_address == *first_contact_node)
-                .unwrap()
-                .broker_id
-        } else {
-            let node = self.nodes.get_mut(0).unwrap();
-            self.first_contact_node = Some(node.kafka_address.clone());
-            node.broker_id
-        };
-
-        self.pending_requests.push_back(PendingRequest::Routed {
-            destination,
-            request: message,
-        });
-    }
-
     async fn find_coordinator_of_group(
         &mut self,
         group: GroupId,
@@ -892,6 +863,25 @@
         Ok(())
     }

+    fn route_to_first_contact_node(&mut self, message: Message) {
+        let destination = if let Some(first_contact_node) = &self.first_contact_node {
+            self.nodes
+                .iter_mut()
+                .find(|node| node.kafka_address == *first_contact_node)
+                .unwrap()
+                .broker_id
+        } else {
+            let node = self.nodes.get_mut(0).unwrap();
+            self.first_contact_node = Some(node.kafka_address.clone());
+            node.broker_id
+        };
+
+        self.pending_requests.push_back(PendingRequest::Routed {
+            destination,
+            request: message,
+        });
+    }
+
     fn route_to_controller(&mut self, message: Message) {
         let broker_id = self.controller_broker.get().unwrap();