This module provides a bridge between an MQTT (Message Queuing Telemetry Transport) server and a Kafka broker, allowing messages received on the MQTT server to be published to the Kafka broker.
Valid characters for Kafka topics are the ASCII alphanumerics, `.`, `_`, and `-`, and it is better not to mix `.` and `_` to avoid metric namespace collisions. Valid characters for MQTT topics are similar, with the exception that the `/` character is used to denote hierarchy. Since `/` is not allowed in Kafka topics, we replace all occurrences of `/` in the MQTT topic name with `_`; see `convert_MQTT_KAFKA_topic()` in `kafka_handler.py`.
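As a rough illustration, the conversion amounts to a simple string replacement (a minimal sketch only; the actual implementation is `convert_MQTT_KAFKA_topic()` in `kafka_handler.py`):

```python
def convert_mqtt_to_kafka_topic(mqtt_topic: str) -> str:
    """Map an MQTT topic name to a legal Kafka topic name.

    Illustrative sketch only; the real logic lives in kafka_handler.py.
    """
    return mqtt_topic.replace("/", "_")

# e.g. "spBv1.0/groupA/NBIRTH/node1" -> "spBv1.0_groupA_NBIRTH_node1"
```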
IMPORTANT NOTE: The Kafka broker must be configured to allow producer clients to create topics (the broker setting `auto.create.topics.enable`, which defaults to `true`) in order to ease the operation of converting new MQTT topics to Kafka topics.
### Integration Options to KAFKA

There are two possible ways to integrate the current setup:

- **Bridge MQTT to KAFKA**: This is the approach I have chosen, primarily because it provides flexibility and decoupling between all the services. Also, most enterprise MQTT brokers have an inbuilt Kafka plugin to ease the integration of MQTT with KAFKA.
- **Change Data Capture via Debezium from the graph database**: This approach showed a lot of promise (refer to this article). I decided against it primarily because it would couple the graph database with Kafka, as well as add additional complexity to the graph database.
### Deployment of the KAFKA Broker

The primary location for deploying the KAFKA broker would be in the enterprise hosting or the cloud, next to the enterprise/cloud based MQTT broker, as opposed to deploying KAFKA on the manufacturing floor. Kai Waehner has a brilliant 5-part blog series on extracting the best from integrating MQTT and KAFKA.

Just like the hosted enterprise versions of MQTT provided by various vendors, consider going with a hosted solution for KAFKA like Confluent Cloud, AWS MSK, Aiven, CloudKarafka etc. If you want to deploy and manage Kafka yourself, I recommend also checking out Strimzi as a way to deploy and manage Kafka on Kubernetes. I would recommend using Kafka in KRaft mode, which significantly improves the experience of deploying and managing Kafka.
### Choice of KAFKA Client Library

Of the many Kafka client libraries for Python, I selected confluent-kafka, primarily for its slightly better performance compared to the other libraries (owing to being a wrapper over the C library librdkafka).
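To make the bridge concrete, here is a minimal sketch of the MQTT-to-Kafka relay using paho-mqtt (1.x callback API) together with confluent-kafka. Broker addresses are placeholders, and the real listener in this module adds configuration handling, Sparkplug B payload handling, and error handling on top:

```python
import paho.mqtt.client as mqtt
from confluent_kafka import Producer

# Placeholder broker addresses; the real module reads these from settings.yaml
producer = Producer({"bootstrap.servers": "localhost:9092"})

def on_connect(client, userdata, flags, rc, properties=None):
    # Subscribe only after the connection is acknowledged
    client.subscribe("spBv1.0/#", qos=1)

def on_message(client, userdata, msg):
    # Convert the MQTT topic to a legal Kafka topic name (see sketch above)
    kafka_topic = msg.topic.replace("/", "_")
    producer.produce(kafka_topic, value=msg.payload)
    producer.poll(0)  # serve delivery callbacks without blocking

# paho-mqtt 1.x style client; paho-mqtt 2.x additionally requires a CallbackAPIVersion
client = mqtt.Client(protocol=mqtt.MQTTv5)
client.on_connect = on_connect
client.on_message = on_message
client.connect("localhost", 1883, keepalive=60)
client.loop_forever()
```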
This application has two configuration files:
- **settings.yaml**: Contains the key configurations needed to connect with the MQTT broker as well as the Kafka broker.
| key | sub key | description | default value |
|-----|---------|-------------|---------------|
| mqtt | host | Hostname of the MQTT broker instance. Mandatory configuration | None |
| mqtt | port | Port of the MQTT broker (int) | 1883 |
| mqtt | topics | Array of topics to be subscribed to. Must be in the namespace of SpB, i.e. spBv1.0/# | ["spBv1.0/#"] |
| mqtt | qos | QOS for the subscription. Valid values are 0, 1, 2 | 1 |
| mqtt | keep_alive | Maximum time interval in seconds between two control packets published by the client (int) | 60 |
| mqtt | reconnect_on_failure | Makes the client handle reconnection(s). Recommend keeping this True (True, False) | True |
| mqtt | version | The MQTT version to be used for connecting to the broker. Valid values are: 5 (for MQTTv5), 4 (for MQTTv311), 3 (for MQTTv31) | 5 |
| mqtt | clean_session | Boolean value to be specified only if MQTT version is not 5 | None |
| mqtt | transport | Valid values are "websockets", "tcp" | "tcp" |
| mqtt | ignored_attributes | Map of topic & list of attributes which are to be ignored from persistence. Supports wildcards for topics and nesting via "." notation for the attributes, e.g. `{'topic1': ["attr1", "attr2", "attr2.subAttr1"], 'topic2/+': ["A", "A.B.C"], 'topic3/#': ["x", "Y"]}` | None |
| mqtt | timestamp_attribute | The attribute name which should contain the timestamp of the message's publishing | "timestamp" |
| kafka | config | Mandatory Dict. See Kafka client configuration. All non-security configurations | None |
| dynaconf_merge | | Mandatory param. Always keep value as true | |
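Putting the table together, a minimal illustrative `settings.yaml` might look like the following (hostnames, ports and the `client.id` are placeholders; the exact nesting is inferred from the key/sub-key layout above):

```yaml
# Illustrative settings.yaml; hostnames and ports are placeholders
mqtt:
  host: "localhost"              # mandatory
  port: 1883
  topics: ["spBv1.0/#"]
  qos: 1
  version: 5

kafka:
  config:                        # mandatory; passed through to the Kafka client
    bootstrap.servers: "localhost:9092"
    client.id: "uns_kafka_mapper"

dynaconf_merge: true             # mandatory; always keep as true
```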
- **.secrets.yaml**: Contains the usernames and passwords needed to connect. This file is not checked into the repository for security purposes. However, a template file, `.secrets_template.yaml`, is provided; it should be edited and renamed to `.secrets.yaml`.
| key | sub key | sub key | description | default value |
|-----|---------|---------|-------------|---------------|
| mqtt | username | | The user id needed to authenticate with the MQTT broker | None |
| mqtt | password | | The password needed to authenticate with the MQTT broker | None |
| mqtt | tls | | Provide a map of attributes needed for a TLS connection to the MQTT broker. See the attributes below | None |
| mqtt | tls | ca_certs | Fully qualified path to the CA cert file. Mandatory for an SSL connection | None |
| mqtt | tls | certfile | Fully qualified path to the cert file | None |
| mqtt | tls | keyfile | Fully qualified path to the keyfile for the cert | None |
| mqtt | tls | cert_reqs | Boolean. If not provided then ssl.CERT_NONE is used; if True then ssl.CERT_REQUIRED is used; else ssl.CERT_OPTIONAL is used | None |
| mqtt | tls | ciphers | Specify which encryption ciphers are allowed for this connection | None |
| mqtt | tls | keyfile_password | Password used to encrypt your certfile and keyfile | None |
| mqtt | tls | insecure_cert | Boolean. Skips hostname checking, required for self-signed certificates | True |
| kafka | config | | Dict. See Kafka client configuration. Only the security related settings | None |
| dynaconf_merge | | | Mandatory param. Always keep value as true | |
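Similarly, an illustrative `.secrets.yaml` (all credentials are placeholders; the SASL entries are standard librdkafka security options, shown here only as an example of "security related settings"):

```yaml
# Illustrative .secrets.yaml; all credentials are placeholders
mqtt:
  username: "mqtt_user"
  password: "changeme"

kafka:
  config:                        # only the security related Kafka settings
    security.protocol: "SASL_SSL"
    sasl.mechanism: "PLAIN"
    sasl.username: "kafka_user"
    sasl.password: "changeme"

dynaconf_merge: true
```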
This submodule can be independently set up as a dev environment in the folder `06_uns_kafka`.
The following command creates a dev instance of a Kafka broker, to be used only for development purposes:
```bash
docker run \
    --name uns_kafka \
    --env KAFKA_ADVERTISED_LISTENERS="PLAINTEXT://localhost:9092" \
    --env ALLOW_PLAINTEXT_LISTENER="yes" \
    --env KAFKA_CFG_NODE_ID="0" \
    --env KAFKA_CFG_PROCESS_ROLES="controller,broker" \
    --env KAFKA_CFG_LISTENERS="PLAINTEXT://:9092,CONTROLLER://:9093" \
    --env KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP="CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT" \
    --env KAFKA_CFG_CONTROLLER_QUORUM_VOTERS="0@localhost:9093" \
    --env KAFKA_CFG_CONTROLLER_LISTENER_NAMES="CONTROLLER" \
    -p 9092:9092 \
    -d \
    bitnami/kafka:latest
```
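To sanity-check the dev broker, you can tail its logs and list topics (this assumes the bitnami image, which puts the Kafka CLI scripts on the container's PATH):

```bash
# Check the broker started cleanly
docker logs uns_kafka

# List topics via the Kafka CLI bundled in the bitnami image
docker exec uns_kafka kafka-topics.sh --bootstrap-server localhost:9092 --list
```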
Setting up a production-grade Kafka cluster is detailed on the Confluent site: Running Kafka in Production.
This has been tested on Unix (bash), Windows (PowerShell) and Mac (zsh).
```bash
python -m pip install --upgrade pip
pip install poetry
```

```bash
# Ensure that the poetry shell is activated
poetry shell
python -m pip install --upgrade pip poetry
poetry install
```
### Setting up VSCode

While importing the folder into VSCode, remember to do the following steps the first time:

- Open a terminal in VSCode
- Activate the poetry shell and install the dependencies:

  ```bash
  poetry shell
  python -m pip install --upgrade pip poetry
  poetry install
  ```

- Select the correct Python interpreter in VSCode (it should automatically detect the poetry virtual environment)
This module is executed by the following commands, with the current folder as `06_uns_kafka`. Ensure that the configuration files are correctly updated with your MQTT broker and Kafka broker details.
```bash
# Ensure that the poetry shell is activated
poetry shell
poetry install
python ./src/uns_kafka/uns_kafka_listener.py
```
The set of tests for this module is executed by:
```bash
# run all tests excluding integration tests
poetry run pytest -m "not integrationtest" test/

# run all tests
poetry run pytest test/
```
The Docker container image for this module is built in the Dockerize module and published to the GitHub Container Registry. The way to run the container is:
```bash
# docker pull ghcr.io/mkashwin/unifiednamespace/uns/kafka_mapper:<tag>
# e.g.
docker pull ghcr.io/mkashwin/unifiednamespace/uns/kafka_mapper:latest

# docker run --name <container name> -d -v <full path to conf>/:/app/conf ghcr.io/mkashwin/unifiednamespace/uns/kafka_mapper:<tag>
docker run --name uns_mqtt_2_kafka -d -v $PWD/conf:/app/conf ghcr.io/mkashwin/unifiednamespace/uns/kafka_mapper:latest
```
Note: Remember to update the following before executing:

- `<container name>` (optional): Identifier for the container, so that you can work with the same container instance using

  ```bash
  docker start <container name>
  docker stop <container name>
  ```

- `<full path to conf>` (mandatory): Volume mounted to the container containing the configurations. See Key Configurations to provide. Give the complete path, not a relative path.