From add090f7627f367c2dfb4cf61bedfe2317e0936e Mon Sep 17 00:00:00 2001 From: conorbros Date: Thu, 21 Mar 2024 13:30:43 +1100 Subject: [PATCH 1/3] remove sasl config --- docs/src/transforms.md | 3 -- .../benches/windsock/kafka/bench.rs | 1 - shotover-proxy/config/config.yaml | 2 +- .../kafka/cluster-sasl/topology-single.yaml | 1 - .../kafka/cluster-sasl/topology1.yaml | 1 - .../kafka/cluster-sasl/topology2.yaml | 1 - .../kafka/cluster-sasl/topology3.yaml | 1 - .../src/transforms/kafka/sink_cluster/mod.rs | 53 ++++++++++++------- 8 files changed, 35 insertions(+), 28 deletions(-) diff --git a/docs/src/transforms.md b/docs/src/transforms.md index 808f113d1..2aaffc998 100644 --- a/docs/src/transforms.md +++ b/docs/src/transforms.md @@ -309,9 +309,6 @@ This is achieved by rewriting the FindCoordinator, Metadata and DescribeCluster # When a timeout occurs the connection to the client is immediately closed. # read_timeout: 60 - # When this field is enabled it allows the use of SASL authentication. If you intend to use SASL this field must be enabled, it is false by default. - sasl_enabled: false - # When this field is provided TLS is used when connecting to the remote address. # Removing this field will disable TLS. #tls: diff --git a/shotover-proxy/benches/windsock/kafka/bench.rs b/shotover-proxy/benches/windsock/kafka/bench.rs index 13f99c86b..145178f68 100644 --- a/shotover-proxy/benches/windsock/kafka/bench.rs +++ b/shotover-proxy/benches/windsock/kafka/bench.rs @@ -98,7 +98,6 @@ impl KafkaBench { broker_id: 0, }], tls: None, - sasl_enabled: Some(false), }), }); common::generate_topology(SourceConfig::Kafka(shotover::sources::kafka::KafkaConfig { diff --git a/shotover-proxy/config/config.yaml b/shotover-proxy/config/config.yaml index 85830d979..025ab0007 100644 --- a/shotover-proxy/config/config.yaml +++ b/shotover-proxy/config/config.yaml @@ -1,5 +1,5 @@ # configure the first `info` to set the log level for dependencies # configure `shotover=info` to set the log level for shotover itself # set `shotover::connection_span=info` to `shotover::connection_span=debug` to attach connection info to most log events, this is disabled by default due to a minor performance hit. -main_log_level: "info, shotover=info, shotover::connection_span=info" +main_log_level: "debug, shotover=debug, shotover::connection_span=debug" observability_interface: "0.0.0.0:9001" diff --git a/shotover-proxy/tests/test-configs/kafka/cluster-sasl/topology-single.yaml b/shotover-proxy/tests/test-configs/kafka/cluster-sasl/topology-single.yaml index 8a59741ac..f2d8f4368 100644 --- a/shotover-proxy/tests/test-configs/kafka/cluster-sasl/topology-single.yaml +++ b/shotover-proxy/tests/test-configs/kafka/cluster-sasl/topology-single.yaml @@ -9,6 +9,5 @@ sources: - address: "127.0.0.1:9192" rack: "rack0" broker_id: 0 - sasl_enabled: true first_contact_points: ["172.16.1.2:9092"] connect_timeout_ms: 3000 diff --git a/shotover-proxy/tests/test-configs/kafka/cluster-sasl/topology1.yaml b/shotover-proxy/tests/test-configs/kafka/cluster-sasl/topology1.yaml index 7a6990015..15fe492e1 100644 --- a/shotover-proxy/tests/test-configs/kafka/cluster-sasl/topology1.yaml +++ b/shotover-proxy/tests/test-configs/kafka/cluster-sasl/topology1.yaml @@ -5,7 +5,6 @@ sources: listen_addr: "127.0.0.1:9191" chain: - KafkaSinkCluster: - sasl_enabled: true shotover_nodes: - address: "127.0.0.1:9191" rack: "rack0" diff --git a/shotover-proxy/tests/test-configs/kafka/cluster-sasl/topology2.yaml b/shotover-proxy/tests/test-configs/kafka/cluster-sasl/topology2.yaml index 1fa948922..3844a8486 100644 --- a/shotover-proxy/tests/test-configs/kafka/cluster-sasl/topology2.yaml +++ b/shotover-proxy/tests/test-configs/kafka/cluster-sasl/topology2.yaml @@ -5,7 +5,6 @@ sources: listen_addr: "127.0.0.1:9192" chain: - KafkaSinkCluster: - sasl_enabled: true shotover_nodes: - address: "127.0.0.1:9191" rack: "rack0" diff --git a/shotover-proxy/tests/test-configs/kafka/cluster-sasl/topology3.yaml b/shotover-proxy/tests/test-configs/kafka/cluster-sasl/topology3.yaml index 1cf09d057..0fa7eb760 100644 --- a/shotover-proxy/tests/test-configs/kafka/cluster-sasl/topology3.yaml +++ b/shotover-proxy/tests/test-configs/kafka/cluster-sasl/topology3.yaml @@ -5,7 +5,6 @@ sources: listen_addr: "127.0.0.1:9193" chain: - KafkaSinkCluster: - sasl_enabled: true shotover_nodes: - address: "127.0.0.1:9191" rack: "rack0" diff --git a/shotover/src/transforms/kafka/sink_cluster/mod.rs b/shotover/src/transforms/kafka/sink_cluster/mod.rs index 7859231ad..138f58245 100644 --- a/shotover/src/transforms/kafka/sink_cluster/mod.rs +++ b/shotover/src/transforms/kafka/sink_cluster/mod.rs @@ -51,7 +51,6 @@ pub struct KafkaSinkClusterConfig { pub connect_timeout_ms: u64, pub read_timeout: Option, pub tls: Option, - pub sasl_enabled: Option, } #[derive(Serialize, Deserialize, Debug, Clone)] @@ -99,7 +98,6 @@ impl TransformConfig for KafkaSinkClusterConfig { self.connect_timeout_ms, self.read_timeout, tls, - self.sasl_enabled.unwrap_or(false), ))) } } @@ -116,7 +114,6 @@ pub struct KafkaSinkClusterBuilder { topic_by_id: Arc>, nodes_shared: Arc>>, tls: Option, - sasl_enabled: bool, } impl KafkaSinkClusterBuilder { @@ -127,7 +124,6 @@ impl KafkaSinkClusterBuilder { connect_timeout_ms: u64, timeout: Option, tls: Option, - sasl_enabled: bool, ) -> KafkaSinkClusterBuilder { let receive_timeout = timeout.map(Duration::from_secs); @@ -148,7 +144,6 @@ impl KafkaSinkClusterBuilder { topic_by_id: Arc::new(DashMap::new()), nodes_shared: Arc::new(RwLock::new(vec![])), tls, - sasl_enabled, } } } @@ -166,7 +161,7 @@ impl TransformBuilder for KafkaSinkClusterBuilder { topic_by_name: self.topic_by_name.clone(), topic_by_id: self.topic_by_id.clone(), rng: SmallRng::from_rng(rand::thread_rng()).unwrap(), - sasl_status: SaslStatus::new(self.sasl_enabled), + sasl_status: SaslStatus::new(), connection_factory: ConnectionFactory::new(self.tls.clone(), self.connect_timeout), first_contact_node: None, fetch_session_id_to_broker: HashMap::new(), @@ -207,28 +202,22 @@ impl AtomicBrokerId { #[derive(Debug)] struct SaslStatus { - enabled: bool, - handshake_complete: bool, + auth_complete: bool, } impl SaslStatus { - fn new(enabled: bool) -> Self { + fn new() -> Self { Self { - enabled, - handshake_complete: false, + auth_complete: false, } } - fn set_handshake_complete(&mut self) { - self.handshake_complete = true; + fn set_auth_complete(&mut self, v: bool) { + self.auth_complete = v; } fn is_handshake_complete(&self) -> bool { - if self.enabled { - self.handshake_complete - } else { - true - } + self.auth_complete } } @@ -371,6 +360,10 @@ impl KafkaSinkCluster { } } + if !self.sasl_status.is_handshake_complete() { + self.requests_contain_non_handshake_message(&mut requests); + } + for group in groups { match self.find_coordinator_of_group(group.clone()).await { Ok(node) => { @@ -625,7 +618,6 @@ impl KafkaSinkCluster { .await?; self.connection_factory.add_auth_message(message.clone()); results.push(rx); - self.sasl_status.set_handshake_complete(); } // route to random node @@ -774,6 +766,29 @@ impl KafkaSinkCluster { Ok(rx.await.unwrap().response.unwrap()) } + fn requests_contain_non_handshake_message(&mut self, requests: &mut [Message]) { + for requests in requests.iter_mut() { + match requests.frame() { + Some(Frame::Kafka(KafkaFrame::Request { + body: RequestBody::ApiVersions(_), + .. + })) => {} + Some(Frame::Kafka(KafkaFrame::Request { + body: RequestBody::SaslHandshake(_), + .. + })) => {} + Some(Frame::Kafka(KafkaFrame::Request { + body: RequestBody::SaslAuthenticate(_), + .. + })) => {} + _ => { + self.sasl_status.set_auth_complete(true); + return; + } + } + } + } + async fn receive_responses( &mut self, find_coordinator_requests: &[FindCoordinator], From dd2f26b6a8a50c7ee12a9b7e0a8b613a62809d68 Mon Sep 17 00:00:00 2001 From: conorbros Date: Mon, 25 Mar 2024 13:26:26 +1100 Subject: [PATCH 2/3] review feedback --- shotover-proxy/config/config.yaml | 2 +- .../src/transforms/kafka/sink_cluster/mod.rs | 33 +++---------------- 2 files changed, 6 insertions(+), 29 deletions(-) diff --git a/shotover-proxy/config/config.yaml b/shotover-proxy/config/config.yaml index 025ab0007..85830d979 100644 --- a/shotover-proxy/config/config.yaml +++ b/shotover-proxy/config/config.yaml @@ -1,5 +1,5 @@ # configure the first `info` to set the log level for dependencies # configure `shotover=info` to set the log level for shotover itself # set `shotover::connection_span=info` to `shotover::connection_span=debug` to attach connection info to most log events, this is disabled by default due to a minor performance hit. -main_log_level: "debug, shotover=debug, shotover::connection_span=debug" +main_log_level: "info, shotover=info, shotover::connection_span=info" observability_interface: "0.0.0.0:9001" diff --git a/shotover/src/transforms/kafka/sink_cluster/mod.rs b/shotover/src/transforms/kafka/sink_cluster/mod.rs index 138f58245..65cef2bf7 100644 --- a/shotover/src/transforms/kafka/sink_cluster/mod.rs +++ b/shotover/src/transforms/kafka/sink_cluster/mod.rs @@ -161,7 +161,7 @@ impl TransformBuilder for KafkaSinkClusterBuilder { topic_by_name: self.topic_by_name.clone(), topic_by_id: self.topic_by_id.clone(), rng: SmallRng::from_rng(rand::thread_rng()).unwrap(), - sasl_status: SaslStatus::new(), + auth_complete: false, connection_factory: ConnectionFactory::new(self.tls.clone(), self.connect_timeout), first_contact_node: None, fetch_session_id_to_broker: HashMap::new(), @@ -200,27 +200,6 @@ impl AtomicBrokerId { } } -#[derive(Debug)] -struct SaslStatus { - auth_complete: bool, -} - -impl SaslStatus { - fn new() -> Self { - Self { - auth_complete: false, - } - } - - fn set_auth_complete(&mut self, v: bool) { - self.auth_complete = v; - } - - fn is_handshake_complete(&self) -> bool { - self.auth_complete - } -} - pub struct KafkaSinkCluster { first_contact_points: Vec, shotover_nodes: Vec, @@ -232,7 +211,7 @@ pub struct KafkaSinkCluster { topic_by_name: Arc>, topic_by_id: Arc>, rng: SmallRng, - sasl_status: SaslStatus, + auth_complete: bool, connection_factory: ConnectionFactory, first_contact_node: Option, // its not clear from the docs if this cache needs to be accessed cross connection: @@ -360,7 +339,7 @@ impl KafkaSinkCluster { } } - if !self.sasl_status.is_handshake_complete() { + if !self.auth_complete { self.requests_contain_non_handshake_message(&mut requests); } @@ -382,9 +361,7 @@ impl KafkaSinkCluster { } // request and process metadata if we are missing topics or the controller broker id - if (!topics.is_empty() || self.controller_broker.get().is_none()) - && self.sasl_status.is_handshake_complete() - { + if (!topics.is_empty() || self.controller_broker.get().is_none()) && self.auth_complete { let mut metadata = self.get_metadata_of_topics(topics).await?; match metadata.frame() { Some(Frame::Kafka(KafkaFrame::Response { @@ -782,7 +759,7 @@ impl KafkaSinkCluster { .. })) => {} _ => { - self.sasl_status.set_auth_complete(true); + self.auth_complete = true; return; } } From 471100a5c171c428f0a776b394fc3f1039b56efa Mon Sep 17 00:00:00 2001 From: Lucas Kent Date: Mon, 25 Mar 2024 15:59:07 +1100 Subject: [PATCH 3/3] bug fix --- .../src/transforms/kafka/sink_cluster/mod.rs | 117 ++++++++---------- 1 file changed, 53 insertions(+), 64 deletions(-) diff --git a/shotover/src/transforms/kafka/sink_cluster/mod.rs b/shotover/src/transforms/kafka/sink_cluster/mod.rs index 65cef2bf7..a8a76b2ab 100644 --- a/shotover/src/transforms/kafka/sink_cluster/mod.rs +++ b/shotover/src/transforms/kafka/sink_cluster/mod.rs @@ -304,6 +304,58 @@ impl KafkaSinkCluster { mut requests: Vec, ) -> Result>> { let mut results = Vec::with_capacity(requests.len()); + + if !self.auth_complete { + let mut handshake_request_count = 0; + for request in &mut requests { + match request.frame() { + Some(Frame::Kafka(KafkaFrame::Request { + body: RequestBody::SaslHandshake(_), + .. + })) => { + self.connection_factory + .add_handshake_message(request.clone()); + handshake_request_count += 1; + } + Some(Frame::Kafka(KafkaFrame::Request { + body: RequestBody::SaslAuthenticate(_), + .. + })) => { + self.connection_factory.add_auth_message(request.clone()); + handshake_request_count += 1; + } + Some(Frame::Kafka(KafkaFrame::Request { + body: RequestBody::ApiVersions(_), + .. + })) => { + handshake_request_count += 1; + } + _ => { + // The client is no longer performing authentication + self.auth_complete = true; + break; + } + } + } + // route all handshake messages + for _ in 0..handshake_request_count { + let request = requests.remove(0); + let (tx, rx) = oneshot::channel(); + self.route_to_first_contact_node(request.clone(), Some(tx)) + .await?; + results.push(rx); + } + + if requests.is_empty() { + // all messages received in this batch are handshake messages, + // so dont continue with regular message handling + return Ok(results); + } else { + // the later messages in this batch are not handshake messages, + // so continue onto the regular message handling + } + } + let mut topics = vec![]; let mut groups = vec![]; for request in &mut requests { @@ -339,10 +391,6 @@ impl KafkaSinkCluster { } } - if !self.auth_complete { - self.requests_contain_non_handshake_message(&mut requests); - } - for group in groups { match self.find_coordinator_of_group(group.clone()).await { Ok(node) => { @@ -361,7 +409,7 @@ impl KafkaSinkCluster { } // request and process metadata if we are missing topics or the controller broker id - if (!topics.is_empty() || self.controller_broker.get().is_none()) && self.auth_complete { + if !topics.is_empty() || self.controller_broker.get().is_none() { let mut metadata = self.get_metadata_of_topics(topics).await?; match metadata.frame() { Some(Frame::Kafka(KafkaFrame::Response { @@ -561,42 +609,6 @@ impl KafkaSinkCluster { .. })) => results.push(self.route_to_controller(message).await?), - Some(Frame::Kafka(KafkaFrame::Request { - body: RequestBody::ApiVersions(_), - .. - })) => { - let (tx, rx) = oneshot::channel(); - self.route_to_first_contact_node(message.clone(), Some(tx)) - .await?; - - results.push(rx); - } - - Some(Frame::Kafka(KafkaFrame::Request { - body: RequestBody::SaslHandshake(_), - .. - })) => { - let (tx, rx) = oneshot::channel(); - self.route_to_first_contact_node(message.clone(), Some(tx)) - .await?; - - self.connection_factory - .add_handshake_message(message.clone()); - - results.push(rx); - } - - Some(Frame::Kafka(KafkaFrame::Request { - body: RequestBody::SaslAuthenticate(_), - .. - })) => { - let (tx, rx) = oneshot::channel(); - self.route_to_first_contact_node(message.clone(), Some(tx)) - .await?; - self.connection_factory.add_auth_message(message.clone()); - results.push(rx); - } - // route to random node _ => { let connection = self @@ -743,29 +755,6 @@ impl KafkaSinkCluster { Ok(rx.await.unwrap().response.unwrap()) } - fn requests_contain_non_handshake_message(&mut self, requests: &mut [Message]) { - for requests in requests.iter_mut() { - match requests.frame() { - Some(Frame::Kafka(KafkaFrame::Request { - body: RequestBody::ApiVersions(_), - .. - })) => {} - Some(Frame::Kafka(KafkaFrame::Request { - body: RequestBody::SaslHandshake(_), - .. - })) => {} - Some(Frame::Kafka(KafkaFrame::Request { - body: RequestBody::SaslAuthenticate(_), - .. - })) => {} - _ => { - self.auth_complete = true; - return; - } - } - } - } - async fn receive_responses( &mut self, find_coordinator_requests: &[FindCoordinator],