From 5ff4a9e500ea4ba223c39f0b705dba7b6a52e056 Mon Sep 17 00:00:00 2001
From: Lucas Kent
Date: Mon, 11 Sep 2023 09:11:25 +1000
Subject: [PATCH] Move rdkafka back to crates.io (#1310)

---
 Cargo.lock                                    | 10 ++-
 shotover/src/transforms/kafka/sink_cluster.rs | 76 ++++++++++++++++++-
 test-helpers/Cargo.toml                       |  3 +-
 3 files changed, 82 insertions(+), 7 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 7a05073cc..8b9330756 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3654,8 +3654,9 @@ dependencies = [
 
 [[package]]
 name = "rdkafka"
-version = "0.31.0"
-source = "git+https://github.com/shotover/rust-rdkafka?branch=updating_librdkafka_to_v2.1.1#1c6a3160ff7f3f33f00cb395397c256395f39e74"
+version = "0.34.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "053adfa02fab06e86c01d586cc68aa47ee0ff4489a59469081dc12cbcde578bf"
 dependencies = [
  "futures-channel",
  "futures-util",
@@ -3671,8 +3672,9 @@ dependencies = [
 
 [[package]]
 name = "rdkafka-sys"
-version = "4.5.0+2.1.1"
-source = "git+https://github.com/shotover/rust-rdkafka?branch=updating_librdkafka_to_v2.1.1#1c6a3160ff7f3f33f00cb395397c256395f39e74"
+version = "4.6.0+2.2.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ad63c279fca41a27c231c450a2d2ad18288032e9cbb159ad16c9d96eba35aaaf"
 dependencies = [
  "cmake",
  "libc",
diff --git a/shotover/src/transforms/kafka/sink_cluster.rs b/shotover/src/transforms/kafka/sink_cluster.rs
index d2ad26b35..7ae082039 100644
--- a/shotover/src/transforms/kafka/sink_cluster.rs
+++ b/shotover/src/transforms/kafka/sink_cluster.rs
@@ -10,6 +10,7 @@ use crate::transforms::{Transform, TransformBuilder, Transforms, Wrapper};
 use anyhow::{anyhow, Result};
 use async_trait::async_trait;
 use dashmap::DashMap;
+use kafka_protocol::messages::find_coordinator_response::Coordinator;
 use kafka_protocol::messages::metadata_request::MetadataRequestTopic;
 use kafka_protocol::messages::{
     ApiKey, BrokerId, FindCoordinatorRequest, GroupId, HeartbeatRequest, JoinGroupRequest,
@@ -21,6 +22,7 @@ use rand::rngs::SmallRng;
 use rand::seq::{IteratorRandom, SliceRandom};
 use rand::SeedableRng;
 use serde::Deserialize;
+use std::collections::HashMap;
 use std::hash::Hasher;
 use std::net::SocketAddr;
 use std::sync::Arc;
@@ -578,6 +580,7 @@ impl KafkaSinkCluster {
                             &mut coordinator.port,
                         )
                     }
+                    deduplicate_coordinators(&mut find_coordinator.coordinators);
                 }
                 response.invalidate_cache();
             }
@@ -588,8 +591,10 @@ impl KafkaSinkCluster {
                 self.process_metadata(metadata).await;
 
                 for (_, broker) in &mut metadata.brokers {
-                    rewrite_address(&self.shotover_nodes, &mut broker.host, &mut broker.port)
+                    rewrite_address(&self.shotover_nodes, &mut broker.host, &mut broker.port);
                 }
+                deduplicate_metadata_brokers(metadata);
+
                 response.invalidate_cache();
             }
             Some(Frame::Kafka(KafkaFrame::Response {
@@ -719,6 +724,75 @@ fn rewrite_address(shotover_nodes: &[KafkaAddress], host: &mut StrBytes, port: &
     }
 }
 
+/// The rdkafka driver has been observed to get stuck when there are multiple brokers with identical host and port.
+/// This function deterministically rewrites metadata to avoid such duplication.
+fn deduplicate_metadata_brokers(metadata: &mut MetadataResponse) {
+    struct SeenBroker {
+        pub id: BrokerId,
+        pub address: KafkaAddress,
+    }
+    let mut seen: Vec<SeenBroker> = vec![];
+    let mut replacement_broker_id = HashMap::new();
+
+    // ensure deterministic results across shotover instances by first sorting the list of brokers by their broker id
+    metadata.brokers.sort_keys();
+
+    // populate replacement_broker_id.
+    // This is used both to determine which brokers to delete and which broker ids to use as a replacement for deleted brokers.
+    for (id, broker) in &mut metadata.brokers {
+        let address = KafkaAddress {
+            host: broker.host.clone(),
+            port: broker.port,
+        };
+        broker.rack = None;
+        if let Some(replacement) = seen.iter().find(|x| x.address == address) {
+            replacement_broker_id.insert(*id, replacement.id);
+        }
+        seen.push(SeenBroker { address, id: *id });
+    }
+
+    // remove brokers with duplicate addresses
+    for (original, _replacement) in replacement_broker_id.iter() {
+        metadata.brokers.remove(original);
+    }
+
+    // In the previous step some broker ids were removed but we might be referring to those ids elsewhere in the message.
+    // If there are any such cases, fix them by changing the id to refer to the equivalent undeleted broker.
+    for (_, topic) in &mut metadata.topics {
+        for partition in &mut topic.partitions {
+            if let Some(id) = replacement_broker_id.get(&partition.leader_id) {
+                partition.leader_id = *id;
+            }
+            for replica_node in &mut partition.replica_nodes {
+                if let Some(id) = replacement_broker_id.get(replica_node) {
+                    *replica_node = *id
+                }
+            }
+        }
+    }
+}
+
+/// We haven't observed any failures due to duplicates in FindCoordinator messages like we have in Metadata messages.
+/// But there might be similar issues lurking in other drivers, so deduplicating seems reasonable.
+fn deduplicate_coordinators(coordinators: &mut Vec<Coordinator>) {
+    let mut seen = vec![];
+    let mut to_delete = vec![];
+    for (i, coordinator) in coordinators.iter().enumerate() {
+        let address = KafkaAddress {
+            host: coordinator.host.clone(),
+            port: coordinator.port,
+        };
+        if seen.contains(&address) {
+            to_delete.push(i)
+        }
+        seen.push(address);
+    }
+
+    for to_delete in to_delete.iter().rev() {
+        coordinators.remove(*to_delete);
+    }
+}
+
 #[derive(Clone)]
 struct KafkaNode {
     broker_id: BrokerId,
diff --git a/test-helpers/Cargo.toml b/test-helpers/Cargo.toml
index 6672ae914..ce0da150b 100644
--- a/test-helpers/Cargo.toml
+++ b/test-helpers/Cargo.toml
@@ -33,5 +33,4 @@ serde_yaml.workspace = true
 anyhow.workspace = true
 rcgen.workspace = true
 docker-compose-runner = "0.2.0"
-#rdkafka = { version = "0.32", features = ["cmake-build"] }
-rdkafka = { branch = "updating_librdkafka_to_v2.1.1", git = "https://github.com/shotover/rust-rdkafka", features = ["cmake-build"], optional = true }
+rdkafka = { version = "0.34", features = ["cmake-build"], optional = true }
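
Below the patch, a minimal standalone sketch of the dedup-and-remap idea used by deduplicate_metadata_brokers. It is not part of the patch and uses simplified, hypothetical Broker and Address types (and a hypothetical deduplicate_brokers helper) in place of kafka_protocol's MetadataResponse: brokers are sorted by id so every instance keeps the same survivor, later duplicates by address are dropped, and a replacement map records which surviving id to substitute wherever a removed id is still referenced (e.g. partition leader and replica ids).

use std::collections::HashMap;

#[derive(Clone, PartialEq, Eq, Hash, Debug)]
struct Address {
    host: String,
    port: i32,
}

#[derive(Debug)]
struct Broker {
    id: i32,
    address: Address,
}

/// Removes brokers whose address duplicates an earlier broker and returns a map
/// from each removed broker id to the id that should be used in its place.
fn deduplicate_brokers(brokers: &mut Vec<Broker>) -> HashMap<i32, i32> {
    // sort by id first so every instance deterministically picks the same survivor
    brokers.sort_by_key(|b| b.id);

    let mut first_seen: HashMap<Address, i32> = HashMap::new();
    let mut replacement: HashMap<i32, i32> = HashMap::new();
    brokers.retain(|broker| {
        if let Some(survivor) = first_seen.get(&broker.address).copied() {
            // duplicate address: remember which id replaces it, then drop it
            replacement.insert(broker.id, survivor);
            false
        } else {
            first_seen.insert(broker.address.clone(), broker.id);
            true
        }
    });
    replacement
}

fn main() {
    let addr = |host: &str, port| Address { host: host.into(), port };
    let mut brokers = vec![
        Broker { id: 2, address: addr("shotover", 9092) },
        Broker { id: 1, address: addr("shotover", 9092) },
        Broker { id: 3, address: addr("shotover", 9093) },
    ];
    let replacement = deduplicate_brokers(&mut brokers);
    // broker 2 is removed; any leader or replica id of 2 should be rewritten to 1
    assert_eq!(replacement.get(&2), Some(&1));
    assert_eq!(brokers.len(), 2);
    println!("{brokers:?} {replacement:?}");
}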