From e0cc6677418d40b1b580402095b158a6f32b242d Mon Sep 17 00:00:00 2001 From: Lucas Kent Date: Wed, 10 Jan 2024 13:28:34 +1100 Subject: [PATCH] ? --- shotover-proxy/benches/windsock/kafka.rs | 30 ++++++-- .../tests/kafka_int_tests/test_cases.rs | 49 ++++++++++++- .../kafka/cluster/docker-compose.yaml | 6 +- shotover/src/frame/kafka.rs | 17 +++-- shotover/src/transforms/kafka/sink_cluster.rs | 71 ++++++++++++++++++- 5 files changed, 153 insertions(+), 20 deletions(-) diff --git a/shotover-proxy/benches/windsock/kafka.rs b/shotover-proxy/benches/windsock/kafka.rs index 7a3b9b970..b83ae9728 100644 --- a/shotover-proxy/benches/windsock/kafka.rs +++ b/shotover-proxy/benches/windsock/kafka.rs @@ -16,6 +16,8 @@ use shotover::transforms::TransformConfig; use std::sync::Arc; use std::{collections::HashMap, time::Duration}; use test_helpers::docker_compose::docker_compose; +use test_helpers::rdkafka::admin::{AdminClient, AdminOptions, NewTopic, TopicReplication}; +use test_helpers::rdkafka::client::DefaultClientContext; use test_helpers::rdkafka::config::ClientConfig; use test_helpers::rdkafka::consumer::{Consumer, StreamConsumer}; use test_helpers::rdkafka::producer::{FutureProducer, FutureRecord}; @@ -347,6 +349,24 @@ impl Bench for KafkaBench { // only one string field so we just directly store the value in resources let broker_address = resources; + let admin: AdminClient = ClientConfig::new() + .set("bootstrap.servers", broker_address) + .create() + .unwrap(); + admin + .create_topics( + &[NewTopic { + name: "topic_foo", + num_partitions: 1, + replication: TopicReplication::Fixed(1), + config: vec![], + }], + &AdminOptions::new() + .operation_timeout(Some(Timeout::After(Duration::from_secs(60)))), + ) + .await + .unwrap(); + let producer: FutureProducer = ClientConfig::new() .set("bootstrap.servers", broker_address) .set("message.timeout.ms", "5000") @@ -413,14 +433,10 @@ struct BenchTaskProducerKafka { #[async_trait] impl BenchTaskProducer for BenchTaskProducerKafka { async fn produce_one(&self) -> Result<(), String> { - let key = rand::random::<[u8; 4]>(); + // key is set to None which will result in round robin routing between all brokers + let record: FutureRecord<(), _> = FutureRecord::to("topic_foo").payload(&self.message); self.producer - .send( - FutureRecord::to("topic_foo") - .payload(&self.message) - .key(&key), - Timeout::Never, - ) + .send(record, Timeout::Never) .await // Take just the error, ignoring the message contents because large messages result in unreadable noise in the logs. .map_err(|e| format!("{:?}", e.0)) diff --git a/shotover-proxy/tests/kafka_int_tests/test_cases.rs b/shotover-proxy/tests/kafka_int_tests/test_cases.rs index 84fc18e40..13567b96c 100644 --- a/shotover-proxy/tests/kafka_int_tests/test_cases.rs +++ b/shotover-proxy/tests/kafka_int_tests/test_cases.rs @@ -1,10 +1,54 @@ use std::time::Duration; +use test_helpers::rdkafka::admin::{AdminClient, AdminOptions, NewTopic, TopicReplication}; +use test_helpers::rdkafka::client::DefaultClientContext; use test_helpers::rdkafka::config::ClientConfig; use test_helpers::rdkafka::consumer::{Consumer, StreamConsumer}; use test_helpers::rdkafka::producer::{FutureProducer, FutureRecord}; +use test_helpers::rdkafka::util::Timeout; use test_helpers::rdkafka::Message; -async fn produce_consume(brokers: &str, topic_name: &str) { +async fn admin(brokers: &str) { + let admin: AdminClient = ClientConfig::new() + .set("bootstrap.servers", brokers) + .create() + .unwrap(); + admin + .create_topics( + &[ + NewTopic { + name: "foo", + num_partitions: 1, + replication: TopicReplication::Fixed(1), + config: vec![], + }, + NewTopic { + name: "acks0", + num_partitions: 1, + replication: TopicReplication::Fixed(1), + config: vec![], + }, + NewTopic { + name: "to_delete", + num_partitions: 1, + replication: TopicReplication::Fixed(1), + config: vec![], + }, + ], + &AdminOptions::new().operation_timeout(Some(Timeout::After(Duration::from_secs(30)))), + ) + .await + .unwrap(); + admin + .delete_topics( + &["to_delete"], + &AdminOptions::new().operation_timeout(Some(Timeout::After(Duration::from_secs(30)))), + ) + .await + .unwrap(); +} + +async fn produce_consume(brokers: &str) { + let topic_name = "foo"; let producer: FutureProducer = ClientConfig::new() .set("bootstrap.servers", brokers) .set("message.timeout.ms", "5000") @@ -97,6 +141,7 @@ async fn produce_consume_acks0(brokers: &str) { } pub async fn basic(address: &str) { - produce_consume(address, "foo").await; + admin(address).await; + produce_consume(address).await; produce_consume_acks0(address).await; } diff --git a/shotover-proxy/tests/test-configs/kafka/cluster/docker-compose.yaml b/shotover-proxy/tests/test-configs/kafka/cluster/docker-compose.yaml index 5db6fc973..b7a3fe4a7 100644 --- a/shotover-proxy/tests/test-configs/kafka/cluster/docker-compose.yaml +++ b/shotover-proxy/tests/test-configs/kafka/cluster/docker-compose.yaml @@ -14,16 +14,14 @@ services: networks: cluster_subnet: ipv4_address: 172.16.1.2 - environment: - &environment + environment: &environment KAFKA_CFG_LISTENERS: "PLAINTEXT://:9092,CONTROLLER://:9093" KAFKA_CFG_ADVERTISED_LISTENERS: "PLAINTEXT://172.16.1.2:9092" ALLOW_PLAINTEXT_LISTENER: "yes" KAFKA_KRAFT_CLUSTER_ID: "abcdefghijklmnopqrstuv" KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: "0@kafka0:9093,1@kafka1:9093,2@kafka2:9093" KAFKA_CFG_NODE_ID: 0 - volumes: - &volumes + volumes: &volumes - type: tmpfs target: /bitnami/kafka kafka1: diff --git a/shotover/src/frame/kafka.rs b/shotover/src/frame/kafka.rs index a250124db..0fe1a2465 100644 --- a/shotover/src/frame/kafka.rs +++ b/shotover/src/frame/kafka.rs @@ -2,11 +2,12 @@ use crate::codec::kafka::RequestHeader as CodecRequestHeader; use anyhow::{anyhow, Context, Result}; use bytes::{BufMut, Bytes, BytesMut}; use kafka_protocol::messages::{ - ApiKey, DescribeClusterResponse, FetchRequest, FetchResponse, FindCoordinatorRequest, - FindCoordinatorResponse, HeartbeatRequest, HeartbeatResponse, JoinGroupRequest, - JoinGroupResponse, LeaderAndIsrRequest, ListOffsetsRequest, ListOffsetsResponse, - MetadataRequest, MetadataResponse, OffsetFetchRequest, OffsetFetchResponse, ProduceRequest, - ProduceResponse, RequestHeader, ResponseHeader, SyncGroupRequest, SyncGroupResponse, + ApiKey, CreateTopicsRequest, DeleteTopicsRequest, DescribeClusterResponse, FetchRequest, + FetchResponse, FindCoordinatorRequest, FindCoordinatorResponse, HeartbeatRequest, + HeartbeatResponse, JoinGroupRequest, JoinGroupResponse, LeaderAndIsrRequest, + ListOffsetsRequest, ListOffsetsResponse, MetadataRequest, MetadataResponse, OffsetFetchRequest, + OffsetFetchResponse, ProduceRequest, ProduceResponse, RequestHeader, ResponseHeader, + SyncGroupRequest, SyncGroupResponse, }; use kafka_protocol::protocol::{Decodable, Encodable, HeaderVersion, StrBytes}; use std::fmt::{Display, Formatter, Result as FmtResult}; @@ -81,6 +82,8 @@ pub enum RequestBody { FindCoordinator(FindCoordinatorRequest), LeaderAndIsr(LeaderAndIsrRequest), Heartbeat(HeartbeatRequest), + CreateTopics(CreateTopicsRequest), + DeleteTopics(DeleteTopicsRequest), Unknown { api_key: ApiKey, message: Bytes }, } @@ -156,6 +159,8 @@ impl KafkaFrame { } ApiKey::LeaderAndIsrKey => RequestBody::LeaderAndIsr(decode(&mut bytes, version)?), ApiKey::HeartbeatKey => RequestBody::Heartbeat(decode(&mut bytes, version)?), + ApiKey::CreateTopicsKey => RequestBody::CreateTopics(decode(&mut bytes, version)?), + ApiKey::DeleteTopicsKey => RequestBody::DeleteTopics(decode(&mut bytes, version)?), api_key => RequestBody::Unknown { api_key, message: bytes, @@ -229,6 +234,8 @@ impl KafkaFrame { RequestBody::FindCoordinator(x) => encode(x, bytes, version)?, RequestBody::LeaderAndIsr(x) => encode(x, bytes, version)?, RequestBody::Heartbeat(x) => encode(x, bytes, version)?, + RequestBody::CreateTopics(x) => encode(x, bytes, version)?, + RequestBody::DeleteTopics(x) => encode(x, bytes, version)?, RequestBody::Unknown { message, .. } => bytes.extend_from_slice(&message), } } diff --git a/shotover/src/transforms/kafka/sink_cluster.rs b/shotover/src/transforms/kafka/sink_cluster.rs index a0c6c8b34..cc7e2badb 100644 --- a/shotover/src/transforms/kafka/sink_cluster.rs +++ b/shotover/src/transforms/kafka/sink_cluster.rs @@ -25,6 +25,7 @@ use serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::hash::Hasher; use std::net::SocketAddr; +use std::sync::atomic::AtomicI64; use std::sync::Arc; use std::time::Duration; use tokio::sync::{mpsc, oneshot, RwLock}; @@ -63,6 +64,7 @@ pub struct KafkaSinkClusterBuilder { shotover_nodes: Vec, connect_timeout: Duration, read_timeout: Option, + controller_broker: Arc, group_to_coordinator_broker: Arc>, topics: Arc>, nodes_shared: Arc>>, @@ -94,6 +96,7 @@ impl KafkaSinkClusterBuilder { shotover_nodes, connect_timeout: Duration::from_millis(connect_timeout_ms), read_timeout: receive_timeout, + controller_broker: Arc::new(AtomicBrokerId::new()), group_to_coordinator_broker: Arc::new(DashMap::new()), topics: Arc::new(DashMap::new()), nodes_shared: Arc::new(RwLock::new(vec![])), @@ -111,6 +114,7 @@ impl TransformBuilder for KafkaSinkClusterBuilder { read_timeout: self.read_timeout, nodes: vec![], nodes_shared: self.nodes_shared.clone(), + controller_broker: self.controller_broker.clone(), group_to_coordinator_broker: self.group_to_coordinator_broker.clone(), topics: self.topics.clone(), rng: SmallRng::from_rng(rand::thread_rng()).unwrap(), @@ -126,6 +130,28 @@ impl TransformBuilder for KafkaSinkClusterBuilder { } } +struct AtomicBrokerId(AtomicI64); + +impl AtomicBrokerId { + fn new() -> Self { + AtomicBrokerId(i64::MAX.into()) + } + + fn set(&self, value: BrokerId) { + self.0 + .store(value.0.into(), std::sync::atomic::Ordering::Relaxed) + } + + /// Returns `None` when set has never been called. + /// Otherwise returns `Some` containing the latest set value. + fn get(&self) -> Option { + match self.0.load(std::sync::atomic::Ordering::Relaxed) { + i64::MAX => None, + other => Some(BrokerId(other as i32)), + } + } +} + pub struct KafkaSinkCluster { first_contact_points: Vec, shotover_nodes: Vec, @@ -134,6 +160,7 @@ pub struct KafkaSinkCluster { read_timeout: Option, nodes: Vec, nodes_shared: Arc>>, + controller_broker: Arc, group_to_coordinator_broker: Arc>, topics: Arc>, rng: SmallRng, @@ -263,7 +290,8 @@ impl KafkaSinkCluster { self.add_node_if_new(node).await; } - if !topics.is_empty() { + // request and process metadata if we are missing topics or the controller broker id + if !topics.is_empty() || self.controller_broker.get().is_none() { let mut metadata = self.get_metadata_of_topics(topics).await?; match metadata.frame() { Some(Frame::Kafka(KafkaFrame::Response { @@ -378,6 +406,7 @@ impl KafkaSinkCluster { results.push(rx); } + // route to group coordinator Some(Frame::Kafka(KafkaFrame::Request { body: RequestBody::Heartbeat(heartbeat), .. @@ -407,6 +436,12 @@ impl KafkaSinkCluster { results.push(self.route_to_coordinator(message, group_id).await?); } + // route to controller broker + Some(Frame::Kafka(KafkaFrame::Request { + body: RequestBody::CreateTopics(_), + .. + })) => results.push(self.route_to_controller(message).await?), + // route to random node _ => { let connection = self @@ -473,7 +508,7 @@ impl KafkaSinkCluster { connection: None, }), other => Err(anyhow!( - "Unexpected message returned to metadata request {other:?}" + "Unexpected message returned to findcoordinator request {other:?}" )), } } @@ -608,6 +643,36 @@ impl KafkaSinkCluster { Ok(responses) } + async fn route_to_controller( + &mut self, + message: Message, + ) -> Result> { + let broker_id = self.controller_broker.get().unwrap(); + + let connection = if let Some(node) = + self.nodes.iter_mut().find(|x| x.broker_id == *broker_id) + { + node.get_connection(self.connect_timeout).await?.clone() + } else { + tracing::warn!("no known broker with id {broker_id:?}, routing message to a random node so that a NOT_CONTROLLER or similar error is returned to the client"); + self.nodes + .choose_mut(&mut self.rng) + .unwrap() + .get_connection(self.connect_timeout) + .await? + .clone() + }; + + let (tx, rx) = oneshot::channel(); + connection + .send(Request { + message, + return_chan: Some(tx), + }) + .map_err(|_| anyhow!("Failed to send"))?; + Ok(rx) + } + async fn route_to_coordinator( &mut self, message: Message, @@ -656,6 +721,8 @@ impl KafkaSinkCluster { self.add_node_if_new(node).await; } + self.controller_broker.set(metadata.controller_id); + for topic in &metadata.topics { self.topics.insert( topic.0.clone(),