Skip to content

Commit

Permalink
Add java kafka driver backend to kafka integration tests
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai committed Mar 1, 2024
1 parent 384dc38 commit 1b03696
Show file tree
Hide file tree
Showing 12 changed files with 678 additions and 311 deletions.
1 change: 1 addition & 0 deletions .config/nextest.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
[profile.default]
fail-fast = false
slow-timeout = { period = '5m', terminate-after = 2 }
archive-include = ["debug/jassets", "release/jassets"]

# Overwrites profile.default when the filter matches
[[profile.default.overrides]]
Expand Down
10 changes: 6 additions & 4 deletions .github/workflows/build_and_test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,12 @@ jobs:
key: ubuntu-20.04-packages
- name: Install ubuntu packages
run: shotover-proxy/build/install_ubuntu_packages.sh
- name: Install nextest
uses: taiki-e/install-action@v2
with:
tool: nextest@0.9.57
#- name: Install nextest
# uses: taiki-e/install-action@v2
# with:
# tool: nextest@0.9.57
#- run: cargo install --git https://github.com/rukai/nextest --rev 450e73c86f4556e7e2aae812e4feaf1728967dd7 cargo-nextest
- run: cargo install --git https://github.com/rukai/nextest --branch archive-include cargo-nextest
- name: Build tests
run: |
cargo test --doc ${{ matrix.cargo_flags }} --all-features -- --show-output --nocapture
Expand Down
68 changes: 68 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 6 additions & 6 deletions shotover-proxy/benches/windsock/kafka/bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@ use shotover::transforms::kafka::sink_single::KafkaSinkSingleConfig;
use shotover::transforms::TransformConfig;
use std::sync::Arc;
use std::{collections::HashMap, time::Duration};
use test_helpers::connection::kafka::rdkafka::admin::{
use test_helpers::connection::kafka::cpp::rdkafka::admin::{
AdminClient, AdminOptions, NewTopic, TopicReplication,
};
use test_helpers::connection::kafka::rdkafka::client::DefaultClientContext;
use test_helpers::connection::kafka::rdkafka::config::ClientConfig;
use test_helpers::connection::kafka::rdkafka::consumer::{Consumer, StreamConsumer};
use test_helpers::connection::kafka::rdkafka::producer::{FutureProducer, FutureRecord};
use test_helpers::connection::kafka::rdkafka::util::Timeout;
use test_helpers::connection::kafka::cpp::rdkafka::client::DefaultClientContext;
use test_helpers::connection::kafka::cpp::rdkafka::config::ClientConfig;
use test_helpers::connection::kafka::cpp::rdkafka::consumer::{Consumer, StreamConsumer};
use test_helpers::connection::kafka::cpp::rdkafka::producer::{FutureProducer, FutureRecord};
use test_helpers::connection::kafka::cpp::rdkafka::util::Timeout;
use test_helpers::docker_compose::docker_compose;
use tokio::{sync::mpsc::UnboundedSender, task::JoinHandle, time::Instant};
use windsock::{Bench, BenchParameters, Profiling, Report};
Expand Down
95 changes: 55 additions & 40 deletions shotover-proxy/tests/kafka_int_tests/mod.rs
Original file line number Diff line number Diff line change
@@ -1,26 +1,23 @@
#[cfg(feature = "rdkafka-driver-tests")]
mod test_cases;

use crate::shotover_process;
#[cfg(feature = "rdkafka-driver-tests")]
use rstest::rstest;
use std::time::Duration;
#[cfg(feature = "rdkafka-driver-tests")]
use test_helpers::connection::kafka::{KafkaConnectionBuilder, KafkaDriver};
use test_helpers::docker_compose::docker_compose;

#[cfg(feature = "rdkafka-driver-tests")]
mod test_cases;

#[cfg(feature = "rdkafka-driver-tests")]
use test_helpers::connection::kafka::KafkaConnectionBuilder;

#[cfg(feature = "rdkafka-driver-tests")]
#[rstest]
#[cfg_attr(feature = "rdkafka-driver-tests", case::cpp(KafkaDriver::Cpp))]
#[case::java(KafkaDriver::Java)]
#[tokio::test]
async fn passthrough_standard() {
async fn passthrough_standard(#[case] driver: KafkaDriver) {
let _docker_compose =
docker_compose("tests/test-configs/kafka/passthrough/docker-compose.yaml");
let shotover = shotover_process("tests/test-configs/kafka/passthrough/topology.yaml")
.start()
.await;

let connection_builder = KafkaConnectionBuilder::new("127.0.0.1:9192");
let connection_builder = KafkaConnectionBuilder::new(driver, "127.0.0.1:9192");
test_cases::basic(connection_builder).await;

tokio::time::timeout(
Expand All @@ -31,9 +28,11 @@ async fn passthrough_standard() {
.expect("Shotover did not shutdown within 10s");
}

#[cfg(feature = "rdkafka-driver-tests")]
#[rstest]
#[cfg_attr(feature = "rdkafka-driver-tests", case::cpp(KafkaDriver::Cpp))]
#[case::java(KafkaDriver::Java)]
#[tokio::test]
async fn passthrough_tls() {
async fn passthrough_tls(#[case] driver: KafkaDriver) {
test_helpers::cert::generate_kafka_test_certs();

let _docker_compose =
Expand All @@ -42,7 +41,7 @@ async fn passthrough_tls() {
.start()
.await;

let connection_builder = KafkaConnectionBuilder::new("127.0.0.1:9192");
let connection_builder = KafkaConnectionBuilder::new(driver, "127.0.0.1:9192");
test_cases::basic(connection_builder).await;

tokio::time::timeout(
Expand All @@ -53,9 +52,11 @@ async fn passthrough_tls() {
.expect("Shotover did not shutdown within 10s");
}

#[cfg(feature = "rdkafka-driver-tests")]
#[rstest]
#[cfg_attr(feature = "rdkafka-driver-tests", case::cpp(KafkaDriver::Cpp))]
#[case::java(KafkaDriver::Java)]
#[tokio::test]
async fn cluster_tls() {
async fn cluster_tls(#[case] driver: KafkaDriver) {
test_helpers::cert::generate_kafka_test_certs();

let _docker_compose =
Expand All @@ -64,7 +65,7 @@ async fn cluster_tls() {
.start()
.await;

let connection_builder = KafkaConnectionBuilder::new("127.0.0.1:9192");
let connection_builder = KafkaConnectionBuilder::new(driver, "127.0.0.1:9192");
test_cases::basic(connection_builder).await;

tokio::time::timeout(
Expand All @@ -75,40 +76,46 @@ async fn cluster_tls() {
.expect("Shotover did not shutdown within 10s");
}

#[cfg(feature = "rdkafka-driver-tests")]
#[rstest]
#[cfg_attr(feature = "rdkafka-driver-tests", case::cpp(KafkaDriver::Cpp))]
#[case::java(KafkaDriver::Java)]
#[tokio::test]
async fn passthrough_encode() {
async fn passthrough_encode(#[case] driver: KafkaDriver) {
let _docker_compose =
docker_compose("tests/test-configs/kafka/passthrough/docker-compose.yaml");
let shotover = shotover_process("tests/test-configs/kafka/passthrough/topology-encode.yaml")
.start()
.await;

let connection_builder = KafkaConnectionBuilder::new("127.0.0.1:9192");
let connection_builder = KafkaConnectionBuilder::new(driver, "127.0.0.1:9192");
test_cases::basic(connection_builder).await;

shotover.shutdown_and_then_consume_events(&[]).await;
}

#[cfg(feature = "rdkafka-driver-tests")]
#[rstest]
#[cfg_attr(feature = "rdkafka-driver-tests", case::cpp(KafkaDriver::Cpp))]
#[case::java(KafkaDriver::Java)]
#[tokio::test]
async fn passthrough_sasl() {
async fn passthrough_sasl(#[case] driver: KafkaDriver) {
let _docker_compose =
docker_compose("tests/test-configs/kafka/passthrough-sasl/docker-compose.yaml");
let shotover = shotover_process("tests/test-configs/kafka/passthrough-sasl/topology.yaml")
.start()
.await;

let connection_builder =
KafkaConnectionBuilder::new("127.0.0.1:9192").use_sasl("user", "password");
KafkaConnectionBuilder::new(driver, "127.0.0.1:9192").use_sasl("user", "password");
test_cases::basic(connection_builder).await;

shotover.shutdown_and_then_consume_events(&[]).await;
}

#[cfg(feature = "rdkafka-driver-tests")]
#[rstest]
#[cfg_attr(feature = "rdkafka-driver-tests", case::cpp(KafkaDriver::Cpp))]
#[case::java(KafkaDriver::Java)]
#[tokio::test]
async fn passthrough_sasl_encode() {
async fn passthrough_sasl_encode(#[case] driver: KafkaDriver) {
let _docker_compose =
docker_compose("tests/test-configs/kafka/passthrough-sasl/docker-compose.yaml");
let shotover =
Expand All @@ -117,22 +124,24 @@ async fn passthrough_sasl_encode() {
.await;

let connection_builder =
KafkaConnectionBuilder::new("127.0.0.1:9192").use_sasl("user", "password");
KafkaConnectionBuilder::new(driver, "127.0.0.1:9192").use_sasl("user", "password");
test_cases::basic(connection_builder).await;

shotover.shutdown_and_then_consume_events(&[]).await;
}

#[cfg(feature = "rdkafka-driver-tests")]
#[rstest]
#[cfg_attr(feature = "rdkafka-driver-tests", case::cpp(KafkaDriver::Cpp))]
#[case::java(KafkaDriver::Java)]
#[tokio::test]
async fn cluster_1_rack_single_shotover() {
async fn cluster_1_rack_single_shotover(#[case] driver: KafkaDriver) {
let _docker_compose =
docker_compose("tests/test-configs/kafka/cluster-1-rack/docker-compose.yaml");
let shotover = shotover_process("tests/test-configs/kafka/cluster-1-rack/topology-single.yaml")
.start()
.await;

let connection_builder = KafkaConnectionBuilder::new("127.0.0.1:9192");
let connection_builder = KafkaConnectionBuilder::new(driver, "127.0.0.1:9192");
test_cases::basic(connection_builder).await;

tokio::time::timeout(
Expand All @@ -143,9 +152,11 @@ async fn cluster_1_rack_single_shotover() {
.expect("Shotover did not shutdown within 10s");
}

#[cfg(feature = "rdkafka-driver-tests")]
#[tokio::test]
async fn cluster_1_rack_multi_shotover() {
#[rstest]
#[cfg_attr(feature = "rdkafka-driver-tests", case::cpp(KafkaDriver::Cpp))]
#[case::java(KafkaDriver::Java)]
#[tokio::test(flavor = "multi_thread")]
async fn cluster_1_rack_multi_shotover(#[case] driver: KafkaDriver) {
let _docker_compose =
docker_compose("tests/test-configs/kafka/cluster-1-rack/docker-compose.yaml");
let mut shotovers = vec![];
Expand All @@ -163,7 +174,7 @@ async fn cluster_1_rack_multi_shotover() {
);
}

let connection_builder = KafkaConnectionBuilder::new("127.0.0.1:9192");
let connection_builder = KafkaConnectionBuilder::new(driver, "127.0.0.1:9192");
test_cases::basic(connection_builder).await;

for shotover in shotovers {
Expand All @@ -176,17 +187,19 @@ async fn cluster_1_rack_multi_shotover() {
}
}

#[cfg(feature = "rdkafka-driver-tests")]
#[rstest]
#[cfg_attr(feature = "rdkafka-driver-tests", case::cpp(KafkaDriver::Cpp))]
#[case::java(KafkaDriver::Java)]
#[tokio::test]
async fn cluster_2_racks_single_shotover() {
async fn cluster_2_racks_single_shotover(#[case] driver: KafkaDriver) {
let _docker_compose =
docker_compose("tests/test-configs/kafka/cluster-2-racks/docker-compose.yaml");
let shotover =
shotover_process("tests/test-configs/kafka/cluster-2-racks/topology-single.yaml")
.start()
.await;

let connection_builder = KafkaConnectionBuilder::new("127.0.0.1:9192");
let connection_builder = KafkaConnectionBuilder::new(driver, "127.0.0.1:9192");
test_cases::basic(connection_builder).await;

tokio::time::timeout(
Expand All @@ -197,9 +210,11 @@ async fn cluster_2_racks_single_shotover() {
.expect("Shotover did not shutdown within 10s");
}

#[cfg(feature = "rdkafka-driver-tests")]
#[rstest]
#[cfg_attr(feature = "rdkafka-driver-tests", case::cpp(KafkaDriver::Cpp))]
#[case::java(KafkaDriver::Java)]
#[tokio::test]
async fn cluster_2_racks_multi_shotover() {
async fn cluster_2_racks_multi_shotover(#[case] driver: KafkaDriver) {
let _docker_compose =
docker_compose("tests/test-configs/kafka/cluster-2-racks/docker-compose.yaml");

Expand All @@ -219,7 +234,7 @@ async fn cluster_2_racks_multi_shotover() {
);
}

let connection_builder = KafkaConnectionBuilder::new("127.0.0.1:9192");
let connection_builder = KafkaConnectionBuilder::new(driver, "127.0.0.1:9192");
test_cases::basic(connection_builder).await;

for shotover in shotovers {
Expand Down
Loading

0 comments on commit 1b03696

Please sign in to comment.