Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Windsock kafka fix routing #1417

Merged
merged 2 commits into from
Feb 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 33 additions & 6 deletions shotover-proxy/benches/windsock/kafka/bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ use shotover::transforms::TransformConfig;
use std::sync::Arc;
use std::{collections::HashMap, time::Duration};
use test_helpers::docker_compose::docker_compose;
use test_helpers::rdkafka::admin::{AdminClient, AdminOptions, NewTopic, TopicReplication};
use test_helpers::rdkafka::client::DefaultClientContext;
use test_helpers::rdkafka::config::ClientConfig;
use test_helpers::rdkafka::consumer::{Consumer, StreamConsumer};
use test_helpers::rdkafka::producer::{FutureProducer, FutureRecord};
Expand Down Expand Up @@ -401,8 +403,11 @@ impl Bench for KafkaBench {
// only one string field so we just directly store the value in resources
let broker_address = resources;

setup_topic(broker_address).await;

let producer: FutureProducer = ClientConfig::new()
.set("bootstrap.servers", broker_address)
.set("debug", "all")
.set("message.timeout.ms", "5000")
.create()
.unwrap();
Expand Down Expand Up @@ -467,13 +472,10 @@ struct BenchTaskProducerKafka {
#[async_trait]
impl BenchTaskProducer for BenchTaskProducerKafka {
async fn produce_one(&self) -> Result<(), String> {
// key is set to None which will result in round robin routing between all brokers
let record: FutureRecord<(), _> = FutureRecord::to("topic_foo").payload(&self.message);
self.producer
.send(
FutureRecord::to("topic_foo")
.payload(&self.message)
.key("Key"),
Timeout::Never,
)
.send(record, Timeout::Never)
.await
// Take just the error, ignoring the message contents because large messages result in unreadable noise in the logs.
.map_err(|e| format!("{:?}", e.0))
Expand Down Expand Up @@ -568,3 +570,28 @@ pub trait BenchTaskProducer: Clone + Send + Sync + 'static {
tasks
}
}

async fn setup_topic(broker_address: &str) {
let admin: AdminClient<DefaultClientContext> = ClientConfig::new()
.set("bootstrap.servers", broker_address)
.create()
.unwrap();
for topic in admin
.create_topics(
&[NewTopic {
name: "topic_foo",
num_partitions: 3,
replication: TopicReplication::Fixed(1),
config: vec![],
}],
&AdminOptions::new().operation_timeout(Some(Timeout::After(Duration::from_secs(60)))),
)
.await
.unwrap()
{
assert_eq!("topic_foo", topic.unwrap());
}

// Need to delay starting bench to avoid UnknownPartition errors
tokio::time::sleep(Duration::from_secs(5)).await;
}
29 changes: 15 additions & 14 deletions shotover/src/transforms/kafka/sink_cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -734,20 +734,18 @@ impl KafkaSinkCluster {
self.controller_broker.set(metadata.controller_id);

for topic in &metadata.topics {
self.topics.insert(
topic.0.clone(),
Topic {
partitions: topic
.1
.partitions
.iter()
.map(|partition| Partition {
leader_id: *partition.leader_id,
replica_nodes: partition.replica_nodes.iter().map(|x| x.0).collect(),
})
.collect(),
},
);
let mut partitions: Vec<_> = topic
.1
.partitions
.iter()
.map(|partition| Partition {
index: partition.partition_index,
leader_id: *partition.leader_id,
replica_nodes: partition.replica_nodes.iter().map(|x| x.0).collect(),
})
.collect();
partitions.sort_by_key(|x| x.index);
self.topics.insert(topic.0.clone(), Topic { partitions });
}
}

Expand Down Expand Up @@ -890,10 +888,13 @@ impl KafkaNode {
}
}

#[derive(Debug)]
struct Topic {
partitions: Vec<Partition>,
}
#[derive(Debug)]
struct Partition {
index: i32,
leader_id: i32,
replica_nodes: Vec<i32>,
}
Expand Down
Loading