diff --git a/shotover/src/transforms/kafka/sink_cluster.rs b/shotover/src/transforms/kafka/sink_cluster.rs
index b8baab612..bd61fb16d 100644
--- a/shotover/src/transforms/kafka/sink_cluster.rs
+++ b/shotover/src/transforms/kafka/sink_cluster.rs
@@ -12,8 +12,9 @@ use async_trait::async_trait;
 use dashmap::DashMap;
 use kafka_protocol::messages::metadata_request::MetadataRequestTopic;
 use kafka_protocol::messages::{
-    ApiKey, FindCoordinatorRequest, GroupId, HeartbeatRequest, JoinGroupRequest, MetadataRequest,
-    MetadataResponse, OffsetFetchRequest, RequestHeader, SyncGroupRequest, TopicName,
+    ApiKey, BrokerId, FindCoordinatorRequest, GroupId, HeartbeatRequest, JoinGroupRequest,
+    MetadataRequest, MetadataResponse, OffsetFetchRequest, RequestHeader, SyncGroupRequest,
+    TopicName,
 };
 use kafka_protocol::protocol::{Builder, StrBytes};
 use rand::rngs::SmallRng;
@@ -60,8 +61,8 @@ pub struct KafkaSinkClusterBuilder {
     shotover_nodes: Vec<KafkaAddress>,
     connect_timeout: Duration,
     read_timeout: Option<Duration>,
-    coordinator_broker_id: Arc<DashMap<StrBytes, i32>>,
-    topics: Arc<DashMap<StrBytes, Topic>>,
+    group_to_coordinator_broker: Arc<DashMap<GroupId, BrokerId>>,
+    topics: Arc<DashMap<TopicName, Topic>>,
     nodes_shared: Arc<RwLock<Vec<KafkaNode>>>,
 }
 
@@ -91,7 +92,7 @@ impl KafkaSinkClusterBuilder {
             shotover_nodes,
             connect_timeout: Duration::from_millis(connect_timeout_ms),
             read_timeout: receive_timeout,
-            coordinator_broker_id: Arc::new(DashMap::new()),
+            group_to_coordinator_broker: Arc::new(DashMap::new()),
             topics: Arc::new(DashMap::new()),
             nodes_shared: Arc::new(RwLock::new(vec![])),
         }
@@ -108,7 +109,7 @@ impl TransformBuilder for KafkaSinkClusterBuilder {
             read_timeout: self.read_timeout,
             nodes: vec![],
             nodes_shared: self.nodes_shared.clone(),
-            coordinator_broker_id: self.coordinator_broker_id.clone(),
+            group_to_coordinator_broker: self.group_to_coordinator_broker.clone(),
             topics: self.topics.clone(),
             rng: SmallRng::from_rng(rand::thread_rng()).unwrap(),
         })
@@ -131,8 +132,8 @@ pub struct KafkaSinkCluster {
     read_timeout: Option<Duration>,
     nodes: Vec<KafkaNode>,
     nodes_shared: Arc<RwLock<Vec<KafkaNode>>>,
-    coordinator_broker_id: Arc<DashMap<StrBytes, i32>>,
-    topics: Arc<DashMap<StrBytes, Topic>>,
+    group_to_coordinator_broker: Arc<DashMap<GroupId, BrokerId>>,
+    topics: Arc<DashMap<TopicName, Topic>>,
     rng: SmallRng,
 }
 
@@ -182,15 +183,16 @@ impl Transform for KafkaSinkCluster {
 }
 
 impl KafkaSinkCluster {
-    fn store_topic(&self, topics: &mut Vec<StrBytes>, topic: TopicName) {
-        if self.topics.get(&topic.0).is_none() && !topics.contains(&topic.0) {
-            topics.push(topic.0);
+    fn store_topic(&self, topics: &mut Vec<TopicName>, topic: TopicName) {
+        if self.topics.get(&topic).is_none() && !topics.contains(&topic) {
+            topics.push(topic);
         }
     }
 
-    fn store_group(&self, groups: &mut Vec<StrBytes>, group_id: GroupId) {
-        if self.coordinator_broker_id.get(&group_id.0).is_none() && !groups.contains(&group_id.0) {
-            groups.push(group_id.0);
+    fn store_group(&self, groups: &mut Vec<GroupId>, group_id: GroupId) {
+        if self.group_to_coordinator_broker.get(&group_id).is_none() && !groups.contains(&group_id)
+        {
+            groups.push(group_id);
         }
     }
 
@@ -249,105 +251,13 @@ impl KafkaSinkCluster {
         }
 
         for group in groups {
-            let request = Message::from_frame(Frame::Kafka(KafkaFrame::Request {
-                header: RequestHeader::builder()
-                    .request_api_key(ApiKey::FindCoordinatorKey as i16)
-                    .request_api_version(2)
-                    .correlation_id(0)
-                    .client_id(None)
-                    .unknown_tagged_fields(Default::default())
-                    .build()
-                    .unwrap(),
-                body: RequestBody::FindCoordinator(
-                    FindCoordinatorRequest::builder()
-                        .coordinator_keys(vec![])
-                        .key_type(0)
-                        .key(group.clone())
-                        .unknown_tagged_fields(Default::default())
-                        .build()
-                        .unwrap(),
-                ),
-            }));
-
-            let connection = self
-                .nodes
-                .choose_mut(&mut self.rng)
-                .unwrap()
-                .get_connection(self.connect_timeout)
-                .await?;
-            let (tx, rx) = oneshot::channel();
-            connection
-                .send(Request {
-                    message: request,
-                    return_chan: Some(tx),
-                })
-                .map_err(|_| anyhow!("Failed to send"))?;
-            let mut response = rx.await.unwrap().response.unwrap();
-            match response.frame() {
-                Some(Frame::Kafka(KafkaFrame::Response {
-                    body: ResponseBody::FindCoordinator(coordinator),
-                    ..
-                })) => {
-                    self.coordinator_broker_id
-                        .insert(group, coordinator.node_id.0);
-                }
-                other => {
-                    return Err(anyhow!(
-                        "Unexpected message returned to metadata request {other:?}"
-                    ))
-                }
-            }
+            let coordinator = self.find_coordinator_of_group(group.clone()).await?;
+            self.group_to_coordinator_broker.insert(group, coordinator);
         }
 
         if !topics.is_empty() {
-            let request = Message::from_frame(Frame::Kafka(KafkaFrame::Request {
-                header: RequestHeader::builder()
-                    .request_api_key(ApiKey::MetadataKey as i16)
-                    .request_api_version(4)
-                    .correlation_id(0)
-                    .client_id(None)
-                    .unknown_tagged_fields(Default::default())
-                    .build()
-                    .unwrap(),
-                body: RequestBody::Metadata(
-                    MetadataRequest::builder()
-                        .topics(Some(
-                            topics
-                                .into_iter()
-                                .map(|name| {
-                                    MetadataRequestTopic::builder()
-                                        .name(Some(TopicName(name)))
-                                        .topic_id(Uuid::nil())
-                                        .unknown_tagged_fields(Default::default())
-                                        .build()
-                                        .unwrap()
-                                })
-                                .collect(),
-                        ))
-                        .allow_auto_topic_creation(false)
-                        .include_cluster_authorized_operations(false)
-                        .include_topic_authorized_operations(false)
-                        .unknown_tagged_fields(Default::default())
-                        .build()
-                        .unwrap(),
-                ),
-            }));
-
-            let connection = self
-                .nodes
-                .choose_mut(&mut self.rng)
-                .unwrap()
-                .get_connection(self.connect_timeout)
-                .await?;
-            let (tx, rx) = oneshot::channel();
-            connection
-                .send(Request {
-                    message: request,
-                    return_chan: Some(tx),
-                })
-                .map_err(|_| anyhow!("Failed to send"))?;
-            let mut response = rx.await.unwrap().response.unwrap();
-            match response.frame() {
+            let mut metadata = self.get_metadata_of_topics(topics).await?;
+            match metadata.frame() {
                 Some(Frame::Kafka(KafkaFrame::Response {
                     body: ResponseBody::Metadata(metadata),
                     ..
@@ -511,6 +421,103 @@ impl KafkaSinkCluster {
         Ok(results)
     }
 
+    async fn find_coordinator_of_group(&mut self, group: GroupId) -> Result<BrokerId> {
+        let request = Message::from_frame(Frame::Kafka(KafkaFrame::Request {
+            header: RequestHeader::builder()
+                .request_api_key(ApiKey::FindCoordinatorKey as i16)
+                .request_api_version(2)
+                .correlation_id(0)
+                .client_id(None)
+                .unknown_tagged_fields(Default::default())
+                .build()
+                .unwrap(),
+            body: RequestBody::FindCoordinator(
+                FindCoordinatorRequest::builder()
+                    .coordinator_keys(vec![])
+                    .key_type(0)
+                    .key(group.0)
+                    .unknown_tagged_fields(Default::default())
+                    .build()
+                    .unwrap(),
+            ),
+        }));
+
+        let connection = self
+            .nodes
+            .choose_mut(&mut self.rng)
+            .unwrap()
+            .get_connection(self.connect_timeout)
+            .await?;
+        let (tx, rx) = oneshot::channel();
+        connection
+            .send(Request {
+                message: request,
+                return_chan: Some(tx),
+            })
+            .map_err(|_| anyhow!("Failed to send"))?;
+        let mut response = rx.await.unwrap().response.unwrap();
+        match response.frame() {
+            Some(Frame::Kafka(KafkaFrame::Response {
+                body: ResponseBody::FindCoordinator(coordinator),
+                ..
+            })) => Ok(coordinator.node_id),
+            other => {
+                return Err(anyhow!(
+                    "Unexpected message returned to metadata request {other:?}"
+                ))
+            }
+        }
+    }
+
+    async fn get_metadata_of_topics(&mut self, topics: Vec<TopicName>) -> Result<Message> {
+        let request = Message::from_frame(Frame::Kafka(KafkaFrame::Request {
+            header: RequestHeader::builder()
+                .request_api_key(ApiKey::MetadataKey as i16)
+                .request_api_version(4)
+                .correlation_id(0)
+                .client_id(None)
+                .unknown_tagged_fields(Default::default())
+                .build()
+                .unwrap(),
+            body: RequestBody::Metadata(
+                MetadataRequest::builder()
+                    .topics(Some(
+                        topics
+                            .into_iter()
+                            .map(|name| {
+                                MetadataRequestTopic::builder()
+                                    .name(Some(name))
+                                    .topic_id(Uuid::nil())
+                                    .unknown_tagged_fields(Default::default())
+                                    .build()
+                                    .unwrap()
+                            })
+                            .collect(),
+                    ))
+                    .allow_auto_topic_creation(false)
+                    .include_cluster_authorized_operations(false)
+                    .include_topic_authorized_operations(false)
+                    .unknown_tagged_fields(Default::default())
+                    .build()
+                    .unwrap(),
+            ),
+        }));
+
+        let connection = self
+            .nodes
+            .choose_mut(&mut self.rng)
+            .unwrap()
+            .get_connection(self.connect_timeout)
+            .await?;
+        let (tx, rx) = oneshot::channel();
+        connection
+            .send(Request {
+                message: request,
+                return_chan: Some(tx),
+            })
+            .map_err(|_| anyhow!("Failed to send"))?;
+        Ok(rx.await.unwrap().response.unwrap())
+    }
+
     async fn receive_responses(
         &mut self,
         find_coordinator_requests: &[FindCoordinator],
@@ -540,8 +547,8 @@ impl KafkaSinkCluster {
 
                     if *version <= 3 {
                         if request.key_type == 0 {
-                            self.coordinator_broker_id
-                                .insert(request.key.clone(), find_coordinator.node_id.0);
+                            self.group_to_coordinator_broker
+                                .insert(GroupId(request.key.clone()), find_coordinator.node_id);
                         }
                         rewrite_address(
                             &self.shotover_nodes,
@@ -551,8 +558,10 @@
                 } else {
                     for coordinator in &mut find_coordinator.coordinators {
                         if request.key_type == 0 {
-                            self.coordinator_broker_id
-                                .insert(coordinator.key.clone(), find_coordinator.node_id.0);
+                            self.group_to_coordinator_broker.insert(
+                                GroupId(coordinator.key.clone()),
+                                find_coordinator.node_id,
+                            );
                         }
                         rewrite_address(
                             &self.shotover_nodes,
@@ -601,7 +610,7 @@
     ) -> Result<Option<Connection>> {
         let mut connection = None;
         for node in &mut self.nodes {
-            if let Some(broker_id) = self.coordinator_broker_id.get(&group_name) {
+            if let Some(broker_id) = self.group_to_coordinator_broker.get(&group_name) {
                 if node.broker_id == *broker_id {
                     connection = Some(node.get_connection(self.connect_timeout).await?.clone());
                 }
@@ -653,7 +662,7 @@
 
         for topic in &metadata.topics {
             self.topics.insert(
-                topic.0.clone().0,
+                topic.0.clone(),
                 Topic {
                     partitions: topic
                         .1