Skip to content

Commit

Permalink
feat: Add support for official apache kafka docker image
Browse files Browse the repository at this point in the history
Up till now kafka test container used `confluentinc/cp-kafka`,
recently kafka started producing two official
images `apache\kafka` and `apache\kafka-native`
which may use `kraft` protocol, thus no zookeeper
instance is needed

This commit exposes test container which utilizes
those new containers, with `kraft` protocol enabled.

Default image is `apache\kafka-native` which provide
fast starting speed, low memory utilization and
smaller docker image.

More details:

- [KIP-974: Docker Image for GraalVM based Native Kafka Broker](https://cwiki.apache.org/confluence/display/KAFKA/KIP-974%3A+Docker+Image+for+GraalVM+based+Native+Kafka+Broker)
- [KIP-975: Docker Image for Apache Kafka](https://cwiki.apache.org/confluence/display/KAFKA/KIP-975%3A+Docker+Image+for+Apache+Kafka)
- [Java testcontainer implementation](https://github.com/testcontainers/testcontainers-java/blob/1.20.1/modules/kafka/src/main/java/org/testcontainers/kafka/KafkaContainer.java#)
  • Loading branch information
milenkovicm committed Aug 21, 2024
1 parent 535e648 commit 1d2d0af
Show file tree
Hide file tree
Showing 4 changed files with 569 additions and 212 deletions.
12 changes: 10 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,12 @@ postgres = "0.19.7"
pretty_env_logger = "0.5.0"
rdkafka = "0.36.0"
redis = { version = "0.26.0", features = ["json"] }
reqwest = { version = "0.12.5", features = ["blocking", "json", "rustls-tls", "rustls-tls-native-roots"], default-features = false }
reqwest = { version = "0.12.5", features = [
"blocking",
"json",
"rustls-tls",
"rustls-tls-native-roots",
], default-features = false }
retry = "2.0.0"
rustls = { version = "0.23.2", features = ["ring"] }
serde = { version = "1.0.188", features = ["derive"] }
Expand All @@ -92,7 +97,10 @@ tiberius = { version = "0.12.2", default-features = false, features = [
tokio = { version = "1", features = ["macros"] }
tokio-util = { version = "0.7.10", features = ["compat"] }
zookeeper-client = { version = "0.8.0" }
kube = { version = "0.90.0", default-features = false, features = ["client", "rustls-tls"] }
kube = { version = "0.90.0", default-features = false, features = [
"client",
"rustls-tls",
] }
k8s-openapi = { version = "0.21.1", features = ["v1_29"] }
clickhouse = "0.11.6"
vaultrs = "0.7.2"
Expand Down
319 changes: 319 additions & 0 deletions src/kafka/apache.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,319 @@
use std::{borrow::Cow, collections::HashMap};
use testcontainers::{
core::{ContainerPort, ContainerState, ExecCommand, WaitFor},
Image,
};

const KAFKA_NATIVE_IMAGE_NAME: &str = "apache/kafka-native";
const KAFKA_IMAGE_NAME: &str = "apache/kafka";
const TAG: &str = "latest";

/// Port that [`Apache Kafka`] uses internally.
/// Can be rebound externally via [`testcontainers::core::ImageExt::with_mapped_port`]
///
/// [`Apache Kafka`]: https://kafka.apache.org/
pub const KAFKA_PORT: ContainerPort = ContainerPort::Tcp(9092);

const START_SCRIPT: &str = "/opt/kafka/testcontainers_start.sh";
const DEFAULT_INTERNAL_TOPIC_RF: usize = 1;
const DEFAULT_CLUSTER_ID: &str = "5L6g3nShT-eMCtK--X86sw";
const DEFAULT_BROKER_ID: usize = 1;

/// Module to work with [`Apache Kafka`] broker
///
/// Starts an instance of Apache Kafka broker, with Apache Kafka Raft (KRaft) is the consensus protocol
/// enabled.
///
/// This module is based on the official [`Apache Kafka docker image`](https://hub.docker.com/r/apache/kafka)
///
/// Module comes in two flavours:
///
/// - [`Apache Kafka GraalVM docker image`](https://hub.docker.com/r/apache/kafka-native), which is default as it provides faster startup and lower memory consumption.
/// - [`Apache Kafka JVM docker image`](https://hub.docker.com/r/apache/kafka)
///
/// # Example
/// ```
/// use testcontainers_modules::{kafka::apache, testcontainers::runners::SyncRunner};
/// let kafka_node = apache::Kafka::default().start().unwrap();
/// // connect to kafka server to send/receive messages
/// ```
///
/// [`Apache Kafka`]: https://kafka.apache.org/
#[derive(Debug, Clone)]
pub struct Kafka {
env_vars: HashMap<String, String>,
image_name: String,
}

impl Default for Kafka {
fn default() -> Self {
let mut env_vars = HashMap::new();
env_vars.insert(
"KAFKA_LISTENERS".to_owned(),
format!(
"PLAINTEXT://0.0.0.0:{},BROKER://0.0.0.0:9093,CONTROLLER://0.0.0.0:9094",
KAFKA_PORT.as_u16()
),
);
env_vars.insert("CLUSTER_ID".to_owned(), DEFAULT_CLUSTER_ID.to_owned());
env_vars.insert(
"KAFKA_PROCESS_ROLES".to_owned(),
"broker,controller".to_owned(),
);

env_vars.insert(
"KAFKA_CONTROLLER_LISTENER_NAMES".to_owned(),
"CONTROLLER".to_owned(),
);
env_vars.insert(
"KAFKA_LISTENER_SECURITY_PROTOCOL_MAP".to_owned(),
"BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT".to_owned(),
);
env_vars.insert(
"KAFKA_INTER_BROKER_LISTENER_NAME".to_owned(),
"BROKER".to_owned(),
);
env_vars.insert(
"KAFKA_ADVERTISED_LISTENERS".to_owned(),
format!(
"PLAINTEXT://localhost:{},BROKER://localhost:9092",
KAFKA_PORT.as_u16()
),
);
env_vars.insert("KAFKA_BROKER_ID".to_owned(), DEFAULT_BROKER_ID.to_string());
env_vars.insert(
"KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR".to_owned(),
DEFAULT_INTERNAL_TOPIC_RF.to_string(),
);
env_vars.insert(
"KAFKA_CONTROLLER_QUORUM_VOTERS".to_owned(),
format!("{DEFAULT_BROKER_ID}@localhost:9094").to_owned(),
);

Self {
env_vars,
image_name: KAFKA_NATIVE_IMAGE_NAME.to_string(),
}
}
}

impl Kafka {
/// Switches default image to `apache/kafka` instead of `apache/kafka-native`
pub fn with_jvm_image(mut self) -> Self {
self.image_name = KAFKA_IMAGE_NAME.to_string();

self
}
}

impl Image for Kafka {
fn name(&self) -> &str {
self.image_name.as_str()
}

fn tag(&self) -> &str {
TAG
}

fn ready_conditions(&self) -> Vec<WaitFor> {
vec![]
}

fn entrypoint(&self) -> Option<&str> {
Some("bash")
}

fn env_vars(
&self,
) -> impl IntoIterator<Item = (impl Into<Cow<'_, str>>, impl Into<Cow<'_, str>>)> {
&self.env_vars
}

fn cmd(&self) -> impl IntoIterator<Item = impl Into<Cow<'_, str>>> {
vec![
"-c".to_string(),
format!("while [ ! -f {START_SCRIPT} ]; do sleep 0.1; done; pwd &&chmod 755 {START_SCRIPT} && {START_SCRIPT}"),
]
.into_iter()
}

fn expose_ports(&self) -> &[ContainerPort] {
&[KAFKA_PORT]
}

fn exec_after_start(
&self,
cs: ContainerState,
) -> Result<Vec<ExecCommand>, testcontainers::TestcontainersError> {
let mut commands = vec![];

let cmd = vec![
"sh".to_string(),
"-c".to_string(),
format!(
"echo '#!/usr/bin/env bash\nexport KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:{},BROKER://localhost:9093\n/etc/kafka/docker/run \n' > {}",
cs.host_port_ipv4(KAFKA_PORT)?,
START_SCRIPT
),
];
let ready_conditions = vec![WaitFor::message_on_stdout("Kafka Server started")];
commands.push(ExecCommand::new(cmd).with_container_ready_conditions(ready_conditions));

Ok(commands)
}
}

#[cfg(test)]
mod tests {
use std::time::Duration;

use futures::StreamExt;
use rdkafka::{
consumer::{Consumer, StreamConsumer},
producer::{FutureProducer, FutureRecord},
ClientConfig, Message,
};
use testcontainers::runners::AsyncRunner;

use crate::kafka::apache;

#[tokio::test]
async fn produce_and_consume_messages_graalvm(
) -> Result<(), Box<dyn std::error::Error + 'static>> {
let _ = pretty_env_logger::try_init();
let kafka_node = apache::Kafka::default().start().await?;

let bootstrap_servers = format!(
"127.0.0.1:{}",
kafka_node.get_host_port_ipv4(apache::KAFKA_PORT).await?
);

let producer = ClientConfig::new()
.set("bootstrap.servers", &bootstrap_servers)
.set("message.timeout.ms", "5000")
.create::<FutureProducer>()
.expect("Failed to create Kafka FutureProducer");

let consumer = ClientConfig::new()
.set("group.id", "testcontainer-rs")
.set("bootstrap.servers", &bootstrap_servers)
.set("session.timeout.ms", "6000")
.set("enable.auto.commit", "false")
.set("auto.offset.reset", "earliest")
.create::<StreamConsumer>()
.expect("Failed to create Kafka StreamConsumer");

let topic = "test-topic";

let number_of_messages_to_produce = 5_usize;
let expected: Vec<String> = (0..number_of_messages_to_produce)
.map(|i| format!("Message {i}"))
.collect();

for (i, message) in expected.iter().enumerate() {
producer
.send(
FutureRecord::to(topic)
.payload(message)
.key(&format!("Key {i}")),
Duration::from_secs(0),
)
.await
.unwrap();
}

consumer
.subscribe(&[topic])
.expect("Failed to subscribe to a topic");

let mut message_stream = consumer.stream();
for produced in expected {
let borrowed_message =
tokio::time::timeout(Duration::from_secs(10), message_stream.next())
.await
.unwrap()
.unwrap();

assert_eq!(
produced,
borrowed_message
.unwrap()
.payload_view::<str>()
.unwrap()
.unwrap()
);
}

Ok(())
}

#[tokio::test]
async fn produce_and_consume_messages_jvm() -> Result<(), Box<dyn std::error::Error + 'static>>
{
let _ = pretty_env_logger::try_init();
let kafka_node = apache::Kafka::default().with_jvm_image().start().await?;

let bootstrap_servers = format!(
"127.0.0.1:{}",
kafka_node.get_host_port_ipv4(apache::KAFKA_PORT).await?
);

let producer = ClientConfig::new()
.set("bootstrap.servers", &bootstrap_servers)
.set("message.timeout.ms", "5000")
.create::<FutureProducer>()
.expect("Failed to create Kafka FutureProducer");

let consumer = ClientConfig::new()
.set("group.id", "testcontainer-rs")
.set("bootstrap.servers", &bootstrap_servers)
.set("session.timeout.ms", "6000")
.set("enable.auto.commit", "false")
.set("auto.offset.reset", "earliest")
.create::<StreamConsumer>()
.expect("Failed to create Kafka StreamConsumer");

let topic = "test-topic";

let number_of_messages_to_produce = 5_usize;
let expected: Vec<String> = (0..number_of_messages_to_produce)
.map(|i| format!("Message {i}"))
.collect();

for (i, message) in expected.iter().enumerate() {
producer
.send(
FutureRecord::to(topic)
.payload(message)
.key(&format!("Key {i}")),
Duration::from_secs(0),
)
.await
.unwrap();
}

consumer
.subscribe(&[topic])
.expect("Failed to subscribe to a topic");

let mut message_stream = consumer.stream();
for produced in expected {
let borrowed_message =
tokio::time::timeout(Duration::from_secs(10), message_stream.next())
.await
.unwrap()
.unwrap();

assert_eq!(
produced,
borrowed_message
.unwrap()
.payload_view::<str>()
.unwrap()
.unwrap()
);
}

Ok(())
}
}
Loading

0 comments on commit 1d2d0af

Please sign in to comment.