From 08af0f493962a64df291c2d092fe277b3819d4f5 Mon Sep 17 00:00:00 2001 From: Lucas Kent Date: Wed, 10 Jan 2024 13:28:34 +1100 Subject: [PATCH] ? --- shotover-proxy/benches/windsock/kafka.rs | 30 ++++++++++++++++++------ 1 file changed, 23 insertions(+), 7 deletions(-) diff --git a/shotover-proxy/benches/windsock/kafka.rs b/shotover-proxy/benches/windsock/kafka.rs index 7a3b9b970..b83ae9728 100644 --- a/shotover-proxy/benches/windsock/kafka.rs +++ b/shotover-proxy/benches/windsock/kafka.rs @@ -16,6 +16,8 @@ use shotover::transforms::TransformConfig; use std::sync::Arc; use std::{collections::HashMap, time::Duration}; use test_helpers::docker_compose::docker_compose; +use test_helpers::rdkafka::admin::{AdminClient, AdminOptions, NewTopic, TopicReplication}; +use test_helpers::rdkafka::client::DefaultClientContext; use test_helpers::rdkafka::config::ClientConfig; use test_helpers::rdkafka::consumer::{Consumer, StreamConsumer}; use test_helpers::rdkafka::producer::{FutureProducer, FutureRecord}; @@ -347,6 +349,24 @@ impl Bench for KafkaBench { // only one string field so we just directly store the value in resources let broker_address = resources; + let admin: AdminClient = ClientConfig::new() + .set("bootstrap.servers", broker_address) + .create() + .unwrap(); + admin + .create_topics( + &[NewTopic { + name: "topic_foo", + num_partitions: 1, + replication: TopicReplication::Fixed(1), + config: vec![], + }], + &AdminOptions::new() + .operation_timeout(Some(Timeout::After(Duration::from_secs(60)))), + ) + .await + .unwrap(); + let producer: FutureProducer = ClientConfig::new() .set("bootstrap.servers", broker_address) .set("message.timeout.ms", "5000") @@ -413,14 +433,10 @@ struct BenchTaskProducerKafka { #[async_trait] impl BenchTaskProducer for BenchTaskProducerKafka { async fn produce_one(&self) -> Result<(), String> { - let key = rand::random::<[u8; 4]>(); + // key is set to None which will result in round robin routing between all brokers + let record: FutureRecord<(), _> = FutureRecord::to("topic_foo").payload(&self.message); self.producer - .send( - FutureRecord::to("topic_foo") - .payload(&self.message) - .key(&key), - Timeout::Never, - ) + .send(record, Timeout::Never) .await // Take just the error, ignoring the message contents because large messages result in unreadable noise in the logs. .map_err(|e| format!("{:?}", e.0))