From 4dbf9f6ec05c71db395c66cbe628351757a1f26a Mon Sep 17 00:00:00 2001
From: Lucas Kent
Date: Fri, 19 Apr 2024 07:35:45 +1000
Subject: [PATCH] Fix kafka int tests with 3 partitions to actually use 3 partitions (#1578)

---
 shotover-proxy/tests/kafka_int_tests/mod.rs |  22 +-
 .../tests/kafka_int_tests/test_cases.rs     | 205 ++++++++++++++----
 test-helpers/src/connection/kafka/cpp.rs    |  21 +-
 test-helpers/src/connection/kafka/java.rs   |  27 ++-
 test-helpers/src/connection/kafka/mod.rs    |  61 +++++-
 5 files changed, 246 insertions(+), 90 deletions(-)

diff --git a/shotover-proxy/tests/kafka_int_tests/mod.rs b/shotover-proxy/tests/kafka_int_tests/mod.rs
index 1f46c35d0..78b7dfe2d 100644
--- a/shotover-proxy/tests/kafka_int_tests/mod.rs
+++ b/shotover-proxy/tests/kafka_int_tests/mod.rs
@@ -18,7 +18,7 @@ async fn passthrough_standard(#[case] driver: KafkaDriver) {
         .await;
 
     let connection_builder = KafkaConnectionBuilder::new(driver, "127.0.0.1:9192");
-    test_cases::basic(connection_builder).await;
+    test_cases::standard_test_suite(connection_builder).await;
 
     tokio::time::timeout(
         Duration::from_secs(10),
@@ -42,7 +42,7 @@ async fn passthrough_tls(#[case] driver: KafkaDriver) {
         .await;
 
     let connection_builder = KafkaConnectionBuilder::new(driver, "127.0.0.1:9192");
-    test_cases::basic(connection_builder).await;
+    test_cases::standard_test_suite(connection_builder).await;
 
     tokio::time::timeout(
         Duration::from_secs(10),
@@ -66,7 +66,7 @@ async fn cluster_tls(#[case] driver: KafkaDriver) {
         .await;
 
     let connection_builder = KafkaConnectionBuilder::new(driver, "127.0.0.1:9192");
-    test_cases::basic(connection_builder).await;
+    test_cases::cluster_test_suite(connection_builder).await;
 
     tokio::time::timeout(
         Duration::from_secs(10),
@@ -89,7 +89,7 @@ async fn passthrough_encode(#[case] driver: KafkaDriver) {
         .await;
 
     let connection_builder = KafkaConnectionBuilder::new(driver, "127.0.0.1:9192");
-    test_cases::basic(connection_builder).await;
+    test_cases::standard_test_suite(connection_builder).await;
 
     shotover.shutdown_and_then_consume_events(&[]).await;
 }
@@ -107,7 +107,7 @@ async fn passthrough_sasl(#[case] driver: KafkaDriver) {
 
     let connection_builder =
         KafkaConnectionBuilder::new(driver, "127.0.0.1:9192").use_sasl("user", "password");
-    test_cases::basic(connection_builder).await;
+    test_cases::standard_test_suite(connection_builder).await;
 
     shotover.shutdown_and_then_consume_events(&[]).await;
 }
@@ -127,7 +127,7 @@ async fn passthrough_sasl_encode(#[case] driver: KafkaDriver) {
 
     let connection_builder =
         KafkaConnectionBuilder::new(driver, "127.0.0.1:9192").use_sasl("user", "password");
-    test_cases::basic(connection_builder).await;
+    test_cases::standard_test_suite(connection_builder).await;
 
     shotover.shutdown_and_then_consume_events(&[]).await;
 }
@@ -144,7 +144,7 @@ async fn cluster_1_rack_single_shotover(#[case] driver: KafkaDriver) {
         .await;
 
     let connection_builder = KafkaConnectionBuilder::new(driver, "127.0.0.1:9192");
-    test_cases::basic(connection_builder).await;
+    test_cases::cluster_test_suite(connection_builder).await;
 
     tokio::time::timeout(
         Duration::from_secs(10),
@@ -178,7 +178,7 @@ async fn cluster_1_rack_multi_shotover(#[case] driver: KafkaDriver) {
     }
 
     let connection_builder = KafkaConnectionBuilder::new(driver, "127.0.0.1:9192");
-    test_cases::basic(connection_builder).await;
+    test_cases::cluster_test_suite(connection_builder).await;
 
     for shotover in shotovers {
         tokio::time::timeout(
@@ -216,7 +216,7 @@ async fn cluster_2_racks_multi_shotover(#[case] driver: KafkaDriver) {
     }
 
     let connection_builder = KafkaConnectionBuilder::new(driver, "127.0.0.1:9192");
-    test_cases::basic(connection_builder).await;
+    test_cases::cluster_test_suite(connection_builder).await;
 
     for shotover in shotovers {
         tokio::time::timeout(
@@ -242,7 +242,7 @@ async fn cluster_sasl_single_shotover(#[case] driver: KafkaDriver) {
 
     let connection_builder =
         KafkaConnectionBuilder::new(driver, "127.0.0.1:9192").use_sasl("user", "password");
-    test_cases::basic(connection_builder).await;
+    test_cases::cluster_test_suite(connection_builder).await;
 
     tokio::time::timeout(
         Duration::from_secs(10),
@@ -277,7 +277,7 @@ async fn cluster_sasl_multi_shotover(#[case] driver: KafkaDriver) {
 
     let connection_builder =
         KafkaConnectionBuilder::new(driver, "127.0.0.1:9192").use_sasl("user", "password");
-    test_cases::basic(connection_builder).await;
+    test_cases::cluster_test_suite(connection_builder).await;
 
     for shotover in shotovers {
         tokio::time::timeout(
diff --git a/shotover-proxy/tests/kafka_int_tests/test_cases.rs b/shotover-proxy/tests/kafka_int_tests/test_cases.rs
index 0a9436be7..4a944b3d3 100644
--- a/shotover-proxy/tests/kafka_int_tests/test_cases.rs
+++ b/shotover-proxy/tests/kafka_int_tests/test_cases.rs
@@ -13,7 +13,7 @@ async fn admin_setup(connection_builder: &KafkaConnectionBuilder) {
                 replication_factor: 1,
             },
             NewTopic {
-                name: "paritions3",
+                name: "partitions3",
                 num_partitions: 3,
                 replication_factor: 1,
             },
@@ -32,7 +32,6 @@ async fn admin_setup(connection_builder: &KafkaConnectionBuilder) {
 
     admin
         .create_partitions(&[NewPartition {
-            // TODO: modify topic "foo" instead so that we can test our handling of that with interesting partition + replication count
            topic_name: "to_delete",
            new_partition_count: 2,
        }])
@@ -57,48 +56,152 @@ async fn admin_setup(connection_builder: &KafkaConnectionBuilder) {
     admin.delete_topics(&["to_delete"]).await
 }
 
-async fn produce_consume(connection_builder: &KafkaConnectionBuilder, topic_name: &str, i: i64) {
-    let producer = connection_builder.connect_producer(1).await;
+async fn produce_consume_partitions1(
+    connection_builder: &KafkaConnectionBuilder,
+    topic_name: &str,
+) {
+    {
+        let producer = connection_builder.connect_producer(1).await;
+        // create an initial record to force kafka to create the topic if it doesnt yet exist
+        producer
+            .assert_produce(
+                Record {
+                    payload: "initial",
+                    topic_name,
+                    key: Some("Key"),
+                },
+                Some(0),
+            )
+            .await;
 
-    producer
-        .assert_produce(
-            Record {
-                payload: "Message1",
-                topic_name,
-                key: Some("Key"),
-            },
-            Some(i * 2),
-        )
-        .await;
-    producer
-        .assert_produce(
-            Record {
-                payload: "Message2",
-                topic_name,
-                key: None,
-            },
-            Some(i * 2 + 1),
-        )
-        .await;
+        let mut consumer = connection_builder.connect_consumer(topic_name).await;
+        consumer
+            .assert_consume(ExpectedResponse {
+                message: "initial".to_owned(),
+                key: Some("Key".to_owned()),
+                topic_name: topic_name.to_owned(),
+                offset: Some(0),
+            })
+            .await;
 
-    let mut consumer = connection_builder.connect_consumer(topic_name).await;
+        // create and consume records
+        for i in 0..5 {
+            producer
+                .assert_produce(
+                    Record {
+                        payload: "Message1",
+                        topic_name,
+                        key: Some("Key"),
+                    },
+                    Some(i * 2 + 1),
+                )
+                .await;
+            producer
+                .assert_produce(
+                    Record {
+                        payload: "Message2",
+                        topic_name,
+                        key: None,
+                    },
+                    Some(i * 2 + 2),
+                )
+                .await;
+            consumer
+                .assert_consume(ExpectedResponse {
+                    message: "Message1".to_owned(),
+                    key: Some("Key".to_owned()),
+                    topic_name: topic_name.to_owned(),
+                    offset: Some(i * 2 + 1),
+                })
+                .await;
+            consumer
+                .assert_consume(ExpectedResponse {
+                    message: "Message2".to_owned(),
+                    key: None,
+                    topic_name: topic_name.to_owned(),
+                    offset: Some(i * 2 + 2),
+                })
+                .await;
+        }
+    }
+
+    // if we create a new consumer it will start from the begginning since `auto.offset.reset = earliest`
+    // so we test that we can access all records ever created on this topic
+    let mut consumer = connection_builder.connect_consumer(topic_name).await;
 
     consumer
         .assert_consume(ExpectedResponse {
-            message: "Message1",
-            key: Some("Key"),
-            topic_name,
-            offset: 0,
-        })
-        .await;
-    consumer
-        .assert_consume(ExpectedResponse {
-            message: "Message2",
-            key: None,
-            topic_name,
-            offset: 1,
+            message: "initial".to_owned(),
+            key: Some("Key".to_owned()),
+            topic_name: topic_name.to_owned(),
+            offset: Some(0),
         })
         .await;
+    for i in 0..5 {
+        consumer
+            .assert_consume(ExpectedResponse {
+                message: "Message1".to_owned(),
+                key: Some("Key".to_owned()),
+                topic_name: topic_name.to_owned(),
+                offset: Some(i * 2 + 1),
+            })
+            .await;
+        consumer
+            .assert_consume(ExpectedResponse {
+                message: "Message2".to_owned(),
+                key: None,
+                topic_name: topic_name.to_owned(),
+                offset: Some(i * 2 + 2),
+            })
+            .await;
+    }
+}
+
+async fn produce_consume_partitions3(connection_builder: &KafkaConnectionBuilder) {
+    let topic_name = "partitions3";
+    let producer = connection_builder.connect_producer(1).await;
+    let mut consumer = connection_builder.connect_consumer(topic_name).await;
+
+    for _ in 0..5 {
+        producer
+            .assert_produce(
+                Record {
+                    payload: "Message1",
+                    topic_name,
+                    key: Some("Key"),
+                },
+                // We cant predict the offsets since that will depend on which partition the keyless record ends up in
+                None,
+            )
+            .await;
+        producer
+            .assert_produce(
+                Record {
+                    payload: "Message2",
+                    topic_name,
+                    key: None,
+                },
+                None,
+            )
+            .await;
+
+        consumer
+            .assert_consume_in_any_order(vec![
+                ExpectedResponse {
+                    message: "Message1".to_owned(),
+                    key: Some("Key".to_owned()),
+                    topic_name: topic_name.to_owned(),
+                    offset: None,
+                },
+                ExpectedResponse {
+                    message: "Message2".to_owned(),
+                    key: None,
+                    topic_name: topic_name.to_owned(),
+                    offset: None,
+                },
+            ])
+            .await;
+    }
 }
 
 async fn produce_consume_acks0(connection_builder: &KafkaConnectionBuilder) {
@@ -123,21 +226,29 @@ async fn produce_consume_acks0(connection_builder: &KafkaConnectionBuilder) {
     for j in 0..10 {
         consumer
             .assert_consume(ExpectedResponse {
-                message: "MessageAcks0",
-                key: Some("KeyAcks0"),
-                topic_name,
-                offset: j,
+                message: "MessageAcks0".to_owned(),
+                key: Some("KeyAcks0".to_owned()),
+                topic_name: topic_name.to_owned(),
+                offset: Some(j),
             })
             .await;
     }
 }
 
-pub async fn basic(connection_builder: KafkaConnectionBuilder) {
+pub async fn standard_test_suite(connection_builder: KafkaConnectionBuilder) {
     admin_setup(&connection_builder).await;
-    for i in 0..2 {
-        produce_consume(&connection_builder, "partitions1", i).await;
-        produce_consume(&connection_builder, "partitions3", i).await;
-        produce_consume_acks0(&connection_builder).await;
-    }
+    produce_consume_partitions1(&connection_builder, "partitions1").await;
+    produce_consume_partitions1(&connection_builder, "unknown_topic").await;
+    produce_consume_partitions3(&connection_builder).await;
+    produce_consume_acks0(&connection_builder).await;
+    connection_builder.admin_cleanup().await;
+}
+
+// TODO: make cluster's run standard_test_suite instead
+pub async fn cluster_test_suite(connection_builder: KafkaConnectionBuilder) {
+    admin_setup(&connection_builder).await;
+    produce_consume_partitions1(&connection_builder, "partitions1").await;
+    produce_consume_partitions1(&connection_builder, "unknown_topic").await;
+    produce_consume_acks0(&connection_builder).await;
     connection_builder.admin_cleanup().await;
 }
diff --git a/test-helpers/src/connection/kafka/cpp.rs b/test-helpers/src/connection/kafka/cpp.rs
index 40770b5ea..ed96cb721 100644
--- a/test-helpers/src/connection/kafka/cpp.rs
+++ b/test-helpers/src/connection/kafka/cpp.rs
@@ -133,25 +133,24 @@ pub struct KafkaConsumerCpp {
 }
 
 impl KafkaConsumerCpp {
-    pub async fn assert_consume(&self, response: ExpectedResponse<'_>) {
+    pub async fn consume(&self) -> ExpectedResponse {
         let message = tokio::time::timeout(Duration::from_secs(30), self.consumer.recv())
             .await
             .expect("Timeout while receiving from consumer")
             .unwrap();
-        let contents = message.payload_view::<str>().unwrap().unwrap();
-        assert_eq!(response.message, contents);
-        assert_eq!(
-            response.key,
-            message.key().map(|x| std::str::from_utf8(x).unwrap())
-        );
-        assert_eq!(response.topic_name, message.topic());
-        assert_eq!(response.offset, message.offset());
+        ExpectedResponse {
+            message: message.payload_view::<str>().unwrap().unwrap().to_owned(),
+            key: message
+                .key()
+                .map(|x| String::from_utf8(x.to_vec()).unwrap()),
+            topic_name: message.topic().to_owned(),
+            offset: Some(message.offset()),
+        }
     }
 }
 
 pub struct KafkaAdminCpp {
-    // TODO: make private
-    pub admin: AdminClient<DefaultClientContext>,
+    admin: AdminClient<DefaultClientContext>,
 }
 
 impl KafkaAdminCpp {
diff --git a/test-helpers/src/connection/kafka/java.rs b/test-helpers/src/connection/kafka/java.rs
index f1e454633..b5a8e78d1 100644
--- a/test-helpers/src/connection/kafka/java.rs
+++ b/test-helpers/src/connection/kafka/java.rs
@@ -199,16 +199,16 @@ pub struct KafkaConsumerJava {
 }
 
 impl KafkaConsumerJava {
-    pub async fn assert_consume(&mut self, expected_response: ExpectedResponse<'_>) {
+    pub async fn consume(&mut self) -> ExpectedResponse {
         // This method asserts that we have consumed a single record from the broker.
         // Internally we may have actually received multiple records from the broker.
-        // But that is hidden from the test by storing any extra messages for use in the next call to `assert_consume`
+        // But that is hidden from the test by storing any extra messages for use in the next call to `consume`
 
         if self.waiting_records.is_empty() {
             self.fetch_from_broker();
         }
 
-        self.process_one_fetched_record(expected_response);
+        self.pop_one_record()
     }
 
     fn fetch_from_broker(&mut self) {
@@ -240,14 +240,14 @@ impl KafkaConsumerJava {
                 .jvm
                 .cast(&record, "org.apache.kafka.clients.consumer.ConsumerRecord")
                 .unwrap();
-            self.waiting_records.push_front(record);
+            self.waiting_records.push_back(record);
         }
     }
 
-    fn process_one_fetched_record(&mut self, expected_response: ExpectedResponse<'_>) {
+    fn pop_one_record(&mut self) -> ExpectedResponse {
         let record = self
             .waiting_records
-            .pop_back()
+            .pop_front()
             .expect("KafkaConsumer.poll timed out");
 
         let offset: i64 = self
@@ -258,9 +258,8 @@ impl KafkaConsumerJava {
             .unwrap()
             .to_rust()
             .unwrap();
-        assert_eq!(expected_response.offset, offset);
 
-        let topic: String = self
+        let topic_name: String = self
             .jvm
             .chain(&record)
             .unwrap()
@@ -268,9 +267,8 @@ impl KafkaConsumerJava {
             .unwrap()
             .to_rust()
             .unwrap();
-        assert_eq!(expected_response.topic_name, topic);
 
-        let value: String = self
+        let message: String = self
             .jvm
             .chain(&record)
             .unwrap()
@@ -278,7 +276,6 @@ impl KafkaConsumerJava {
             .unwrap()
             .to_rust()
             .unwrap();
-        assert_eq!(expected_response.message, value);
 
         let key: Option<String> = self
             .jvm
@@ -288,7 +285,13 @@ impl KafkaConsumerJava {
             .unwrap()
             .to_rust()
             .unwrap();
-        assert_eq!(expected_response.key, key.as_deref());
+
+        ExpectedResponse {
+            message,
+            key,
+            topic_name,
+            offset: Some(offset),
+        }
     }
 }
 
diff --git a/test-helpers/src/connection/kafka/mod.rs b/test-helpers/src/connection/kafka/mod.rs
index bbce3e1dc..9e3549443 100644
--- a/test-helpers/src/connection/kafka/mod.rs
+++ b/test-helpers/src/connection/kafka/mod.rs
@@ -97,20 +97,63 @@ pub enum KafkaConsumer {
 }
 
 impl KafkaConsumer {
-    pub async fn assert_consume(&mut self, response: ExpectedResponse<'_>) {
-        match self {
+    pub async fn assert_consume(&mut self, expected_response: ExpectedResponse) {
+        let response = match self {
             #[cfg(feature = "rdkafka-driver-tests")]
-            Self::Cpp(cpp) => cpp.assert_consume(response).await,
-            Self::Java(java) => java.assert_consume(response).await,
+            Self::Cpp(cpp) => cpp.consume().await,
+            Self::Java(java) => java.consume().await,
+        };
+        assert_eq!(expected_response.message, response.message);
+        assert_eq!(expected_response.key, response.key);
+        assert_eq!(expected_response.topic_name, response.topic_name);
+        assert_eq!(expected_response.offset, response.offset);
+    }
+
+    pub async fn assert_consume_in_any_order(&mut self, expected_responses: Vec<ExpectedResponse>) {
+        let mut responses = vec![];
+        while responses.len() < expected_responses.len() {
+            match self {
+                #[cfg(feature = "rdkafka-driver-tests")]
+                Self::Cpp(cpp) => responses.push(cpp.consume().await),
+                Self::Java(java) => responses.push(java.consume().await),
+            }
+        }
+        let full_responses = responses.clone();
+        let full_expected_responses = expected_responses.clone();
+
+        for expected_response in expected_responses {
+            match responses.iter().enumerate().find(|(_, x)| **x == expected_response) {
+                Some((i, _)) => {
+                    responses.remove(i);
+                }
+                None => panic!("An expected response was not found in the actual responses\nExpected responses:{full_expected_responses:#?}\nActual responses:{full_responses:#?}"),
+            }
         }
     }
 }
 
-pub struct ExpectedResponse<'a> {
-    pub message: &'a str,
-    pub key: Option<&'a str>,
-    pub topic_name: &'a str,
-    pub offset: i64,
+#[derive(Debug, Clone)]
+pub struct ExpectedResponse {
+    pub message: String,
+    pub key: Option<String>,
+    pub topic_name: String,
+    /// Responses will always have this set to Some.
+    /// The test case can set this to None to ignore the value of the actual response.
+    pub offset: Option<i64>,
+}
+
+impl PartialEq for ExpectedResponse {
+    fn eq(&self, other: &Self) -> bool {
+        self.message == other.message
+            && self.key == other.key
+            && self.topic_name == other.topic_name
+            && match (self.offset, other.offset) {
+                (None, None) => true,
+                (None, Some(_)) => true,
+                (Some(_), None) => true,
+                (Some(a), Some(b)) => a == b,
+            }
+    }
 }
 
 pub enum KafkaAdmin {