Add integration tests for fetch.min.bytes and fetch.wait.max.ms #1757

Merged · 1 commit · Sep 26, 2024
112 changes: 90 additions & 22 deletions shotover-proxy/tests/kafka_int_tests/test_cases.rs
@@ -2,9 +2,10 @@ use futures::{stream::FuturesUnordered, StreamExt};
use std::{collections::HashMap, time::Duration};
use test_helpers::{
connection::kafka::{
-        Acl, AclOperation, AclPermissionType, AlterConfig, ConfigEntry, ExpectedResponse,
-        KafkaConnectionBuilder, KafkaConsumer, KafkaDriver, KafkaProducer, NewPartition, NewTopic,
-        Record, ResourcePatternType, ResourceSpecifier, ResourceType, TopicPartition,
+        Acl, AclOperation, AclPermissionType, AlterConfig, ConfigEntry, ConsumerConfig,
+        ExpectedResponse, KafkaConnectionBuilder, KafkaConsumer, KafkaDriver, KafkaProducer,
+        NewPartition, NewTopic, Record, ResourcePatternType, ResourceSpecifier, ResourceType,
+        TopicPartition,
},
docker_compose::DockerCompose,
};
@@ -28,6 +29,26 @@ async fn admin_setup(connection_builder: &KafkaConnectionBuilder) {
num_partitions: 3,
replication_factor: 1,
},
+            NewTopic {
+                name: "partitions3_case1",
+                num_partitions: 3,
+                replication_factor: 1,
+            },
+            NewTopic {
+                name: "partitions3_case2",
+                num_partitions: 3,
+                replication_factor: 1,
+            },
+            NewTopic {
+                name: "partitions3_case3",
+                num_partitions: 3,
+                replication_factor: 1,
+            },
+            NewTopic {
+                name: "partitions3_case4",
+                num_partitions: 3,
+                replication_factor: 1,
+            },
NewTopic {
name: "acks0",
num_partitions: 1,
@@ -117,18 +138,21 @@ pub async fn produce_consume_multi_topic_batch(connection_builder: &KafkaConnectionBuilder) {

let mut consumer_partitions_1 = connection_builder
.connect_consumer(
"multi_topic_batch_partitions_1",
"multi_topic_batch_partitions_1_group",
ConsumerConfig::consume_from_topic("multi_topic_batch_partitions_1".to_owned())
.with_group("multi_topic_batch_partitions_1_group"),
)
.await;
let mut consumer_partitions_3 = connection_builder
.connect_consumer(
"multi_topic_batch_partitions_3",
"multi_topic_batch_partitions_3_group",
ConsumerConfig::consume_from_topic("multi_topic_batch_partitions_3".to_owned())
.with_group("multi_topic_batch_partitions_3_group"),
)
.await;
let mut consumer_unknown = connection_builder
.connect_consumer("batch_test_unknown", "batch_test_unknown_group")
.connect_consumer(
ConsumerConfig::consume_from_topic("batch_test_unknown".to_owned())
.with_group("batch_test_unknown_group"),
)
.await;

tokio::join!(
@@ -223,7 +247,10 @@ pub async fn produce_consume_multi_partition_batch(connection_builder: &KafkaConnectionBuilder) {
.await;

let mut consumer = connection_builder
.connect_consumer("multi_partitions_batch", "multi_partitions_batch_group")
.connect_consumer(
ConsumerConfig::consume_from_topic("multi_partitions_batch".to_owned())
.with_group("multi_partitions_batch_group"),
)
.await;

consumer
@@ -283,7 +310,9 @@ pub async fn produce_consume_partitions1(
.await;

let mut consumer = connection_builder
.connect_consumer(topic_name, "some_group")
.connect_consumer(
ConsumerConfig::consume_from_topic(topic_name.to_owned()).with_group("some_group"),
)
.await;
consumer
.assert_consume(ExpectedResponse {
@@ -340,7 +369,9 @@ pub async fn produce_consume_partitions1(
// if we create a new consumer it will start from the beginning since auto.offset.reset = earliest and enable.auto.commit false
// so we test that we can access all records ever created on this topic
let mut consumer = connection_builder
.connect_consumer(topic_name, "some_group")
.connect_consumer(
ConsumerConfig::consume_from_topic(topic_name.to_owned()).with_group("some_group"),
)
.await;
consumer
.assert_consume(ExpectedResponse {
@@ -413,7 +444,10 @@ pub async fn produce_consume_partitions1_kafka_node_goes_down(
.await;

let mut consumer = connection_builder
.connect_consumer(topic_name, "kafka_node_goes_down_test_group")
.connect_consumer(
ConsumerConfig::consume_from_topic(topic_name.to_owned())
.with_group("kafka_node_goes_down_test_group"),
)
.await;
consumer
.assert_consume(ExpectedResponse {
@@ -472,7 +506,10 @@ pub async fn produce_consume_partitions1_kafka_node_goes_down(
// if we create a new consumer it will start from the beginning since auto.offset.reset = earliest and enable.auto.commit false
// so we test that we can access all records ever created on this topic
let mut consumer = connection_builder
.connect_consumer(topic_name, "kafka_node_goes_down_test_group_new")
.connect_consumer(
ConsumerConfig::consume_from_topic(topic_name.to_owned())
.with_group("kafka_node_goes_down_test_group_new"),
)
.await;
consumer
.assert_consume(ExpectedResponse {
@@ -520,7 +557,10 @@ pub async fn produce_consume_commit_offsets_partitions1(
.await;

let mut consumer = connection_builder
.connect_consumer(topic_name, "consumer_group_with_offsets")
.connect_consumer(
ConsumerConfig::consume_from_topic(topic_name.to_owned())
.with_group("consumer_group_with_offsets"),
)
.await;
consumer
.assert_consume(ExpectedResponse {
@@ -585,7 +625,10 @@ pub async fn produce_consume_commit_offsets_partitions1(
{
// The new consumer should consume Message2 which is at the last uncommitted offset
let mut consumer = connection_builder
.connect_consumer(topic_name, "consumer_group_with_offsets")
.connect_consumer(
ConsumerConfig::consume_from_topic(topic_name.to_owned())
.with_group("consumer_group_with_offsets"),
)
.await;
consumer
.assert_consume(ExpectedResponse {
@@ -600,7 +643,10 @@ pub async fn produce_consume_commit_offsets_partitions1(
{
// The new consumer should still consume Message2 as its offset has not been committed
let mut consumer = connection_builder
.connect_consumer(topic_name, "consumer_group_with_offsets")
.connect_consumer(
ConsumerConfig::consume_from_topic(topic_name.to_owned())
.with_group("consumer_group_with_offsets"),
)
.await;
consumer
.assert_consume(ExpectedResponse {
@@ -615,7 +661,10 @@ pub async fn produce_consume_commit_offsets_partitions1(
{
// A new consumer in another group should consume from the beginning since auto.offset.reset = earliest and enable.auto.commit false
let mut consumer = connection_builder
.connect_consumer(topic_name, "consumer_group_without_offsets")
.connect_consumer(
ConsumerConfig::consume_from_topic(topic_name.to_owned())
.with_group("consumer_group_without_offsets"),
)
.await;
consumer
.assert_consume(ExpectedResponse {
@@ -631,10 +680,17 @@ pub async fn produce_consume_commit_offsets_partitions1(
async fn produce_consume_partitions3(
connection_builder: &KafkaConnectionBuilder,
topic_name: &str,
+    fetch_min_bytes: i32,
+    fetch_wait_max_ms: i32,
) {
let producer = connection_builder.connect_producer("1", 0).await;
let mut consumer = connection_builder
.connect_consumer(topic_name, "some_group")
.connect_consumer(
ConsumerConfig::consume_from_topic(topic_name.to_owned())
.with_group("some_group")
.with_fetch_min_bytes(fetch_min_bytes)
.with_fetch_max_wait_ms(fetch_wait_max_ms),
)
.await;

for _ in 0..5 {
@@ -697,7 +753,9 @@ async fn produce_consume_acks0(connection_builder: &KafkaConnectionBuilder) {
}

let mut consumer = connection_builder
.connect_consumer(topic_name, "some_group")
.connect_consumer(
ConsumerConfig::consume_from_topic(topic_name.to_owned()).with_group("some_group"),
)
.await;

for j in 0..10 {
@@ -727,7 +785,9 @@ pub async fn test_broker_idle_timeout(connection_builder: &KafkaConnectionBuilder) {

let mut producer = connection_builder.connect_producer("all", 0).await;
let mut consumer = connection_builder
.connect_consumer("partitions3", "some_group")
.connect_consumer(
ConsumerConfig::consume_from_topic("partitions3".to_owned()).with_group("some_group"),
)
.await;

// write to some open shotover connections
@@ -772,10 +832,18 @@ pub async fn standard_test_suite(connection_builder: &KafkaConnectionBuilder) {
produce_consume_partitions1(connection_builder, "partitions1").await;
produce_consume_partitions1(connection_builder, "unknown_topic").await;
produce_consume_commit_offsets_partitions1(connection_builder, "partitions1_with_offset").await;
produce_consume_partitions3(connection_builder, "partitions3").await;
produce_consume_multi_topic_batch(connection_builder).await;
produce_consume_multi_partition_batch(connection_builder).await;

+    // test with minimum limits
+    produce_consume_partitions3(connection_builder, "partitions3_case1", 1, 0).await;
+    // test with minimum limits that result in a delay
+    produce_consume_partitions3(connection_builder, "partitions3_case2", 1, 1).await;
+    // test with default limits
+    produce_consume_partitions3(connection_builder, "partitions3_case3", 1, 500).await;
+    // set the bytes limit to 1MB so that we will not reach it and will hit the 100ms timeout every time.
+    produce_consume_partitions3(connection_builder, "partitions3_case4", 1_000_000, 100).await;

// Only run this test case on the java driver,
// since even without going through shotover the cpp driver fails this test.
#[allow(irrefutable_let_patterns)]
@@ -816,7 +884,7 @@ pub async fn cluster_test_suite(connection_builder: &KafkaConnectionBuilder) {
.await;
tokio::time::sleep(Duration::from_secs(10)).await;
produce_consume_partitions1(connection_builder, "partitions1_rf3").await;
produce_consume_partitions3(connection_builder, "partitions3_rf3").await;
produce_consume_partitions3(connection_builder, "partitions3_rf3", 1, 500).await;
}

pub async fn setup_basic_user_acls(connection: &KafkaConnectionBuilder, username: &str) {
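Note on the four `partitions3_case*` runs above: a Kafka broker holds a fetch response until either `fetch.min.bytes` of records have accumulated or the fetch wait bound expires, whichever comes first. A minimal sketch of that release rule, with hypothetical names and values (not shotover or broker code):

// Illustrative sketch of the fetch release rule the partitions3_case* tests probe.
fn fetch_should_respond(accumulated_bytes: i32, elapsed_ms: i32, min_bytes: i32, max_wait_ms: i32) -> bool {
    accumulated_bytes >= min_bytes || elapsed_ms >= max_wait_ms
}

fn main() {
    // case1 (1 byte, 0 ms): the wait budget is already spent, so fetches return immediately.
    assert!(fetch_should_respond(0, 0, 1, 0));
    // case4 (1 MB, 100 ms): the byte threshold is never reached, so the 100 ms timeout fires instead.
    assert!(!fetch_should_respond(500, 50, 1_000_000, 100));
    assert!(fetch_should_respond(500, 100, 1_000_000, 100));
}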
12 changes: 8 additions & 4 deletions test-helpers/src/connection/kafka/cpp.rs
@@ -2,7 +2,7 @@ use std::collections::{HashMap, HashSet};
// Allow direct usage of the APIs when the feature is enabled
pub use rdkafka;

-use super::{ExpectedResponse, NewPartition, Record, TopicPartition};
+use super::{ConsumerConfig, ExpectedResponse, NewPartition, Record, TopicPartition};
use anyhow::Result;
use pretty_assertions::assert_eq;
use rdkafka::admin::AdminClient;
@@ -63,17 +63,21 @@ impl KafkaConnectionBuilderCpp {
}
}

pub async fn connect_consumer(&self, topic_name: &str, group: &str) -> KafkaConsumerCpp {
pub async fn connect_consumer(&self, config: ConsumerConfig) -> KafkaConsumerCpp {
let consumer: StreamConsumer = self
.client
.clone()
.set("group.id", group)
.set("group.id", &config.group)
.set("session.timeout.ms", "6000")
.set("auto.offset.reset", "earliest")
.set("enable.auto.commit", "false")
+            // this has a different name to the java driver 😭
+            .set("fetch.wait.max.ms", config.fetch_max_wait_ms.to_string())
+            .set("fetch.min.bytes", config.fetch_min_bytes.to_string())
.create()
.unwrap();
consumer.subscribe(&[topic_name]).unwrap();

consumer.subscribe(&[&config.topic_name]).unwrap();
KafkaConsumerCpp { consumer }
}

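Note on the config keys above: librdkafka spells the wait bound `fetch.wait.max.ms`, while the Java client spells it `fetch.max.wait.ms`; `fetch.min.bytes` is the same in both. A standalone sketch of the equivalent rdkafka consumer setup outside the test-helpers wrapper, assuming a placeholder bootstrap address and group id:

use rdkafka::config::ClientConfig;
use rdkafka::consumer::StreamConsumer;

// Sketch only: builds a consumer with the same fetch tuning as the wrapper above.
fn build_consumer(fetch_min_bytes: i32, fetch_max_wait_ms: i32) -> StreamConsumer {
    ClientConfig::new()
        .set("bootstrap.servers", "localhost:9092") // placeholder address
        .set("group.id", "example_group") // placeholder group
        .set("auto.offset.reset", "earliest")
        .set("enable.auto.commit", "false")
        // librdkafka's name for the Java client's `fetch.max.wait.ms`
        .set("fetch.wait.max.ms", fetch_max_wait_ms.to_string())
        .set("fetch.min.bytes", fetch_min_bytes.to_string())
        .create()
        .unwrap()
}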
24 changes: 17 additions & 7 deletions test-helpers/src/connection/kafka/java.rs
@@ -1,6 +1,7 @@
use super::{
-    Acl, AclOperation, AclPermissionType, AlterConfig, ExpectedResponse, NewPartition, NewTopic,
-    Record, ResourcePatternType, ResourceSpecifier, ResourceType, TopicDescription, TopicPartition,
+    Acl, AclOperation, AclPermissionType, AlterConfig, ConsumerConfig, ExpectedResponse,
+    NewPartition, NewTopic, Record, ResourcePatternType, ResourceSpecifier, ResourceType,
+    TopicDescription, TopicPartition,
};
use crate::connection::java::{Jvm, Value};
use anyhow::Result;
@@ -102,12 +103,20 @@ impl KafkaConnectionBuilderJava {
KafkaProducerJava { jvm, producer }
}

pub async fn connect_consumer(&self, topic_name: &str, group: &str) -> KafkaConsumerJava {
pub async fn connect_consumer(&self, consumer_config: ConsumerConfig) -> KafkaConsumerJava {
let mut config = self.base_config.clone();
config.insert("group.id".to_owned(), group.to_owned());
config.insert("group.id".to_owned(), consumer_config.group);
config.insert("session.timeout.ms".to_owned(), "6000".to_owned());
config.insert("auto.offset.reset".to_owned(), "earliest".to_owned());
config.insert("enable.auto.commit".to_owned(), "false".to_owned());
+        config.insert(
+            "fetch.max.wait.ms".to_owned(),
+            consumer_config.fetch_max_wait_ms.to_string(),
+        );
+        config.insert(
+            "fetch.min.bytes".to_owned(),
+            consumer_config.fetch_min_bytes.to_string(),
+        );
config.insert(
"key.deserializer".to_owned(),
"org.apache.kafka.common.serialization.StringDeserializer".to_owned(),
@@ -123,9 +132,10 @@ impl KafkaConnectionBuilderJava {
);
consumer.call(
"subscribe",
-            vec![self
-                .jvm
-                .new_list("java.lang.String", vec![self.jvm.new_string(topic_name)])],
+            vec![self.jvm.new_list(
+                "java.lang.String",
+                vec![self.jvm.new_string(&consumer_config.topic_name)],
+            )],
);

let jvm = self.jvm.clone();
40 changes: 37 additions & 3 deletions test-helpers/src/connection/kafka/mod.rs
@@ -75,11 +75,11 @@ impl KafkaConnectionBuilder {
}
}

pub async fn connect_consumer(&self, topic_name: &str, group: &str) -> KafkaConsumer {
pub async fn connect_consumer(&self, config: ConsumerConfig) -> KafkaConsumer {
match self {
#[cfg(feature = "kafka-cpp-driver-tests")]
Self::Cpp(cpp) => KafkaConsumer::Cpp(cpp.connect_consumer(topic_name, group).await),
Self::Java(java) => KafkaConsumer::Java(java.connect_consumer(topic_name, group).await),
Self::Cpp(cpp) => KafkaConsumer::Cpp(cpp.connect_consumer(config).await),
Self::Java(java) => KafkaConsumer::Java(java.connect_consumer(config).await),
}
}

@@ -398,3 +398,37 @@ pub struct TopicDescription {
// instead they just check if the describe succeeded or failed,
// so this is intentionally left empty for now
}

+#[derive(Default)]
+pub struct ConsumerConfig {
+    topic_name: String,
+    group: String,
+    fetch_min_bytes: i32,
+    fetch_max_wait_ms: i32,
+}
+
+impl ConsumerConfig {
+    pub fn consume_from_topic(topic_name: String) -> Self {
+        Self {
+            topic_name,
+            group: "default_group".to_owned(),
+            fetch_min_bytes: 1,
+            fetch_max_wait_ms: 500,
+        }
+    }
+
+    pub fn with_group(mut self, group: &str) -> Self {
+        self.group = group.to_owned();
+        self
+    }
+
+    pub fn with_fetch_min_bytes(mut self, fetch_min_bytes: i32) -> Self {
+        self.fetch_min_bytes = fetch_min_bytes;
+        self
+    }
+
+    pub fn with_fetch_max_wait_ms(mut self, fetch_max_wait_ms: i32) -> Self {
+        self.fetch_max_wait_ms = fetch_max_wait_ms;
+        self
+    }
+}
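For reference, the defaults chosen in `consume_from_topic` (group `default_group`, `fetch_min_bytes: 1`, `fetch_max_wait_ms: 500`) match the Java consumer's stock values, so call sites that do not override them keep the old behaviour. A typical call site from the new tests, mirroring `produce_consume_partitions3` for `partitions3_case2`:

let mut consumer = connection_builder
    .connect_consumer(
        ConsumerConfig::consume_from_topic("partitions3_case2".to_owned())
            .with_group("some_group")
            .with_fetch_min_bytes(1)
            .with_fetch_max_wait_ms(1),
    )
    .await;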