Skip to content

Commit

Permalink
kafka_int_tests test consuming (#1054)
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai authored Feb 23, 2023
1 parent 1e091ec commit e0e7ab1
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 17 deletions.
2 changes: 1 addition & 1 deletion shotover-proxy/tests/kafka_int_tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ mod test_cases;

#[tokio::test]
#[serial]
async fn passthrough() {
async fn passthrough_standard() {
let _docker_compose =
DockerCompose::new("tests/test-configs/kafka/passthrough/docker-compose.yaml");
let shotover = ShotoverProcessBuilder::new_with_topology(
Expand Down
46 changes: 30 additions & 16 deletions shotover-proxy/tests/kafka_int_tests/test_cases.rs
Original file line number Diff line number Diff line change
@@ -1,32 +1,46 @@
use rdkafka::config::ClientConfig;
use rdkafka::message::{Header, OwnedHeaders};
use rdkafka::consumer::{Consumer, StreamConsumer};
use rdkafka::producer::{FutureProducer, FutureRecord};
use rdkafka::Message;
use std::time::Duration;

async fn produce(brokers: &str, topic_name: &str) {
let producer: &FutureProducer = &ClientConfig::new()
async fn produce_consume(brokers: &str, topic_name: &str) {
let producer: FutureProducer = ClientConfig::new()
.set("bootstrap.servers", brokers)
.set("message.timeout.ms", "5000")
.create()
.expect("Producer creation error");
.unwrap();

let delivery_status = producer
.send(
FutureRecord::to(topic_name)
.payload("Message")
.key("Key")
.headers(OwnedHeaders::new().insert(Header {
key: "header_key",
value: Some("header_value"),
})),
Duration::from_secs(0),
)
.send_result(FutureRecord::to(topic_name).payload("Message").key("Key"))
.unwrap()
.await
.unwrap()
.unwrap();

assert_eq!(delivery_status, (0, 0));

let consumer: StreamConsumer = ClientConfig::new()
.set("bootstrap.servers", brokers)
.set("group.id", "some_group")
.set("session.timeout.ms", "6000")
.set("auto.offset.reset", "earliest")
.set("enable.auto.commit", "false")
.create()
.unwrap();
consumer.subscribe(&[topic_name]).unwrap();

let message = tokio::time::timeout(Duration::from_secs(10), consumer.recv())
.await
.expect("Timeout while receiving from producer")
.unwrap();
let contents = message.payload_view::<str>().unwrap().unwrap();
assert_eq!("Message", contents);
assert_eq!(b"Key", message.key().unwrap());
assert_eq!("foo", message.topic());
assert_eq!(0, message.offset());
assert_eq!(0, message.partition());
}

pub async fn basic() {
produce("localhost:9192", "foo").await;
produce_consume("localhost:9192", "foo").await;
}

0 comments on commit e0e7ab1

Please sign in to comment.