
Commit

Merge branch 'main' into cassandra_sink_cluster_port_to_connection
rukai authored Apr 18, 2024
2 parents fc1df55 + 4dbf9f6 commit dff9c7e
Showing 5 changed files with 246 additions and 90 deletions.
22 changes: 11 additions & 11 deletions shotover-proxy/tests/kafka_int_tests/mod.rs
@@ -18,7 +18,7 @@ async fn passthrough_standard(#[case] driver: KafkaDriver) {
         .await;
 
     let connection_builder = KafkaConnectionBuilder::new(driver, "127.0.0.1:9192");
-    test_cases::basic(connection_builder).await;
+    test_cases::standard_test_suite(connection_builder).await;
 
     tokio::time::timeout(
         Duration::from_secs(10),
@@ -42,7 +42,7 @@ async fn passthrough_tls(#[case] driver: KafkaDriver) {
         .await;
 
     let connection_builder = KafkaConnectionBuilder::new(driver, "127.0.0.1:9192");
-    test_cases::basic(connection_builder).await;
+    test_cases::standard_test_suite(connection_builder).await;
 
     tokio::time::timeout(
         Duration::from_secs(10),
@@ -66,7 +66,7 @@ async fn cluster_tls(#[case] driver: KafkaDriver) {
         .await;
 
     let connection_builder = KafkaConnectionBuilder::new(driver, "127.0.0.1:9192");
-    test_cases::basic(connection_builder).await;
+    test_cases::cluster_test_suite(connection_builder).await;
 
     tokio::time::timeout(
         Duration::from_secs(10),
@@ -89,7 +89,7 @@ async fn passthrough_encode(#[case] driver: KafkaDriver) {
         .await;
 
     let connection_builder = KafkaConnectionBuilder::new(driver, "127.0.0.1:9192");
-    test_cases::basic(connection_builder).await;
+    test_cases::standard_test_suite(connection_builder).await;
 
     shotover.shutdown_and_then_consume_events(&[]).await;
 }
@@ -107,7 +107,7 @@ async fn passthrough_sasl(#[case] driver: KafkaDriver) {
 
     let connection_builder =
         KafkaConnectionBuilder::new(driver, "127.0.0.1:9192").use_sasl("user", "password");
-    test_cases::basic(connection_builder).await;
+    test_cases::standard_test_suite(connection_builder).await;
 
     shotover.shutdown_and_then_consume_events(&[]).await;
 }
@@ -127,7 +127,7 @@ async fn passthrough_sasl_encode(#[case] driver: KafkaDriver) {
 
     let connection_builder =
         KafkaConnectionBuilder::new(driver, "127.0.0.1:9192").use_sasl("user", "password");
-    test_cases::basic(connection_builder).await;
+    test_cases::standard_test_suite(connection_builder).await;
 
     shotover.shutdown_and_then_consume_events(&[]).await;
 }
@@ -144,7 +144,7 @@ async fn cluster_1_rack_single_shotover(#[case] driver: KafkaDriver) {
         .await;
 
     let connection_builder = KafkaConnectionBuilder::new(driver, "127.0.0.1:9192");
-    test_cases::basic(connection_builder).await;
+    test_cases::cluster_test_suite(connection_builder).await;
 
     tokio::time::timeout(
         Duration::from_secs(10),
@@ -178,7 +178,7 @@ async fn cluster_1_rack_multi_shotover(#[case] driver: KafkaDriver) {
     }
 
     let connection_builder = KafkaConnectionBuilder::new(driver, "127.0.0.1:9192");
-    test_cases::basic(connection_builder).await;
+    test_cases::cluster_test_suite(connection_builder).await;
 
     for shotover in shotovers {
         tokio::time::timeout(
@@ -216,7 +216,7 @@ async fn cluster_2_racks_multi_shotover(#[case] driver: KafkaDriver) {
     }
 
     let connection_builder = KafkaConnectionBuilder::new(driver, "127.0.0.1:9192");
-    test_cases::basic(connection_builder).await;
+    test_cases::cluster_test_suite(connection_builder).await;
 
     for shotover in shotovers {
         tokio::time::timeout(
@@ -242,7 +242,7 @@ async fn cluster_sasl_single_shotover(#[case] driver: KafkaDriver) {
 
     let connection_builder =
         KafkaConnectionBuilder::new(driver, "127.0.0.1:9192").use_sasl("user", "password");
-    test_cases::basic(connection_builder).await;
+    test_cases::cluster_test_suite(connection_builder).await;
 
     tokio::time::timeout(
         Duration::from_secs(10),
@@ -277,7 +277,7 @@ async fn cluster_sasl_multi_shotover(#[case] driver: KafkaDriver) {
 
     let connection_builder =
         KafkaConnectionBuilder::new(driver, "127.0.0.1:9192").use_sasl("user", "password");
-    test_cases::basic(connection_builder).await;
+    test_cases::cluster_test_suite(connection_builder).await;
 
     for shotover in shotovers {
         tokio::time::timeout(
205 changes: 158 additions & 47 deletions shotover-proxy/tests/kafka_int_tests/test_cases.rs
@@ -13,7 +13,7 @@ async fn admin_setup(connection_builder: &KafkaConnectionBuilder) {
             replication_factor: 1,
         },
         NewTopic {
-            name: "paritions3",
+            name: "partitions3",
             num_partitions: 3,
             replication_factor: 1,
         },
@@ -32,7 +32,6 @@ async fn admin_setup(connection_builder: &KafkaConnectionBuilder) {
 
     admin
         .create_partitions(&[NewPartition {
-            // TODO: modify topic "foo" instead so that we can test our handling of that with interesting partition + replication count
            topic_name: "to_delete",
            new_partition_count: 2,
        }])
@@ -57,48 +56,152 @@ async fn admin_setup(connection_builder: &KafkaConnectionBuilder) {
     admin.delete_topics(&["to_delete"]).await
 }
 
-async fn produce_consume(connection_builder: &KafkaConnectionBuilder, topic_name: &str, i: i64) {
-    let producer = connection_builder.connect_producer(1).await;
+async fn produce_consume_partitions1(
+    connection_builder: &KafkaConnectionBuilder,
+    topic_name: &str,
+) {
+    {
+        let producer = connection_builder.connect_producer(1).await;
+        // create an initial record to force kafka to create the topic if it doesn't yet exist
+        producer
+            .assert_produce(
+                Record {
+                    payload: "initial",
+                    topic_name,
+                    key: Some("Key"),
+                },
+                Some(0),
+            )
+            .await;
 
-    producer
-        .assert_produce(
-            Record {
-                payload: "Message1",
-                topic_name,
-                key: Some("Key"),
-            },
-            Some(i * 2),
-        )
-        .await;
-    producer
-        .assert_produce(
-            Record {
-                payload: "Message2",
-                topic_name,
-                key: None,
-            },
-            Some(i * 2 + 1),
-        )
-        .await;
+        let mut consumer = connection_builder.connect_consumer(topic_name).await;
+        consumer
+            .assert_consume(ExpectedResponse {
+                message: "initial".to_owned(),
+                key: Some("Key".to_owned()),
+                topic_name: topic_name.to_owned(),
+                offset: Some(0),
+            })
+            .await;
 
-    let mut consumer = connection_builder.connect_consumer(topic_name).await;
+        // create and consume records
+        for i in 0..5 {
+            producer
+                .assert_produce(
+                    Record {
+                        payload: "Message1",
+                        topic_name,
+                        key: Some("Key"),
+                    },
+                    Some(i * 2 + 1),
+                )
+                .await;
+            producer
+                .assert_produce(
+                    Record {
+                        payload: "Message2",
+                        topic_name,
+                        key: None,
+                    },
+                    Some(i * 2 + 2),
+                )
+                .await;
+
+            consumer
+                .assert_consume(ExpectedResponse {
+                    message: "Message1".to_owned(),
+                    key: Some("Key".to_owned()),
+                    topic_name: topic_name.to_owned(),
+                    offset: Some(i * 2 + 1),
+                })
+                .await;
+            consumer
+                .assert_consume(ExpectedResponse {
+                    message: "Message2".to_owned(),
+                    key: None,
+                    topic_name: topic_name.to_owned(),
+                    offset: Some(i * 2 + 2),
+                })
+                .await;
+        }
+    }
+
+    // if we create a new consumer it will start from the beginning since `auto.offset.reset = earliest`
+    // so we test that we can access all records ever created on this topic
+    let mut consumer = connection_builder.connect_consumer(topic_name).await;
     consumer
         .assert_consume(ExpectedResponse {
-            message: "Message1",
-            key: Some("Key"),
-            topic_name,
-            offset: 0,
-        })
-        .await;
-    consumer
-        .assert_consume(ExpectedResponse {
-            message: "Message2",
-            key: None,
-            topic_name,
-            offset: 1,
+            message: "initial".to_owned(),
+            key: Some("Key".to_owned()),
+            topic_name: topic_name.to_owned(),
+            offset: Some(0),
         })
         .await;
+    for i in 0..5 {
+        consumer
+            .assert_consume(ExpectedResponse {
+                message: "Message1".to_owned(),
+                key: Some("Key".to_owned()),
+                topic_name: topic_name.to_owned(),
+                offset: Some(i * 2 + 1),
+            })
+            .await;
+        consumer
+            .assert_consume(ExpectedResponse {
+                message: "Message2".to_owned(),
+                key: None,
+                topic_name: topic_name.to_owned(),
+                offset: Some(i * 2 + 2),
+            })
+            .await;
+    }
 }
 
+async fn produce_consume_partitions3(connection_builder: &KafkaConnectionBuilder) {
+    let topic_name = "partitions3";
+    let producer = connection_builder.connect_producer(1).await;
+    let mut consumer = connection_builder.connect_consumer(topic_name).await;
+
+    for _ in 0..5 {
+        producer
+            .assert_produce(
+                Record {
+                    payload: "Message1",
+                    topic_name,
+                    key: Some("Key"),
+                },
+                // We can't predict the offsets since that will depend on which partition the keyless record ends up in
+                None,
+            )
+            .await;
+        producer
+            .assert_produce(
+                Record {
+                    payload: "Message2",
+                    topic_name,
+                    key: None,
+                },
+                None,
+            )
+            .await;
+
+        consumer
+            .assert_consume_in_any_order(vec![
+                ExpectedResponse {
+                    message: "Message1".to_owned(),
+                    key: Some("Key".to_owned()),
+                    topic_name: topic_name.to_owned(),
+                    offset: None,
+                },
+                ExpectedResponse {
+                    message: "Message2".to_owned(),
+                    key: None,
+                    topic_name: topic_name.to_owned(),
+                    offset: None,
+                },
+            ])
+            .await;
+    }
+}
+
 async fn produce_consume_acks0(connection_builder: &KafkaConnectionBuilder) {
@@ -123,21 +226,29 @@ async fn produce_consume_acks0(connection_builder: &KafkaConnectionBuilder) {
     for j in 0..10 {
         consumer
             .assert_consume(ExpectedResponse {
-                message: "MessageAcks0",
-                key: Some("KeyAcks0"),
-                topic_name,
-                offset: j,
+                message: "MessageAcks0".to_owned(),
+                key: Some("KeyAcks0".to_owned()),
+                topic_name: topic_name.to_owned(),
+                offset: Some(j),
             })
             .await;
     }
 }
 
-pub async fn basic(connection_builder: KafkaConnectionBuilder) {
+pub async fn standard_test_suite(connection_builder: KafkaConnectionBuilder) {
     admin_setup(&connection_builder).await;
-    for i in 0..2 {
-        produce_consume(&connection_builder, "partitions1", i).await;
-        produce_consume(&connection_builder, "partitions3", i).await;
-        produce_consume_acks0(&connection_builder).await;
-    }
+    produce_consume_partitions1(&connection_builder, "partitions1").await;
+    produce_consume_partitions1(&connection_builder, "unknown_topic").await;
+    produce_consume_partitions3(&connection_builder).await;
+    produce_consume_acks0(&connection_builder).await;
     connection_builder.admin_cleanup().await;
 }
+
+// TODO: make clusters run standard_test_suite instead
+pub async fn cluster_test_suite(connection_builder: KafkaConnectionBuilder) {
+    admin_setup(&connection_builder).await;
+    produce_consume_partitions1(&connection_builder, "partitions1").await;
+    produce_consume_partitions1(&connection_builder, "unknown_topic").await;
+    produce_consume_acks0(&connection_builder).await;
+    connection_builder.admin_cleanup().await;
+}
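
The `auto.offset.reset = earliest` comment in the diff above is what makes the final re-read in produce_consume_partitions1 valid: a consumer group with no committed offsets starts from the beginning of the log rather than the tail, so a brand new consumer sees every record ever produced to the topic. As a minimal sketch of the consumer configuration this behavior assumes (hypothetical group id, broker address taken from the tests above, using the rdkafka crate that the cpp driver below is built on):

    use rdkafka::config::ClientConfig;
    use rdkafka::consumer::{Consumer, StreamConsumer};

    fn connect_fresh_consumer(topic: &str) -> StreamConsumer {
        let consumer: StreamConsumer = ClientConfig::new()
            .set("bootstrap.servers", "127.0.0.1:9192") // address used by the tests above
            .set("group.id", "fresh_group") // hypothetical group with no committed offsets
            // With `earliest`, a group that has no committed offset begins at
            // offset 0 instead of only seeing records produced after it joined.
            .set("auto.offset.reset", "earliest")
            .create()
            .expect("failed to create consumer");
        consumer.subscribe(&[topic]).expect("failed to subscribe");
        consumer
    }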
21 changes: 10 additions & 11 deletions test-helpers/src/connection/kafka/cpp.rs
@@ -133,25 +133,24 @@ pub struct KafkaConsumerCpp {
 }
 
 impl KafkaConsumerCpp {
-    pub async fn assert_consume(&self, response: ExpectedResponse<'_>) {
+    pub async fn consume(&self) -> ExpectedResponse {
         let message = tokio::time::timeout(Duration::from_secs(30), self.consumer.recv())
             .await
             .expect("Timeout while receiving from consumer")
             .unwrap();
-        let contents = message.payload_view::<str>().unwrap().unwrap();
-        assert_eq!(response.message, contents);
-        assert_eq!(
-            response.key,
-            message.key().map(|x| std::str::from_utf8(x).unwrap())
-        );
-        assert_eq!(response.topic_name, message.topic());
-        assert_eq!(response.offset, message.offset());
+        ExpectedResponse {
+            message: message.payload_view::<str>().unwrap().unwrap().to_owned(),
+            key: message
+                .key()
+                .map(|x| String::from_utf8(x.to_vec()).unwrap()),
+            topic_name: message.topic().to_owned(),
+            offset: Some(message.offset()),
+        }
     }
 }
 
 pub struct KafkaAdminCpp {
-    // TODO: make private
-    pub admin: AdminClient<DefaultClientContext>,
+    admin: AdminClient<DefaultClientContext>,
 }
 
 impl KafkaAdminCpp {
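
With assert_consume replaced by a consume that simply returns what was received, the equality check can move out of the driver-specific code. A minimal sketch of such a helper, assuming ExpectedResponse is the owned struct from the diff above and derives Debug and PartialEq (hypothetical method, not part of this commit):

    impl KafkaConsumerCpp {
        // Compare a received record against an expectation; in practice the
        // assertion may instead live in a driver-agnostic wrapper shared with
        // the other Kafka drivers.
        pub async fn assert_consume(&self, expected: ExpectedResponse) {
            let response = self.consume().await;
            assert_eq!(expected, response);
        }
    }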
