From dfc35d46bacb4802e4cfae67cf7d4096cb552c21 Mon Sep 17 00:00:00 2001
From: Lucas Kent
Date: Wed, 10 Jan 2024 13:28:34 +1100
Subject: [PATCH] Implement kafka controller routing

---
 .../tests/kafka_int_tests/test_cases.rs       | 159 ++++++++++++++++--
 .../kafka/cluster/docker-compose.yaml         |   6 +-
 shotover/src/frame/kafka.rs                   |  17 +-
 shotover/src/transforms/kafka/sink_cluster.rs |  81 ++++++++-
 4 files changed, 239 insertions(+), 24 deletions(-)

diff --git a/shotover-proxy/tests/kafka_int_tests/test_cases.rs b/shotover-proxy/tests/kafka_int_tests/test_cases.rs
index 84fc18e40..cab7460a9 100644
--- a/shotover-proxy/tests/kafka_int_tests/test_cases.rs
+++ b/shotover-proxy/tests/kafka_int_tests/test_cases.rs
@@ -1,15 +1,136 @@
 use std::time::Duration;
+use test_helpers::rdkafka::admin::{
+    AdminClient, AdminOptions, AlterConfig, NewPartitions, NewTopic, OwnedResourceSpecifier,
+    ResourceSpecifier, TopicReplication,
+};
+use test_helpers::rdkafka::client::DefaultClientContext;
 use test_helpers::rdkafka::config::ClientConfig;
 use test_helpers::rdkafka::consumer::{Consumer, StreamConsumer};
 use test_helpers::rdkafka::producer::{FutureProducer, FutureRecord};
+use test_helpers::rdkafka::types::RDKafkaErrorCode;
+use test_helpers::rdkafka::util::Timeout;
 use test_helpers::rdkafka::Message;
 
-async fn produce_consume(brokers: &str, topic_name: &str) {
-    let producer: FutureProducer = ClientConfig::new()
-        .set("bootstrap.servers", brokers)
+async fn admin(config: ClientConfig) {
+    let admin: AdminClient<DefaultClientContext> = config.create().unwrap();
+    admin
+        .create_topics(
+            &[
+                NewTopic {
+                    name: "foo",
+                    num_partitions: 1,
+                    replication: TopicReplication::Fixed(1),
+                    config: vec![],
+                },
+                NewTopic {
+                    name: "acks0",
+                    num_partitions: 1,
+                    replication: TopicReplication::Fixed(1),
+                    config: vec![],
+                },
+                NewTopic {
+                    name: "to_delete",
+                    num_partitions: 1,
+                    replication: TopicReplication::Fixed(1),
+                    config: vec![],
+                },
+            ],
+            &AdminOptions::new().operation_timeout(Some(Timeout::After(Duration::from_secs(30)))),
+        )
+        .await
+        .unwrap();
+
+    let results = admin
+        .create_partitions(
+            &[NewPartitions {
+                // TODO: modify topic "foo" instead so that we can test our handling of that with an interesting partition + replication count
+                topic_name: "to_delete",
+                new_partition_count: 2,
+                assignment: None,
+            }],
+            &AdminOptions::new().operation_timeout(Some(Timeout::After(Duration::from_secs(30)))),
+        )
+        .await
+        .unwrap();
+    for result in results {
+        let result = result.unwrap();
+        assert_eq!(result, "to_delete")
+    }
+
+    let results = admin
+        .describe_configs(
+            // TODO: test ResourceSpecifier::Broker and ResourceSpecifier::Group as well.
+            // Will need to find a way to get a valid broker id and to create a group.
+            &[ResourceSpecifier::Topic("to_delete")],
+            &AdminOptions::new().operation_timeout(Some(Timeout::After(Duration::from_secs(30)))),
+        )
+        .await
+        .unwrap();
+    for result in results {
+        let result = result.unwrap();
+        assert_eq!(
+            result.specifier,
+            OwnedResourceSpecifier::Topic("to_delete".to_owned())
+        );
+    }
+
+    let results = admin
+        .alter_configs(
+            &[AlterConfig {
+                specifier: ResourceSpecifier::Topic("to_delete"),
+                entries: [("foo", "bar")].into(),
+            }],
+            &AdminOptions::new().operation_timeout(Some(Timeout::After(Duration::from_secs(30)))),
+        )
+        .await
+        .unwrap();
+    for result in results {
+        assert_eq!(
+            result.unwrap(),
+            OwnedResourceSpecifier::Topic("to_delete".to_owned())
+        );
+    }
+
+    let results = admin
+        .delete_topics(
+            &["to_delete"],
+            &AdminOptions::new().operation_timeout(Some(Timeout::After(Duration::from_secs(30)))),
+        )
+        .await
+        .unwrap();
+    for result in results {
+        assert_eq!(result.unwrap(), "to_delete");
+    }
+}
+
+async fn admin_cleanup(config: ClientConfig) {
+    let admin: AdminClient<DefaultClientContext> = config.create().unwrap();
+    let results = admin
+        // The cpp driver will lock up when running certain commands after a delete_groups if the delete_groups is targeted at a group that doesn't exist.
+        // So just make sure to run it against a group that does exist.
+        .delete_groups(
+            &["some_group"],
+            &AdminOptions::new().operation_timeout(Some(Timeout::After(Duration::from_secs(30)))),
+        )
+        .await
+        .unwrap();
+    for result in results {
+        match result {
+            Ok(result) => assert_eq!(result, "some_group"),
+            Err(err) => assert_eq!(
+                err,
+                // Allow this error, which can occur due to a race condition in the test, but do not allow any other error types
+                ("some_group".to_owned(), RDKafkaErrorCode::NonEmptyGroup)
+            ),
+        }
+    }
+}
+
+async fn produce_consume(client: ClientConfig) {
+    let topic_name = "foo";
+    let producer: FutureProducer = client
+        .clone()
         .set("message.timeout.ms", "5000")
-        // internal driver debug logs are emitted to tokio tracing, assuming the appropriate filter is used by the tracing subscriber
-        .set("debug", "all")
         .create()
         .unwrap();
@@ -21,13 +142,12 @@ async fn produce_consume(brokers: &str, topic_name: &str) {
         .unwrap();
     assert_eq!(delivery_status, (0, 0));
 
-    let consumer: StreamConsumer = ClientConfig::new()
-        .set("bootstrap.servers", brokers)
+    let consumer: StreamConsumer = client
+        .clone()
         .set("group.id", "some_group")
         .set("session.timeout.ms", "6000")
         .set("auto.offset.reset", "earliest")
         .set("enable.auto.commit", "false")
-        .set("debug", "all")
         .create()
         .unwrap();
     consumer.subscribe(&[topic_name]).unwrap();
@@ -44,13 +164,12 @@
     assert_eq!(0, message.partition());
 }
 
-async fn produce_consume_acks0(brokers: &str) {
+async fn produce_consume_acks0(client: ClientConfig) {
     let topic_name = "acks0";
-    let producer: FutureProducer = ClientConfig::new()
-        .set("bootstrap.servers", brokers)
+    let producer: FutureProducer = client
+        .clone()
         .set("message.timeout.ms", "5000")
         .set("acks", "0")
-        .set("debug", "all")
         .create()
         .unwrap();
 
@@ -71,13 +190,12 @@ async fn produce_consume_acks0(brokers: &str) {
         .unwrap();
     }
 
-    let consumer: StreamConsumer = ClientConfig::new()
-        .set("bootstrap.servers", brokers)
+    let consumer: StreamConsumer = client
+        .clone()
         .set("group.id", "some_group")
         .set("session.timeout.ms", "6000")
         .set("auto.offset.reset", "earliest")
         .set("enable.auto.commit", "false")
-        .set("debug", "all")
         .create()
         .unwrap();
     consumer.subscribe(&[topic_name]).unwrap();
@@ -97,6 +215,13 @@ async fn produce_consume_acks0(brokers: &str) {
 }
 
 pub async fn basic(address: &str) {
-    produce_consume(address, "foo").await;
-    produce_consume_acks0(address).await;
+    let mut client = ClientConfig::new();
+    client
+        .set("bootstrap.servers", address)
+        // internal driver debug logs are emitted to tokio tracing, assuming the appropriate filter is used by the tracing subscriber
+        .set("debug", "all");
+    admin(client.clone()).await;
+    produce_consume(client.clone()).await;
+    produce_consume_acks0(client.clone()).await;
+    admin_cleanup(client.clone()).await;
 }
diff --git a/shotover-proxy/tests/test-configs/kafka/cluster/docker-compose.yaml b/shotover-proxy/tests/test-configs/kafka/cluster/docker-compose.yaml
index 5db6fc973..b7a3fe4a7 100644
--- a/shotover-proxy/tests/test-configs/kafka/cluster/docker-compose.yaml
+++ b/shotover-proxy/tests/test-configs/kafka/cluster/docker-compose.yaml
@@ -14,16 +14,14 @@ services:
     networks:
       cluster_subnet:
         ipv4_address: 172.16.1.2
-    environment:
-      &environment
+    environment: &environment
       KAFKA_CFG_LISTENERS: "PLAINTEXT://:9092,CONTROLLER://:9093"
       KAFKA_CFG_ADVERTISED_LISTENERS: "PLAINTEXT://172.16.1.2:9092"
       ALLOW_PLAINTEXT_LISTENER: "yes"
       KAFKA_KRAFT_CLUSTER_ID: "abcdefghijklmnopqrstuv"
       KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: "0@kafka0:9093,1@kafka1:9093,2@kafka2:9093"
       KAFKA_CFG_NODE_ID: 0
-    volumes:
-      &volumes
+    volumes: &volumes
       - type: tmpfs
         target: /bitnami/kafka
   kafka1:
diff --git a/shotover/src/frame/kafka.rs b/shotover/src/frame/kafka.rs
index a250124db..185685f68 100644
--- a/shotover/src/frame/kafka.rs
+++ b/shotover/src/frame/kafka.rs
@@ -2,7 +2,8 @@ use crate::codec::kafka::RequestHeader as CodecRequestHeader;
 use anyhow::{anyhow, Context, Result};
 use bytes::{BufMut, Bytes, BytesMut};
 use kafka_protocol::messages::{
-    ApiKey, DescribeClusterResponse, FetchRequest, FetchResponse, FindCoordinatorRequest,
+    ApiKey, CreateTopicsRequest, DeleteGroupsRequest, DeleteTopicsRequest, DescribeClusterResponse,
+    DescribeConfigsRequest, FetchRequest, FetchResponse, FindCoordinatorRequest,
     FindCoordinatorResponse, HeartbeatRequest, HeartbeatResponse, JoinGroupRequest,
     JoinGroupResponse, LeaderAndIsrRequest, ListOffsetsRequest, ListOffsetsResponse,
     MetadataRequest, MetadataResponse, OffsetFetchRequest, OffsetFetchResponse, ProduceRequest,
@@ -81,6 +82,10 @@ pub enum RequestBody {
     FindCoordinator(FindCoordinatorRequest),
     LeaderAndIsr(LeaderAndIsrRequest),
     Heartbeat(HeartbeatRequest),
+    CreateTopics(CreateTopicsRequest),
+    DeleteTopics(DeleteTopicsRequest),
+    DeleteGroups(DeleteGroupsRequest),
+    DescribeConfigs(DescribeConfigsRequest),
     Unknown { api_key: ApiKey, message: Bytes },
 }
 
@@ -156,6 +161,12 @@ impl KafkaFrame {
             }
             ApiKey::LeaderAndIsrKey => RequestBody::LeaderAndIsr(decode(&mut bytes, version)?),
             ApiKey::HeartbeatKey => RequestBody::Heartbeat(decode(&mut bytes, version)?),
+            ApiKey::CreateTopicsKey => RequestBody::CreateTopics(decode(&mut bytes, version)?),
+            ApiKey::DeleteTopicsKey => RequestBody::DeleteTopics(decode(&mut bytes, version)?),
+            ApiKey::DeleteGroupsKey => RequestBody::DeleteGroups(decode(&mut bytes, version)?),
+            ApiKey::DescribeConfigsKey => {
+                RequestBody::DescribeConfigs(decode(&mut bytes, version)?)
+            }
             api_key => RequestBody::Unknown {
                 api_key,
                 message: bytes,
@@ -229,6 +240,10 @@ impl KafkaFrame {
                 RequestBody::FindCoordinator(x) => encode(x, bytes, version)?,
                 RequestBody::LeaderAndIsr(x) => encode(x, bytes, version)?,
                 RequestBody::Heartbeat(x) => encode(x, bytes, version)?,
+                RequestBody::CreateTopics(x) => encode(x, bytes, version)?,
+                RequestBody::DeleteTopics(x) => encode(x, bytes, version)?,
+                RequestBody::DeleteGroups(x) => encode(x, bytes, version)?,
+                RequestBody::DescribeConfigs(x) => encode(x, bytes, version)?,
                 RequestBody::Unknown { message, .. } => bytes.extend_from_slice(&message),
             }
         }
diff --git a/shotover/src/transforms/kafka/sink_cluster.rs b/shotover/src/transforms/kafka/sink_cluster.rs
index a0c6c8b34..817631285 100644
--- a/shotover/src/transforms/kafka/sink_cluster.rs
+++ b/shotover/src/transforms/kafka/sink_cluster.rs
@@ -25,6 +25,7 @@ use serde::{Deserialize, Serialize};
 use std::collections::HashMap;
 use std::hash::Hasher;
 use std::net::SocketAddr;
+use std::sync::atomic::AtomicI64;
 use std::sync::Arc;
 use std::time::Duration;
 use tokio::sync::{mpsc, oneshot, RwLock};
@@ -63,6 +64,7 @@ pub struct KafkaSinkClusterBuilder {
     shotover_nodes: Vec<KafkaAddress>,
     connect_timeout: Duration,
     read_timeout: Option<Duration>,
+    controller_broker: Arc<AtomicBrokerId>,
     group_to_coordinator_broker: Arc<DashMap<GroupId, BrokerId>>,
     topics: Arc<DashMap<TopicName, Topic>>,
     nodes_shared: Arc<RwLock<Vec<KafkaNode>>>,
@@ -94,6 +96,7 @@ impl KafkaSinkClusterBuilder {
             shotover_nodes,
             connect_timeout: Duration::from_millis(connect_timeout_ms),
             read_timeout: receive_timeout,
+            controller_broker: Arc::new(AtomicBrokerId::new()),
             group_to_coordinator_broker: Arc::new(DashMap::new()),
             topics: Arc::new(DashMap::new()),
             nodes_shared: Arc::new(RwLock::new(vec![])),
@@ -111,6 +114,7 @@ impl TransformBuilder for KafkaSinkClusterBuilder {
             read_timeout: self.read_timeout,
             nodes: vec![],
             nodes_shared: self.nodes_shared.clone(),
+            controller_broker: self.controller_broker.clone(),
             group_to_coordinator_broker: self.group_to_coordinator_broker.clone(),
             topics: self.topics.clone(),
             rng: SmallRng::from_rng(rand::thread_rng()).unwrap(),
@@ -126,6 +130,28 @@ impl TransformBuilder for KafkaSinkClusterBuilder {
     }
 }
 
+struct AtomicBrokerId(AtomicI64);
+
+impl AtomicBrokerId {
+    fn new() -> Self {
+        AtomicBrokerId(i64::MAX.into())
+    }
+
+    fn set(&self, value: BrokerId) {
+        self.0
+            .store(value.0.into(), std::sync::atomic::Ordering::Relaxed)
+    }
+
+    /// Returns `None` when `set` has never been called.
+    /// Otherwise returns `Some` containing the latest set value.
+    fn get(&self) -> Option<BrokerId> {
+        match self.0.load(std::sync::atomic::Ordering::Relaxed) {
+            i64::MAX => None,
+            other => Some(BrokerId(other as i32)),
+        }
+    }
+}
+
 pub struct KafkaSinkCluster {
     first_contact_points: Vec<String>,
     shotover_nodes: Vec<KafkaAddress>,
@@ -134,6 +160,7 @@ pub struct KafkaSinkCluster {
     read_timeout: Option<Duration>,
     nodes: Vec<KafkaNode>,
     nodes_shared: Arc<RwLock<Vec<KafkaNode>>>,
+    controller_broker: Arc<AtomicBrokerId>,
     group_to_coordinator_broker: Arc<DashMap<GroupId, BrokerId>>,
     topics: Arc<DashMap<TopicName, Topic>>,
     rng: SmallRng,
@@ -263,7 +290,8 @@ impl KafkaSinkCluster {
             self.add_node_if_new(node).await;
         }
 
-        if !topics.is_empty() {
+        // request and process metadata if we are missing topics or the controller broker id
+        if !topics.is_empty() || self.controller_broker.get().is_none() {
             let mut metadata = self.get_metadata_of_topics(topics).await?;
             match metadata.frame() {
                 Some(Frame::Kafka(KafkaFrame::Response {
@@ -378,6 +406,7 @@ impl KafkaSinkCluster {
                     results.push(rx);
                 }
 
+                // route to group coordinator
                 Some(Frame::Kafka(KafkaFrame::Request {
                     body: RequestBody::Heartbeat(heartbeat),
                     ..
@@ -406,6 +435,19 @@ impl KafkaSinkCluster {
                     let group_id = join_group.group_id.clone();
                     results.push(self.route_to_coordinator(message, group_id).await?);
                 }
+                Some(Frame::Kafka(KafkaFrame::Request {
+                    body: RequestBody::DeleteGroups(groups),
+                    ..
+                })) => {
+                    let group_id = groups.groups_names.first().unwrap().clone();
+                    results.push(self.route_to_coordinator(message, group_id).await?);
+                }
+
+                // route to controller broker
+                Some(Frame::Kafka(KafkaFrame::Request {
+                    body: RequestBody::CreateTopics(_),
+                    ..
+                })) => results.push(self.route_to_controller(message).await?),
 
                 // route to random node
                 _ => {
@@ -473,7 +515,7 @@ impl KafkaSinkCluster {
                 connection: None,
             }),
             other => Err(anyhow!(
-                "Unexpected message returned to metadata request {other:?}"
+                "Unexpected message returned to findcoordinator request {other:?}"
             )),
         }
     }
@@ -608,6 +650,36 @@ impl KafkaSinkCluster {
         Ok(responses)
     }
 
+    async fn route_to_controller(
+        &mut self,
+        message: Message,
+    ) -> Result<oneshot::Receiver<Response>> {
+        let broker_id = self.controller_broker.get().unwrap();
+
+        let connection = if let Some(node) =
+            self.nodes.iter_mut().find(|x| x.broker_id == *broker_id)
+        {
+            node.get_connection(self.connect_timeout).await?.clone()
+        } else {
+            tracing::warn!("no known broker with id {broker_id:?}, routing message to a random node so that a NOT_CONTROLLER or similar error is returned to the client");
+            self.nodes
+                .choose_mut(&mut self.rng)
+                .unwrap()
+                .get_connection(self.connect_timeout)
+                .await?
+                .clone()
+        };
+
+        let (tx, rx) = oneshot::channel();
+        connection
+            .send(Request {
+                message,
+                return_chan: Some(tx),
+            })
+            .map_err(|_| anyhow!("Failed to send"))?;
+        Ok(rx)
+    }
+
     async fn route_to_coordinator(
         &mut self,
         message: Message,
@@ -656,6 +728,8 @@ impl KafkaSinkCluster {
             self.add_node_if_new(node).await;
         }
 
+        self.controller_broker.set(metadata.controller_id);
+
         for topic in &metadata.topics {
             self.topics.insert(
                 topic.0.clone(),
@@ -761,6 +835,9 @@ fn deduplicate_metadata_brokers(metadata: &mut MetadataResponse) {
             }
         }
     }
+    if let Some(id) = replacement_broker_id.get(&metadata.controller_id) {
+        metadata.controller_id = *id;
+    }
 }
 
 /// We havent observed any failures due to duplicates in findcoordinator messages like we have in metadata messages.
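
The `AtomicBrokerId` added in `sink_cluster.rs` packs an optional broker id into a single lock-free atomic by reserving `i64::MAX` as the "never set" sentinel; real broker ids are `i32` values widened into the `i64` slot, so the sentinel can never collide with a valid id. A minimal standalone sketch of that pattern, using a simplified `BrokerId` newtype in place of the `kafka_protocol` type:

```rust
use std::sync::atomic::{AtomicI64, Ordering};

// Simplified stand-in for kafka_protocol's BrokerId newtype.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct BrokerId(i32);

/// Stores a broker id that may not be known yet.
/// i64::MAX marks "never set"; valid ids are i32 values widened into the i64 slot.
struct AtomicBrokerId(AtomicI64);

impl AtomicBrokerId {
    fn new() -> Self {
        AtomicBrokerId(AtomicI64::new(i64::MAX))
    }

    fn set(&self, value: BrokerId) {
        self.0.store(value.0 as i64, Ordering::Relaxed)
    }

    /// Returns `None` until `set` has been called at least once.
    fn get(&self) -> Option<BrokerId> {
        match self.0.load(Ordering::Relaxed) {
            i64::MAX => None,
            other => Some(BrokerId(other as i32)),
        }
    }
}

fn main() {
    let controller = AtomicBrokerId::new();
    assert_eq!(controller.get(), None); // no metadata processed yet
    controller.set(BrokerId(2));
    assert_eq!(controller.get(), Some(BrokerId(2)));
}
```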
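`route_to_controller` resolves the cached controller id against the known node list and deliberately falls back to an arbitrary broker when no node matches, relying on that broker to answer with a `NOT_CONTROLLER` or similar error so the client refreshes metadata and retries. A dependency-free sketch of that selection logic; the `Node` type here is a hypothetical stand-in, and the fallback picks the first node rather than the random choice the patch makes with `SmallRng`:

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct BrokerId(i32);

// Hypothetical simplified node; the real KafkaNode also holds the address and connection.
struct Node {
    broker_id: BrokerId,
}

/// Pick the node that a controller-only request (e.g. CreateTopics) should be sent to.
/// When the cached controller id is unknown or stale, fall back to any node; that broker
/// replies with a NOT_CONTROLLER style error and the client retries.
fn pick_controller_node(nodes: &[Node], controller: Option<BrokerId>) -> &Node {
    controller
        .and_then(|id| nodes.iter().find(|node| node.broker_id == id))
        .unwrap_or_else(|| {
            // The patch chooses a random node here via SmallRng; taking the
            // first node keeps this sketch free of extra dependencies.
            &nodes[0]
        })
}

fn main() {
    let nodes = [Node { broker_id: BrokerId(0) }, Node { broker_id: BrokerId(1) }];

    // Controller known and present: route straight to it.
    assert_eq!(pick_controller_node(&nodes, Some(BrokerId(1))).broker_id, BrokerId(1));

    // Controller id missing from our node list, or not learned yet: fall back.
    assert_eq!(pick_controller_node(&nodes, Some(BrokerId(9))).broker_id, BrokerId(0));
    assert_eq!(pick_controller_node(&nodes, None).broker_id, BrokerId(0));
}
```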
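The final hunk keeps the deduplicated metadata self-consistent: `replacement_broker_id` records which duplicate broker entries were collapsed into which survivors, and `controller_id` is remapped through the same table so it never refers to a removed entry. A tiny sketch of that remapping step over plain integers (the real map is keyed by `BrokerId`):

```rust
use std::collections::HashMap;

/// After deduplicating brokers, remap any field that still refers to a removed
/// broker id (here just the controller id) to the surviving id.
fn remap_controller_id(controller_id: &mut i32, replacement_broker_id: &HashMap<i32, i32>) {
    if let Some(id) = replacement_broker_id.get(controller_id) {
        *controller_id = *id;
    }
}

fn main() {
    // Broker 3 turned out to be a duplicate of broker 1 and was dropped.
    let replacements = HashMap::from([(3, 1)]);

    let mut controller_id = 3;
    remap_controller_id(&mut controller_id, &replacements);
    assert_eq!(controller_id, 1);

    // Ids that were not deduplicated away are left untouched.
    let mut other = 2;
    remap_controller_id(&mut other, &replacements);
    assert_eq!(other, 2);
}
```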