🛈 Please note that this is an open source project which is officially supported by Exasol. For any questions, you can contact our support team or open a GitHub issue.
Exasol database dialect example setup for Kafka Confluent JDBC Connector.
Please bear in mind that this only works with Kafka Connect JDBC version 5.0+.
If you already have a running Confluent Kafka Connect cluster, you need to set up an Exasol source or sink configuration (or both). You can find example configurations for exasol-source and exasol-sink. Please upload these to the Kafka Connect connectors endpoint, for example:
curl -X POST \
-H "Content-Type: application/json" \
--data @exasol-source.json kafka.connect.host:8083/connectors
Additionally, you need to upload the Exasol JDBC jars to the Connect plugin path. The plugin paths are possibly `/usr/share/java` or `/etc/kafka-connect/jars`. However, please check that these paths are on the Kafka classpath.
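As a quick sanity check before restarting Kafka Connect, the candidate plugin directories can be searched for the driver jar. This is only a sketch: the helper name `find_jars` and the file name pattern `*exa*jdbc*.jar` are assumptions, so adjust the pattern to the actual name of the jar you downloaded.

```shell
# Hypothetical helper: print every jar matching a name pattern that sits
# directly inside one of the given directories. Missing directories are skipped.
find_jars() {
  pattern="$1"; shift
  for dir in "$@"; do
    [ -d "$dir" ] || continue
    find "$dir" -maxdepth 1 -type f -name "$pattern"
  done
}

# Look for the Exasol JDBC jar in the two plugin paths mentioned above.
# The pattern is an assumption about how the jar file is named.
find_jars '*exa*jdbc*.jar' /usr/share/java /etc/kafka-connect/jars
```

If nothing is printed, the jar is not in either directory and the connector will most likely fail to load the driver.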
You can find more information in the Confluent documentation. Some relevant pages are listed below.
- Kafka Connectors
- Kafka Connect JDBC Connector
- Configuring Connectors
- Manually Installing Community Connectors
For testing we are going to use docker and docker-compose. Please set them up accordingly on your local machine. Running the Exasol docker-db requires root privileges.
Additionally, if you are using a non-Linux machine, please obtain the IP address for docker or docker-machine. For example, on macOS, use the following command:
docker-machine ip
For the rest of the documentation, when we refer to `localhost`, substitute it with the IP address returned by the above command.
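To avoid substituting the address by hand in every command, it can be resolved once into a shell variable. The following is a minimal sketch; the function name `connect_host` and the variable `CONNECT_HOST` are made up for illustration, and it assumes that `docker-machine ip` prints the address of the default machine.

```shell
# Hypothetical helper: print the address to use wherever this guide
# says "localhost". Falls back to localhost when docker-machine is not
# installed or does not report an address.
connect_host() {
  if command -v docker-machine >/dev/null 2>&1; then
    docker-machine ip 2>/dev/null || echo localhost
  else
    echo localhost
  fi
}

CONNECT_HOST="$(connect_host)"
echo "Kafka Connect REST endpoint: ${CONNECT_HOST}:8083"
```

Later commands can then use `${CONNECT_HOST}:8083/connectors` in place of `localhost:8083/connectors`.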
We need to open several terminals for dockerized testing.
- First clone this repository and start all the services:
git clone https://github.com/EXASOL/kafka-connect-jdbc-exasol.git
cd kafka-connect-jdbc-exasol
# If you're running docker in a virtual environment you might
# need to run the following command before docker-compose up:
# export COMPOSE_TLS_VERSION=TLSv1_2
docker-compose up
- Now we should create an Exasol sample schema and table. This example creates a `country` table inside `country_schema` and inserts a couple of records into it. This step should happen before setting up the Kafka connector configurations, because Kafka immediately starts to look for the Exasol tables.
docker exec -it exasol-db exaplus -c n11:8888 -u sys -P exasol -f /test/country.sql
- Once the schema and table are created, open another terminal and upload the connector configuration file to Kafka Connect:
# Create and add a new Kafka Connect Source
curl -X POST \
-H "Content-Type: application/json" \
--data @exasol-source.json localhost:8083/connectors
# You can see all available connectors with the command:
curl localhost:8083/connectors/
# Similarly, you can see the status of a connector with the command:
curl localhost:8083/connectors/exasol-source/status
- Lastly, in a fourth terminal, let us consume some data from Kafka. For the purpose of this example we are going to use the Kafka console consumer:
docker exec -it kafka02 /bin/bash
# List available Kafka topics, we should see the 'EXASOL_COUNTRY' listed.
kafka-topics --list --zookeeper zookeeper.internal:2181
# Start kafka console consumer
kafka-console-consumer \
--bootstrap-server kafka01.internal:9092 \
--from-beginning \
--topic EXASOL_COUNTRY
- You should see two records inserted from the other terminal. Similarly, if you insert new records into the `country` table in Exasol, they should be listed on the Kafka consumer console.
- In order to see the console results in a structured way, you can consume them using the Avro console consumer from the `schema-registry` container:
docker exec -it schema-registry /bin/bash
kafka-avro-console-consumer \
--bootstrap-server kafka01.internal:9092 \
--from-beginning \
--topic EXASOL_COUNTRY
- In the first terminal, upload the Kafka Connect Exasol sink configuration:
curl -X POST \
-H "Content-Type: application/json" \
--data @exasol-sink.json \
localhost:8083/connectors
# Check that the sink connector is running
curl localhost:8083/connectors/exasol-sink/status
- In the second terminal, open the Kafka Avro producer and produce some records for an example topic `COUNTRY_POPULATION`:
docker exec -it schema-registry /bin/bash
kafka-avro-console-producer \
--broker-list kafka01.internal:9092 \
--topic COUNTRY_POPULATION \
--property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"country_name","type":"string"},{"name":"population", "type": "long"}]}'
- Type some records in JSON format:
{"country_name": "France", "population": 67}
{"country_name": "Croatia", "population": 4}
- In another terminal, ensure that the records are available in the Exasol table:
docker exec -it exasol-db bash -c 'exaplus -c n11:8888 -u sys -P exasol -sql "SELECT * FROM country_schema.country_population;"'
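The connector status checks used in the steps above can also be scripted, which is handy when the whole walkthrough runs unattended. The sketch below is not part of this repository: the function names are hypothetical, and it assumes the status response is a JSON document in which the connector-level `"state"` field appears before the per-task states. It relies only on `curl`, `grep`, `sed` and `head`.

```shell
# Hypothetical helper: read a connector status JSON document on stdin and
# print the first "state" value found (assumed to be the connector's own state).
connector_state() {
  grep -o '"state"[[:space:]]*:[[:space:]]*"[A-Z_]*"' \
    | head -n 1 \
    | sed 's/.*"\([A-Z_]*\)"$/\1/'
}

# Hypothetical helper: poll the status endpoint until the connector reports
# RUNNING. Returns 0 on success, 1 if it never reached RUNNING.
# Arguments: connector name, host:port (default localhost:8083), attempts (default 30).
wait_for_connector() {
  name="$1"
  host="${2:-localhost:8083}"
  attempts="${3:-30}"
  i=0
  while [ "$i" -lt "$attempts" ]; do
    state="$(curl -s "${host}/connectors/${name}/status" | connector_state)"
    [ "$state" = "RUNNING" ] && return 0
    i=$((i + 1))
    sleep 2
  done
  return 1
}
```

For example, `wait_for_connector exasol-sink && echo "sink is running"` blocks until the sink connector is up, or gives up after roughly a minute.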
For this example setup we depend on several jar files:
- Exasol JDBC Driver
- Exasol Kafka Connect JDBC Jar
  - You can create the Kafka Connect Exasol JDBC jar via `mvn clean package`, which will create the jar file in `target/`. Then copy it into `kafka-connect-image/jars/`.
- Kafka Connect JDBC Connector (5.0+ version)
  - You can find the installation guide at docs/install-kafka.
Additionally, we are using docker-compose based Exasol and Kafka Connect services. Kafka Connect is configured in distributed mode.
Service Name | Versions | Description |
---|---|---|
exasol-db | dockerhub/exasol/docker-db:6.0.10-d1 | An Exasol docker db. Please note that we use stand-alone cluster mode. |
zookeeper | dockerhub/confluentinc/cp-zookeeper:4.1.1 | A single node zookeeper instance. |
kafka | dockerhub/confluentinc/cp-kafka:4.1.1 | A Kafka instance. We run a three-node Kafka setup. |
schema-registry | dockerhub/confluentinc/cp-schema-registry:4.1.1 | A schema-registry instance. |
kafka-connect | kafka-connect-image/Dockerfile | Custom configured kafka-connect instance. |
There are several tips and tricks to consider when setting up the Kafka Exasol connector.
- The timestamp and incrementing column names should be in upper case, for example, `"timestamp.column.name": "UPDATED_AT"`. This is due to the fact that Exasol stores unquoted identifiers in upper case and the Kafka connector is case sensitive.
- The `tasks.max` should be more than the number of tables in production systems. That is because in JDBC connectors each table is sourced per partition and then handled by a single task.
- The `incrementing` or `timestamp` columns named in the Kafka Connect configuration should have a `NOT NULL` constraint in the table definition.
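To illustrate the upper-case tip, the sketch below renders a source connector configuration in which the column names are upper-cased before they are written out. It is not the `exasol-source.json` shipped with this repository: the helper names, the column names `updated_at` and `id`, the connection URL, the credentials and the `tasks.max` value are all assumptions chosen to match the docker setup described here.

```shell
# Hypothetical helper: upper-case a single identifier.
to_upper() {
  printf '%s' "$1" | tr '[:lower:]' '[:upper:]'
}

# Hypothetical helper: print a source connector config as JSON.
# Arguments: timestamp column name, incrementing column name (any case).
# All values below are illustrative assumptions, not the repository's config.
render_source_config() {
  ts_col="$(to_upper "$1")"
  inc_col="$(to_upper "$2")"
  cat <<EOF
{
  "name": "exasol-source",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "tasks.max": "2",
    "connection.url": "jdbc:exa:exasol-db:8563;schema=country_schema",
    "connection.user": "sys",
    "connection.password": "exasol",
    "mode": "timestamp+incrementing",
    "timestamp.column.name": "${ts_col}",
    "incrementing.column.name": "${inc_col}",
    "table.whitelist": "COUNTRY",
    "topic.prefix": "EXASOL_"
  }
}
EOF
}

# Lower-case input is normalised to UPDATED_AT and ID in the output.
render_source_config updated_at id
```

The output can be redirected to a file and uploaded with the same `curl -X POST` command used for the other connector configurations.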
Batch mode together with upsert is not supported at the moment. We transform the Kafka upserts into Exasol-specific `MERGE` statements, which do not support batches. You can read more about it in issue #5.