Move rdkafka back to crates.io (#1310)
rukai authored Sep 10, 2023
1 parent a8c1edf commit 5ff4a9e
Showing 3 changed files with 82 additions and 7 deletions.
10 changes: 6 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default.

76 changes: 75 additions & 1 deletion shotover/src/transforms/kafka/sink_cluster.rs
@@ -10,6 +10,7 @@ use crate::transforms::{Transform, TransformBuilder, Transforms, Wrapper};
use anyhow::{anyhow, Result};
use async_trait::async_trait;
use dashmap::DashMap;
use kafka_protocol::messages::find_coordinator_response::Coordinator;
use kafka_protocol::messages::metadata_request::MetadataRequestTopic;
use kafka_protocol::messages::{
ApiKey, BrokerId, FindCoordinatorRequest, GroupId, HeartbeatRequest, JoinGroupRequest,
@@ -21,6 +22,7 @@ use rand::rngs::SmallRng;
use rand::seq::{IteratorRandom, SliceRandom};
use rand::SeedableRng;
use serde::Deserialize;
use std::collections::HashMap;
use std::hash::Hasher;
use std::net::SocketAddr;
use std::sync::Arc;
@@ -578,6 +580,7 @@ impl KafkaSinkCluster {
&mut coordinator.port,
)
}
deduplicate_coordinators(&mut find_coordinator.coordinators);
}
response.invalidate_cache();
}
@@ -588,8 +591,10 @@
self.process_metadata(metadata).await;

for (_, broker) in &mut metadata.brokers {
rewrite_address(&self.shotover_nodes, &mut broker.host, &mut broker.port)
rewrite_address(&self.shotover_nodes, &mut broker.host, &mut broker.port);
}
deduplicate_metadata_brokers(metadata);

response.invalidate_cache();
}
Some(Frame::Kafka(KafkaFrame::Response {
@@ -719,6 +724,75 @@ fn rewrite_address(shotover_nodes: &[KafkaAddress], host: &mut StrBytes, port: &
}
}

/// The rdkafka driver has been observed to get stuck when there are multiple brokers with identical host and port.
/// This function deterministically rewrites metadata to avoid such duplication.
fn deduplicate_metadata_brokers(metadata: &mut MetadataResponse) {
struct SeenBroker {
pub id: BrokerId,
pub address: KafkaAddress,
}
let mut seen: Vec<SeenBroker> = vec![];
let mut replacement_broker_id = HashMap::new();

// ensure deterministic results across shotover instances by first sorting the list of brokers by their broker id
metadata.brokers.sort_keys();

// populate replacement_broker_id.
// This is used both to determine which brokers to delete and which broker ids to use as a replacement for deleted brokers.
for (id, broker) in &mut metadata.brokers {
let address = KafkaAddress {
host: broker.host.clone(),
port: broker.port,
};
broker.rack = None;
if let Some(replacement) = seen.iter().find(|x| x.address == address) {
replacement_broker_id.insert(*id, replacement.id);
}
seen.push(SeenBroker { address, id: *id });
}

// remove brokers with duplicate addresses
for (original, _replacement) in replacement_broker_id.iter() {
metadata.brokers.remove(original);
}

// In the previous step some broker ids were removed, but we might be referring to those ids elsewhere in the message.
// If there are any such cases, fix them by changing the id to refer to the equivalent undeleted broker.
for (_, topic) in &mut metadata.topics {
for partition in &mut topic.partitions {
if let Some(id) = replacement_broker_id.get(&partition.leader_id) {
partition.leader_id = *id;
}
for replica_node in &mut partition.replica_nodes {
if let Some(id) = replacement_broker_id.get(replica_node) {
*replica_node = *id
}
}
}
}
}

/// We haven't observed any failures due to duplicates in FindCoordinator messages like we have in metadata messages.
/// But there might be similar issues lurking in other drivers, so deduplicating seems reasonable.
fn deduplicate_coordinators(coordinators: &mut Vec<Coordinator>) {
let mut seen = vec![];
let mut to_delete = vec![];
for (i, coordinator) in coordinators.iter().enumerate() {
let address = KafkaAddress {
host: coordinator.host.clone(),
port: coordinator.port,
};
if seen.contains(&address) {
to_delete.push(i)
}
seen.push(address);
}

for to_delete in to_delete.iter().rev() {
coordinators.remove(*to_delete);
}
}
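
For readers unfamiliar with the kafka_protocol types, the standalone sketch below (not part of this commit; the struct and function names are illustrative only) restates the approach both functions above take: keep the first broker seen at each address, then remap any ids that pointed at the removed duplicates.

use std::collections::HashMap;

#[derive(Clone, PartialEq, Eq, Hash)]
struct Address {
    host: String,
    port: i32,
}

struct Broker {
    id: i32,
    address: Address,
}

struct Partition {
    leader_id: i32,
    replica_nodes: Vec<i32>,
}

fn deduplicate(brokers: &mut Vec<Broker>, partitions: &mut [Partition]) {
    // Sort by broker id first so every instance deterministically picks the same survivor.
    brokers.sort_by_key(|b| b.id);

    // Map each duplicate broker id to the id of the first broker seen at the same address.
    let mut first_seen: HashMap<Address, i32> = HashMap::new();
    let mut replacement: HashMap<i32, i32> = HashMap::new();
    for broker in brokers.iter() {
        match first_seen.get(&broker.address) {
            Some(keep) => {
                replacement.insert(broker.id, *keep);
            }
            None => {
                first_seen.insert(broker.address.clone(), broker.id);
            }
        }
    }

    // Drop brokers whose address has already been seen...
    brokers.retain(|b| !replacement.contains_key(&b.id));

    // ...then rewrite any leader/replica references to the removed ids.
    for partition in partitions {
        if let Some(keep) = replacement.get(&partition.leader_id) {
            partition.leader_id = *keep;
        }
        for replica in &mut partition.replica_nodes {
            if let Some(keep) = replacement.get(replica) {
                *replica = *keep;
            }
        }
    }
}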

#[derive(Clone)]
struct KafkaNode {
broker_id: BrokerId,
3 changes: 1 addition & 2 deletions test-helpers/Cargo.toml
@@ -33,5 +33,4 @@ serde_yaml.workspace = true
anyhow.workspace = true
rcgen.workspace = true
docker-compose-runner = "0.2.0"
#rdkafka = { version = "0.32", features = ["cmake-build"] }
rdkafka = { branch = "updating_librdkafka_to_v2.1.1", git = "https://github.com/shotover/rust-rdkafka", features = ["cmake-build"], optional = true }
rdkafka = { version = "0.34", features = ["cmake-build"], optional = true }
