Skip to content

Latest commit

 

History

History
 
 

connect-mqtt-sink

MQTT Sink connector

asciinema

Objective

Quickly test MQTT Sink connector.

How to run

Simply run:

$ ./mqtt-sink.sh

Details of what the script is doing

Note: The ./password file was created with (myuser/mypassword) and command:

$ mosquitto_passwd -c password myuser

Sending messages to topic sink-messages

$ docker exec -i broker kafka-console-producer --broker-list broker:9092 --topic sink-messages << EOF
This is my message
EOF

Creating MQTT Sink connector

$ curl -X PUT \
     -H "Content-Type: application/json" \
     --data '{
               "connector.class": "io.confluent.connect.mqtt.MqttSinkConnector",
                    "tasks.max": "1",
                    "mqtt.server.uri": "tcp://mosquitto:1883",
                    "topics":"sink-messages",
                    "mqtt.qos": "2",
                    "mqtt.username": "myuser",
                    "mqtt.password": "mypassword",
                    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
                    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
                    "confluent.license": "",
                    "confluent.topic.bootstrap.servers": "broker:9092",
                    "confluent.topic.replication.factor": "1"
          }' \
     http://localhost:8083/connectors/sink-mqtt/config | jq .

Verify we have received messages in MQTT sink-messages topic

docker exec mosquitto sh -c 'mosquitto_sub -h localhost -p 1883 -u "myuser" -P "mypassword" -t "sink-messages" -C 1'

N.B: Control Center is reachable at http://127.0.0.1:9021