From e0e7ab1d334fc54be50992c0b947111c56da6b9e Mon Sep 17 00:00:00 2001 From: Lucas Kent Date: Thu, 23 Feb 2023 11:35:18 +1100 Subject: [PATCH] kafka_int_tests test consuming (#1054) --- shotover-proxy/tests/kafka_int_tests/mod.rs | 2 +- .../tests/kafka_int_tests/test_cases.rs | 46 ++++++++++++------- 2 files changed, 31 insertions(+), 17 deletions(-) diff --git a/shotover-proxy/tests/kafka_int_tests/mod.rs b/shotover-proxy/tests/kafka_int_tests/mod.rs index 9153a4d7a..cb6eefadb 100644 --- a/shotover-proxy/tests/kafka_int_tests/mod.rs +++ b/shotover-proxy/tests/kafka_int_tests/mod.rs @@ -5,7 +5,7 @@ mod test_cases; #[tokio::test] #[serial] -async fn passthrough() { +async fn passthrough_standard() { let _docker_compose = DockerCompose::new("tests/test-configs/kafka/passthrough/docker-compose.yaml"); let shotover = ShotoverProcessBuilder::new_with_topology( diff --git a/shotover-proxy/tests/kafka_int_tests/test_cases.rs b/shotover-proxy/tests/kafka_int_tests/test_cases.rs index a156759b8..b868052d6 100644 --- a/shotover-proxy/tests/kafka_int_tests/test_cases.rs +++ b/shotover-proxy/tests/kafka_int_tests/test_cases.rs @@ -1,32 +1,46 @@ use rdkafka::config::ClientConfig; -use rdkafka::message::{Header, OwnedHeaders}; +use rdkafka::consumer::{Consumer, StreamConsumer}; use rdkafka::producer::{FutureProducer, FutureRecord}; +use rdkafka::Message; use std::time::Duration; -async fn produce(brokers: &str, topic_name: &str) { - let producer: &FutureProducer = &ClientConfig::new() +async fn produce_consume(brokers: &str, topic_name: &str) { + let producer: FutureProducer = ClientConfig::new() .set("bootstrap.servers", brokers) .set("message.timeout.ms", "5000") .create() - .expect("Producer creation error"); + .unwrap(); let delivery_status = producer - .send( - FutureRecord::to(topic_name) - .payload("Message") - .key("Key") - .headers(OwnedHeaders::new().insert(Header { - key: "header_key", - value: Some("header_value"), - })), - Duration::from_secs(0), - ) + .send_result(FutureRecord::to(topic_name).payload("Message").key("Key")) + .unwrap() .await + .unwrap() .unwrap(); - assert_eq!(delivery_status, (0, 0)); + + let consumer: StreamConsumer = ClientConfig::new() + .set("bootstrap.servers", brokers) + .set("group.id", "some_group") + .set("session.timeout.ms", "6000") + .set("auto.offset.reset", "earliest") + .set("enable.auto.commit", "false") + .create() + .unwrap(); + consumer.subscribe(&[topic_name]).unwrap(); + + let message = tokio::time::timeout(Duration::from_secs(10), consumer.recv()) + .await + .expect("Timeout while receiving from producer") + .unwrap(); + let contents = message.payload_view::().unwrap().unwrap(); + assert_eq!("Message", contents); + assert_eq!(b"Key", message.key().unwrap()); + assert_eq!("foo", message.topic()); + assert_eq!(0, message.offset()); + assert_eq!(0, message.partition()); } pub async fn basic() { - produce("localhost:9192", "foo").await; + produce_consume("localhost:9192", "foo").await; }