This repository contains HTTP and Apache Kafka® clients used in Strimzi system tests:

- Kafka Producer, which periodically produces messages into a topic
- Kafka Streams application, which reads messages from a topic, transforms them (reverses the message payload), and sends them to another topic
- Kafka Consumer, which consumes messages from a topic
- Kafka Admin, which allows the user to create/alter/remove topics via the Admin API
- HTTP Producer, which produces messages to a topic using a host and port
- HTTP Consumer, which consumes messages from a topic and endpoint

All clients are deployed as Kubernetes `Job`s; example files are present in the `examples` folder.
Logging configuration can be found in the `log4j2.properties` file of each module.
## Build

The pre-built images are available in our Quay.io organization. But if you want to make any modifications to the clients, you will need to build your own versions.

To build these examples you need some basic requirements: make sure you have `make`, `docker`, JDK 11, and `mvn` installed.
By default, the Docker organization to which images are pushed is the one defined by the `USER` environment variable, which is assigned to `DOCKER_ORG`. You can change the organization by exporting a different value for `DOCKER_ORG`; it can also be the internal registry of a running OpenShift cluster.
The command for building images with the latest supported Kafka version is:

```shell
make all
```
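The organization fallback described above can be sketched as follows (a hypothetical illustration of the documented default, not the actual Makefile logic):

```shell
# Illustration of the DOCKER_ORG default: when DOCKER_ORG is not exported,
# it falls back to the USER environment variable.
# (This mimics the documented behaviour; it is not the actual Makefile.)
unset DOCKER_ORG
USER="my-docker-org"
DOCKER_ORG="${DOCKER_ORG:-$USER}"
echo "$DOCKER_ORG"

# To push somewhere else, export DOCKER_ORG before building:
#   export DOCKER_ORG=my-other-org
#   make all
```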
## Usage

The essential requirement for running these clients is a Kubernetes cluster with Strimzi and an Apache Kafka cluster. How to deploy Apache Kafka using Strimzi can be found on the Strimzi website.

After successfully building the images (which also pushes them to the specified Docker repository), you are ready to deploy the producer and consumer containers along with Kafka and ZooKeeper.

You can deploy the clients using the examples inside the `examples` folder. This will create Kubernetes `Job`s with the example image.
Example command for deploying a job:

```shell
kubectl apply -f examples/kafka/kafka-consumer.yaml -n myproject
```
If you built your own version of these clients, remember to update the `image` field with the path where the image was pushed during the build and where it is available (i.e. `<my-docker-org>/test-client-http-consumer:latest`).
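To make that concrete, here is a minimal sketch of what the relevant part of such a Job manifest might look like (the metadata names are illustrative; only the `image` field is the point):

```yaml
apiVersion: batch/v1
kind: Job
metadata:
  name: http-consumer
spec:
  template:
    spec:
      containers:
        - name: http-consumer
          # Point this at your own build instead of the pre-built image
          image: <my-docker-org>/test-client-http-consumer:latest
      restartPolicy: Never
```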
The environment variables for each client are listed and described below.
### Kafka Producer

- `BOOTSTRAP_SERVERS` - comma-separated list of Kafka broker addresses. Each address is a `host:port` pair, e.g. `my-cluster-kafka-bootstrap:9092`
- `TOPIC` - the topic the producer will send to
- `DELAY_MS` - the delay, in ms, between messages
- `MESSAGE_COUNT` - the number of messages the producer should send
- `MESSAGE_KEY` - the message key used by the producer for all messages sent
- `MESSAGE` - the message which the producer should send
- `MESSAGE_TEMPLATE` - template from the data-generator for messages; it has higher priority than the `MESSAGE` env var
- `CA_CRT` - the certificate of the CA which signed the brokers' TLS certificates, for adding to the client's trust store
- `USER_CRT` - the user's certificate
- `USER_KEY` - the user's private key
- `PRODUCER_ACKS` - acknowledgement level
- `HEADERS` - comma-separated list of custom headers in the form `key1=value1, key2=value2`
- `ADDITIONAL_CONFIG` - additional configuration for the producer application. Note that you can also override any previously set variable by setting this. The form is `key=value` records separated by a newline character
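To make the variables concrete, here is what a producer Job's `env` section might look like (an illustrative sketch: the values, and the multi-line `ADDITIONAL_CONFIG`, are examples rather than defaults):

```yaml
# Fragment of a producer Job's container spec (illustrative values)
env:
  - name: BOOTSTRAP_SERVERS
    value: my-cluster-kafka-bootstrap:9092
  - name: TOPIC
    value: my-topic
  - name: DELAY_MS
    value: "1000"
  - name: MESSAGE_COUNT
    value: "100"
  - name: MESSAGE
    value: "Hello world"
  - name: PRODUCER_ACKS
    value: "all"
  # key=value records separated by newlines
  - name: ADDITIONAL_CONFIG
    value: |
      compression.type=gzip
      linger.ms=5
```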
### Kafka Consumer

- `BOOTSTRAP_SERVERS` - comma-separated list of Kafka broker addresses. Each address is a `host:port` pair, e.g. `my-cluster-kafka-bootstrap:9092`
- `TOPIC` - the name of the topic the consumer subscribes to
- `GROUP_ID` - the consumer group id for the consumer
- `MESSAGE_COUNT` - the number of messages the consumer should receive
- `CA_CRT` - the certificate of the CA which signed the brokers' TLS certificates, for adding to the client's trust store
- `USER_CRT` - the user's certificate
- `USER_KEY` - the user's private key
- `OUTPUT_FORMAT` - the output format for received messages: `plain` (default) or `json`
- `ADDITIONAL_CONFIG` - additional configuration for the consumer application. Note that you can also override any previously set variable by setting this. The form is `key=value` records separated by a newline character
### Kafka Streams

- `BOOTSTRAP_SERVERS` - comma-separated list of Kafka broker addresses. Each address is a `host:port` pair, e.g. `my-cluster-kafka-bootstrap:9092`
- `APPLICATION_ID` - the Kafka Streams application ID
- `SOURCE_TOPIC` - name of the topic which will be used as the source of messages
- `TARGET_TOPIC` - name of the topic where the transformed messages are sent
- `COMMIT_INTERVAL_MS` - the interval at which the consumer part of the Kafka Streams application commits offsets
- `CA_CRT` - the certificate of the CA which signed the brokers' TLS certificates, for adding to the client's trust store
- `USER_CRT` - the user's certificate
- `USER_KEY` - the user's private key
- `ADDITIONAL_CONFIG` - additional configuration for the streams application. Note that you can also override any previously set variable by setting this. The form is `key=value` records separated by a newline character
### Kafka Admin

- `BOOTSTRAP_SERVERS` - comma-separated list of Kafka broker addresses. Each address is a `host:port` pair, e.g. `my-cluster-kafka-bootstrap:9092`
- `TOPIC` - topic name (or prefix if `TOPICS_COUNT` > 1) to be created
- `PARTITIONS` - number of partitions per topic
- `REPLICATION_FACTOR` - `replication.factor` to set for the topic
- `TOPICS_COUNT` - (default 1) number of topics to create
- `TOPIC_OPERATION` - one of `create`, `remove`/`delete`, `list`, `help`
- `TOPIC_OFFSET` - start numbering batch topic creation from this offset
- `CA_CRT` - the certificate of the CA which signed the brokers' TLS certificates, for adding to the client's trust store
- `USER_CRT` - the user's certificate
- `USER_KEY` - the user's private key
- `ADDITIONAL_CONFIG` - additional configuration for the admin application. Note that you can also override any previously set variable by setting this. The form is `key=value` records separated by a newline character
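As an example, an admin Job creating a batch of topics might use an `env` section like this (illustrative values; the exact naming of the generated topics is an assumption):

```yaml
# Fragment of an admin Job's container spec: creates 10 topics
# with the prefix "my-topic" (illustrative values)
env:
  - name: BOOTSTRAP_SERVERS
    value: my-cluster-kafka-bootstrap:9092
  - name: TOPIC
    value: my-topic
  - name: TOPICS_COUNT
    value: "10"
  - name: PARTITIONS
    value: "3"
  - name: REPLICATION_FACTOR
    value: "3"
  - name: TOPIC_OPERATION
    value: create
```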
For each client, you can also configure OAuth:

- `OAUTH_CLIENT_ID` - id of the OAuth client
- `OAUTH_CLIENT_SECRET` - OAuth client's secret name
- `OAUTH_ACCESS_TOKEN` - OAuth access token
- `OAUTH_REFRESH_TOKEN` - OAuth refresh token
- `OAUTH_TOKEN_ENDPOINT_URI` - URI of the token endpoint where the client connects to retrieve the access token
### HTTP Producer

- `HOSTNAME` - hostname of the service
- `PORT` - port on which the service is exposed
- `TOPIC` - the topic the producer will send to
- `DELAY_MS` - the delay, in ms, between messages
- `MESSAGE_COUNT` - the number of messages the producer should send
- `MESSAGE` - the message which the producer should send
- `MESSAGE_TEMPLATE` - template from the data-generator for messages; it has higher priority than the `MESSAGE` env var
- `MESSAGE_TYPE` - type of message used in record headers; available values are `json` and `text`
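Unlike the Kafka clients, the HTTP clients target a host and port rather than bootstrap servers. An HTTP producer Job's `env` section might look like this (illustrative sketch; the service name and port are assumptions):

```yaml
# Fragment of an HTTP producer Job's container spec (illustrative values;
# the bridge service name and port are assumptions)
env:
  - name: HOSTNAME
    value: my-bridge-bridge-service
  - name: PORT
    value: "8080"
  - name: TOPIC
    value: my-topic
  - name: MESSAGE_COUNT
    value: "100"
  - name: DELAY_MS
    value: "1000"
  - name: MESSAGE_TYPE
    value: json
```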
### HTTP Consumer

- `HOSTNAME` - hostname of the service
- `PORT` - port on which the service is exposed
- `TOPIC` - topic from which the consumer should receive messages
- `CLIENT_ID` - id of the client which the consumer should use
- `GROUP_ID` - id of the group which the consumer should join
- `POLL_INTERVAL` - interval, in ms, between polls
- `POLL_TIMEOUT` - timeout, in ms, of one poll
- `MESSAGE_COUNT` - the number of messages the consumer should receive
- `MESSAGE_TYPE` - type of message used in record headers; available values are `json` and `text`