Windsock: fix topology3 routing

rukai committed Feb 20, 2024
1 parent fea7468 commit 2b54c02
Showing 2 changed files with 48 additions and 20 deletions.
39 changes: 33 additions & 6 deletions shotover-proxy/benches/windsock/kafka/bench.rs
@@ -19,6 +19,8 @@ use shotover::transforms::TransformConfig;
 use std::sync::Arc;
 use std::{collections::HashMap, time::Duration};
 use test_helpers::docker_compose::docker_compose;
+use test_helpers::rdkafka::admin::{AdminClient, AdminOptions, NewTopic, TopicReplication};
+use test_helpers::rdkafka::client::DefaultClientContext;
 use test_helpers::rdkafka::config::ClientConfig;
 use test_helpers::rdkafka::consumer::{Consumer, StreamConsumer};
 use test_helpers::rdkafka::producer::{FutureProducer, FutureRecord};
@@ -401,8 +403,11 @@ impl Bench for KafkaBench {
         // only one string field so we just directly store the value in resources
         let broker_address = resources;
 
+        setup_topic(broker_address).await;
+
         let producer: FutureProducer = ClientConfig::new()
             .set("bootstrap.servers", broker_address)
+            .set("debug", "all")
             .set("message.timeout.ms", "5000")
             .create()
             .unwrap();
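
For reference, a minimal annotated sketch of the same producer configuration; the property names are standard librdkafka settings and the values mirror the hunk above, while the helper name `build_producer` is purely illustrative and not part of the commit:

// Sketch only: the bench's producer config with each setting annotated.
use rdkafka::config::ClientConfig;
use rdkafka::producer::FutureProducer;

fn build_producer(broker_address: &str) -> FutureProducer {
    ClientConfig::new()
        // Initial broker list used to bootstrap cluster metadata.
        .set("bootstrap.servers", broker_address)
        // Enable librdkafka debug logging for all subsystems; handy when
        // diagnosing routing issues like the one this commit addresses.
        .set("debug", "all")
        // Fail a produce that has not been delivered within 5 seconds.
        .set("message.timeout.ms", "5000")
        .create()
        .expect("failed to create producer")
}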
@@ -467,13 +472,10 @@ struct BenchTaskProducerKafka {
 #[async_trait]
 impl BenchTaskProducer for BenchTaskProducerKafka {
     async fn produce_one(&self) -> Result<(), String> {
+        // key is set to None which will result in round robin routing between all brokers
+        let record: FutureRecord<(), _> = FutureRecord::to("topic_foo").payload(&self.message);
         self.producer
-            .send(
-                FutureRecord::to("topic_foo")
-                    .payload(&self.message)
-                    .key("Key"),
-                Timeout::Never,
-            )
+            .send(record, Timeout::Never)
             .await
             // Take just the error, ignoring the message contents because large messages result in unreadable noise in the logs.
             .map_err(|e| format!("{:?}", e.0))
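
The key change above is the core of the bench-side fix: with a constant key every record hashes to the same partition, so only that partition's leader sees traffic, whereas with no key the default partitioner spreads records across all partitions and therefore across all brokers. A minimal sketch of the two variants, assuming the same `FutureProducer` and message bytes as the bench (the function names are illustrative only):

use rdkafka::producer::{FutureProducer, FutureRecord};
use rdkafka::util::Timeout;

// No key: the partitioner distributes records over all partitions of topic_foo.
async fn send_unkeyed(producer: &FutureProducer, message: &[u8]) -> Result<(), String> {
    let record: FutureRecord<(), _> = FutureRecord::to("topic_foo").payload(message);
    producer
        .send(record, Timeout::Never)
        .await
        .map(|_| ())
        .map_err(|(err, _msg)| err.to_string())
}

// Constant key: every record lands on the single partition the key hashes to.
async fn send_keyed(producer: &FutureProducer, message: &[u8]) -> Result<(), String> {
    producer
        .send(
            FutureRecord::to("topic_foo").payload(message).key("Key"),
            Timeout::Never,
        )
        .await
        .map(|_| ())
        .map_err(|(err, _msg)| err.to_string())
}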
@@ -568,3 +570,28 @@ pub trait BenchTaskProducer: Clone + Send + Sync + 'static {
         tasks
     }
 }
+
+async fn setup_topic(broker_address: &str) {
+    let admin: AdminClient<DefaultClientContext> = ClientConfig::new()
+        .set("bootstrap.servers", broker_address)
+        .create()
+        .unwrap();
+    for topic in admin
+        .create_topics(
+            &[NewTopic {
+                name: "topic_foo",
+                num_partitions: 3,
+                replication: TopicReplication::Fixed(1),
+                config: vec![],
+            }],
+            &AdminOptions::new().operation_timeout(Some(Timeout::After(Duration::from_secs(60)))),
+        )
+        .await
+        .unwrap()
+    {
+        assert_eq!("topic_foo", topic.unwrap());
+    }
+
+    // Need to delay starting bench to avoid UnknownPartition errors
+    tokio::time::sleep(Duration::from_secs(5)).await;
+}
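
The fixed 5-second sleep above is the simplest way to wait out topic propagation. As an alternative sketch (not what this commit does), the bench could instead poll cluster metadata until topic_foo reports the expected partition count; `wait_for_partitions` and its arguments are hypothetical:

use std::time::Duration;

use rdkafka::config::ClientConfig;
use rdkafka::producer::{FutureProducer, Producer};

// Poll cluster metadata until topic_foo reports `expected` partitions.
async fn wait_for_partitions(broker_address: &str, expected: usize) {
    let probe: FutureProducer = ClientConfig::new()
        .set("bootstrap.servers", broker_address)
        .create()
        .unwrap();
    loop {
        let metadata = probe
            .client()
            .fetch_metadata(Some("topic_foo"), Duration::from_secs(10))
            .unwrap();
        let ready = metadata
            .topics()
            .iter()
            .any(|t| t.name() == "topic_foo" && t.partitions().len() == expected);
        if ready {
            return;
        }
        tokio::time::sleep(Duration::from_millis(100)).await;
    }
}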
29 changes: 15 additions & 14 deletions shotover/src/transforms/kafka/sink_cluster.rs
@@ -734,20 +734,18 @@ impl KafkaSinkCluster {
         self.controller_broker.set(metadata.controller_id);
 
         for topic in &metadata.topics {
-            self.topics.insert(
-                topic.0.clone(),
-                Topic {
-                    partitions: topic
-                        .1
-                        .partitions
-                        .iter()
-                        .map(|partition| Partition {
-                            leader_id: *partition.leader_id,
-                            replica_nodes: partition.replica_nodes.iter().map(|x| x.0).collect(),
-                        })
-                        .collect(),
-                },
-            );
+            let mut partitions: Vec<_> = topic
+                .1
+                .partitions
+                .iter()
+                .map(|partition| Partition {
+                    index: partition.partition_index,
+                    leader_id: *partition.leader_id,
+                    replica_nodes: partition.replica_nodes.iter().map(|x| x.0).collect(),
+                })
+                .collect();
+            partitions.sort_by_key(|x| x.index);
+            self.topics.insert(topic.0.clone(), Topic { partitions });
         }
     }
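
The metadata response is not guaranteed to list partitions in index order, so storing each partition's `index` and sorting by it is what makes positional lookups line up with real partition numbers. A standalone sketch of that invariant, using simplified copies of the structs rather than the shotover types:

// Simplified sketch: why sorting by `index` matters for routing.
#[derive(Debug)]
struct Partition {
    index: i32,
    leader_id: i32,
}

// After the sort, position i in the slice is partition i, so the leader for a
// given partition index can be found with a direct positional lookup.
fn leader_for(partitions: &[Partition], partition_index: i32) -> Option<i32> {
    partitions
        .get(partition_index as usize)
        .filter(|p| p.index == partition_index)
        .map(|p| p.leader_id)
}

fn main() {
    // Metadata arriving out of order, e.g. partition 2 listed first.
    let mut partitions = vec![
        Partition { index: 2, leader_id: 3 },
        Partition { index: 0, leader_id: 1 },
        Partition { index: 1, leader_id: 2 },
    ];
    partitions.sort_by_key(|p| p.index);
    assert_eq!(leader_for(&partitions, 1), Some(2));
}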

@@ -890,10 +888,13 @@ impl KafkaNode {
     }
 }
 
+#[derive(Debug)]
 struct Topic {
     partitions: Vec<Partition>,
 }
+#[derive(Debug)]
 struct Partition {
+    index: i32,
     leader_id: i32,
     replica_nodes: Vec<i32>,
 }