Use docker-compose.yml for a valid multi-node Kafka Connect cluster.
Three workers, each with a misconfigured (non-resolvable) CONNECT_REST_ADVERTISED_HOST_NAME.
worker 2:
Joined group at generation 1 and got assignment: Assignment{error=0, leader='connect-1-8c9ca437-de73-4faa-9df8-adcc0feca3fd', leaderUrl='http://foobar2:8083/', offset=-1, connectorIds=[], taskIds=[], revokedConnectorIds=[], revokedTaskIds=[], delay=0} (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
worker 3:
Joined group at generation 2 and got assignment: Assignment{error=0, leader='connect-1-8c9ca437-de73-4faa-9df8-adcc0feca3fd', leaderUrl='http://foobar2:8083/', offset=-1, connectorIds=[], taskIds=[], revokedConnectorIds=[], revokedTaskIds=[], delay=0} (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
worker 1:
Joined group at generation 3 and got assignment: Assignment{error=0, leader='connect-1-8c9ca437-de73-4faa-9df8-adcc0feca3fd', leaderUrl='http://foobar2:8083/', offset=-1, connectorIds=[], taskIds=[], revokedConnectorIds=[], revokedTaskIds=[], delay=0} (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
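The leaderUrl in these log lines matters later, because it is the address a non-leader worker tries to reach when it forwards a REST request. As a rough sketch (the regular expression is derived only from the log lines shown above, and the function names `parse_assignment` and `leader_host` are made up for illustration), the generation, leader and leaderUrl can be pulled out of a worker log like this:

```python
import re
from typing import Optional
from urllib.parse import urlparse

# Pattern inferred from the DistributedHerder log lines above; it is an
# assumption that other Kafka Connect versions log the same layout.
ASSIGNMENT_RE = re.compile(
    r"Joined group at generation (?P<generation>\d+) and got assignment: "
    r"Assignment\{error=(?P<error>-?\d+), leader='(?P<leader>[^']*)', "
    r"leaderUrl='(?P<leader_url>[^']*)'"
)


def parse_assignment(line: str) -> Optional[dict]:
    """Return generation, error, leader and leader_url, or None if no match."""
    match = ASSIGNMENT_RE.search(line)
    if match is None:
        return None
    return {
        "generation": int(match.group("generation")),
        "error": int(match.group("error")),
        "leader": match.group("leader"),
        "leader_url": match.group("leader_url"),
    }


def leader_host(leader_url: str) -> Optional[str]:
    """Extract just the host name that other workers must be able to resolve."""
    return urlparse(leader_url).hostname
```

Running `parse_assignment` over each worker's log gives a quick way to confirm that all workers agree on the leader, and which host name they will try to contact.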
No connectors defined yet
$ curl -s "http://localhost:8083/connectors?expand=info&expand=status" | \
    jq '.'
{}
Config topic
$ kafkacat -b localhost:9092 -t _kafka-connect-groupA-configs -o beginning -f 'Topic %t[%p], offset: %o, Headers: %h, key: %k, payload: %S bytes: %s\n' -u -C
% Reached end of topic _kafka-connect-groupA-configs [0] at offset 0
Send the config to worker 1:
curl -s -X PUT -H "Content-Type:application/json" http://localhost:8083/connectors/source-datagen-01/config \
    -d '{
    "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "kafka.topic": "item_details_01",
    "max.interval":250,
    "quickstart": "ratings",
    "tasks.max": 6
    }'
{"error_code":500,"message":"IO Error trying to forward REST request: java.net.UnknownHostException: foobar2: Name or service not known"}
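The UnknownHostException above comes from worker 1 trying to look up the leader's advertised host name. A minimal sketch for checking this up front is below; `unresolvable_hosts` is a hypothetical helper, not part of Kafka Connect, and it only tells you something useful when run from inside each worker's container, since that is where the lookup happens.

```python
import socket
from typing import Callable, Iterable, List


def unresolvable_hosts(
    hosts: Iterable[str],
    resolve: Callable[[str], object] = socket.gethostbyname,
) -> List[str]:
    """Return the host names that fail DNS resolution from this machine.

    `resolve` is injectable so the check can be exercised without a network;
    by default it uses the standard library resolver.
    """
    failed = []
    for host in hosts:
        try:
            resolve(host)
        except OSError:  # socket.gaierror is a subclass of OSError
            failed.append(host)
    return failed
```

For the scenario here you would expect `unresolvable_hosts(["foobar1", "foobar2", "foobar3"])` to return all three names when run inside any of the worker containers.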
Stop worker 2
worker 1:
Rebalance started (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator)
(Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
Successfully joined group with generation 4 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
Joined group at generation 4 and got assignment: Assignment{error=0, leader='connect-1-1bf43113-2bc6-4ebb-aa50-82827da0d1d3', leaderUrl='http://foobar3:8083/', offset=-1, connectorIds=[], taskIds=[], revokedConnectorIds=[], revokedTaskIds=[], delay=0} (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
worker 3:
Rebalance started (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator)
(Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
Successfully joined group with generation 4 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
Joined group at generation 4 and got assignment: Assignment{error=0, leader='connect-1-1bf43113-2bc6-4ebb-aa50-82827da0d1d3', leaderUrl='http://foobar3:8083/', offset=-1, connectorIds=[], taskIds=[], revokedConnectorIds=[], revokedTaskIds=[], delay=0} (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
Send config to worker 1:
$ curl -s -X PUT -H "Content-Type:application/json" http://localhost:8083/connectors/source-datagen-01/config \
    -d '{
    "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "kafka.topic": "item_details_01",
    "max.interval":250,
    "quickstart": "ratings",
    "tasks.max": 6
    }'
{"error_code":500,"message":"IO Error trying to forward REST request: java.net.UnknownHostException: foobar3: Name or service not known"}
Send config to worker 3:
$ curl -s -X PUT -H "Content-Type:application/json" http://localhost:28083/connectors/source-datagen-01/config \
    -d '{
    "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "kafka.topic": "item_details_01",
    "max.interval":250,
    "quickstart": "ratings",
    "tasks.max": 6
    }'
Successfully created. Configs topic updates:
$ kafkacat -b localhost:9092 -t _kafka-connect-groupA-configs -o beginning -f 'Topic %t[%p], offset: %o, Headers: %h, key: %k, payload: %S bytes: %s\n' -u -C
% Reached end of topic _kafka-connect-groupA-configs [0] at offset 0
Topic _kafka-connect-groupA-configs[0], offset: 0, Headers: , key: connector-source-datagen-01, payload: 274 bytes: {"properties":{"connector.class":"io.confluent.kafka.connect.datagen.DatagenConnector","key.converter":"org.apache.kafka.connect.storage.StringConverter","kafka.topic":"item_details_01","max.interval":"250","quickstart":"ratings","tasks.max":"6","name":"source-datagen-01"}}
% Reached end of topic _kafka-connect-groupA-configs [0] at offset 1
Topic _kafka-connect-groupA-configs[0], offset: 1, Headers: , key: task-source-datagen-01-0, payload: 336 bytes: {"properties":{"connector.class":"io.confluent.kafka.connect.datagen.DatagenConnector","quickstart":"ratings","task.class":"io.confluent.kafka.connect.datagen.DatagenTask","tasks.max":"6","name":"source-datagen-01","kafka.topic":"item_details_01","max.interval":"250","key.converter":"org.apache.kafka.connect.storage.StringConverter"}}
Topic _kafka-connect-groupA-configs[0], offset: 2, Headers: , key: task-source-datagen-01-1, payload: 336 bytes: {"properties":{"connector.class":"io.confluent.kafka.connect.datagen.DatagenConnector","quickstart":"ratings","task.class":"io.confluent.kafka.connect.datagen.DatagenTask","tasks.max":"6","name":"source-datagen-01","kafka.topic":"item_details_01","max.interval":"250","key.converter":"org.apache.kafka.connect.storage.StringConverter"}}
Topic _kafka-connect-groupA-configs[0], offset: 3, Headers: , key: task-source-datagen-01-2, payload: 336 bytes: {"properties":{"connector.class":"io.confluent.kafka.connect.datagen.DatagenConnector","quickstart":"ratings","task.class":"io.confluent.kafka.connect.datagen.DatagenTask","tasks.max":"6","name":"source-datagen-01","kafka.topic":"item_details_01","max.interval":"250","key.converter":"org.apache.kafka.connect.storage.StringConverter"}}
Topic _kafka-connect-groupA-configs[0], offset: 4, Headers: , key: task-source-datagen-01-3, payload: 336 bytes: {"properties":{"connector.class":"io.confluent.kafka.connect.datagen.DatagenConnector","quickstart":"ratings","task.class":"io.confluent.kafka.connect.datagen.DatagenTask","tasks.max":"6","name":"source-datagen-01","kafka.topic":"item_details_01","max.interval":"250","key.converter":"org.apache.kafka.connect.storage.StringConverter"}}
Topic _kafka-connect-groupA-configs[0], offset: 5, Headers: , key: task-source-datagen-01-4, payload: 336 bytes: {"properties":{"connector.class":"io.confluent.kafka.connect.datagen.DatagenConnector","quickstart":"ratings","task.class":"io.confluent.kafka.connect.datagen.DatagenTask","tasks.max":"6","name":"source-datagen-01","kafka.topic":"item_details_01","max.interval":"250","key.converter":"org.apache.kafka.connect.storage.StringConverter"}}
Topic _kafka-connect-groupA-configs[0], offset: 6, Headers: , key: task-source-datagen-01-5, payload: 336 bytes: {"properties":{"connector.class":"io.confluent.kafka.connect.datagen.DatagenConnector","quickstart":"ratings","task.class":"io.confluent.kafka.connect.datagen.DatagenTask","tasks.max":"6","name":"source-datagen-01","kafka.topic":"item_details_01","max.interval":"250","key.converter":"org.apache.kafka.connect.storage.StringConverter"}}
Topic _kafka-connect-groupA-configs[0], offset: 7, Headers: , key: commit-source-datagen-01, payload: 11 bytes: {"tasks":6}
% Reached end of topic _kafka-connect-groupA-configs [0] at offset 8
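The config topic output above has three kinds of record key: one `connector-` record, one `task-` record per task, and a closing `commit-` record. A small sketch for sorting keys into those groups is below. The prefixes are taken only from the output observed here, so other key types may exist and are returned as None; `ConfigKey` and `classify_config_key` are illustrative names.

```python
from typing import NamedTuple, Optional


class ConfigKey(NamedTuple):
    kind: str            # "connector", "task" or "commit"
    connector: str       # connector name, e.g. "source-datagen-01"
    task: Optional[int]  # task number for "task" records, else None


def classify_config_key(key: str) -> Optional[ConfigKey]:
    """Classify a config-topic record key based on the prefixes seen above."""
    if key.startswith("connector-"):
        return ConfigKey("connector", key[len("connector-"):], None)
    if key.startswith("commit-"):
        return ConfigKey("commit", key[len("commit-"):], None)
    if key.startswith("task-"):
        # The task number is the part after the last hyphen; the connector
        # name itself may contain hyphens, so split from the right.
        name, sep, task_id = key[len("task-"):].rpartition("-")
        if sep and task_id.isdigit():
            return ConfigKey("task", name, int(task_id))
    return None  # unrecognised key type
```

Feeding it the `%k` values printed by kafkacat makes it easy to check that the number of task records matches the `{"tasks":6}` commit record.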
Status topic updates:
$ kafkacat -b localhost:9092 -t _kafka-connect-groupA-status -o beginning -f 'Topic %t[%p], offset: %o, Headers: %h, key: %k, payload: %S bytes: %s\n' -u -C
% Reached end of topic _kafka-connect-groupA-status [0] at offset 0
% Reached end of topic _kafka-connect-groupA-status [1] at offset 0
% Reached end of topic _kafka-connect-groupA-status [2] at offset 0
% Reached end of topic _kafka-connect-groupA-status [3] at offset 0
% Reached end of topic _kafka-connect-groupA-status [4] at offset 0
Topic _kafka-connect-groupA-status[3], offset: 0, Headers: , key: status-connector-source-datagen-01, payload: 74 bytes: {"state":"RUNNING","trace":null,"worker_id":"foobar3:8083","generation":5}
% Reached end of topic _kafka-connect-groupA-status [3] at offset 1
Topic _kafka-connect-groupA-status[0], offset: 0, Headers: , key: status-task-source-datagen-01-4, payload: 74 bytes: {"state":"RUNNING","trace":null,"worker_id":"foobar3:8083","generation":6}
Topic _kafka-connect-groupA-status[1], offset: 0, Headers: , key: status-task-source-datagen-01-2, payload: 74 bytes: {"state":"RUNNING","trace":null,"worker_id":"foobar3:8083","generation":6}
% Reached end of topic _kafka-connect-groupA-status [0] at offset 1
Topic _kafka-connect-groupA-status[0], offset: 1, Headers: , key: status-task-source-datagen-01-0, payload: 74 bytes: {"state":"RUNNING","trace":null,"worker_id":"foobar3:8083","generation":6}
% Reached end of topic _kafka-connect-groupA-status [1] at offset 1
Topic _kafka-connect-groupA-status[4], offset: 0, Headers: , key: status-task-source-datagen-01-5, payload: 74 bytes: {"state":"RUNNING","trace":null,"worker_id":"foobar1:8083","generation":6}
% Reached end of topic _kafka-connect-groupA-status [0] at offset 2
% Reached end of topic _kafka-connect-groupA-status [4] at offset 1
Topic _kafka-connect-groupA-status[0], offset: 2, Headers: , key: status-task-source-datagen-01-3, payload: 74 bytes: {"state":"RUNNING","trace":null,"worker_id":"foobar1:8083","generation":6}
Topic _kafka-connect-groupA-status[1], offset: 1, Headers: , key: status-task-source-datagen-01-1, payload: 74 bytes: {"state":"RUNNING","trace":null,"worker_id":"foobar1:8083","generation":6}
% Reached end of topic _kafka-connect-groupA-status [0] at offset 3
% Reached end of topic _kafka-connect-groupA-status [1] at offset 2
Three tasks started on each of the two remaining workers (worker 1 and worker 3)
$ curl -s "http://localhost:8083/connectors?expand=info&expand=status" | \
    jq '.'
{
  "source-datagen-01": {
    "info": {
      "name": "source-datagen-01",
      "config": {
        "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
        "quickstart": "ratings",
        "tasks.max": "6",
        "name": "source-datagen-01",
        "kafka.topic": "item_details_01",
        "max.interval": "250",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter"
      },
      "tasks": [
        {
          "connector": "source-datagen-01",
          "task": 0
        },
        {
          "connector": "source-datagen-01",
          "task": 1
        },
        {
          "connector": "source-datagen-01",
          "task": 2
        },
        {
          "connector": "source-datagen-01",
          "task": 3
        },
        {
          "connector": "source-datagen-01",
          "task": 4
        },
        {
          "connector": "source-datagen-01",
          "task": 5
        }
      ],
      "type": "source"
    },
    "status": {
      "name": "source-datagen-01",
      "connector": {
        "state": "RUNNING",
        "worker_id": "foobar3:8083"
      },
      "tasks": [
        {
          "id": 0,
          "state": "RUNNING",
          "worker_id": "foobar3:8083"
        },
        {
          "id": 1,
          "state": "RUNNING",
          "worker_id": "foobar1:8083"
        },
        {
          "id": 2,
          "state": "RUNNING",
          "worker_id": "foobar3:8083"
        },
        {
          "id": 3,
          "state": "RUNNING",
          "worker_id": "foobar1:8083"
        },
        {
          "id": 4,
          "state": "RUNNING",
          "worker_id": "foobar3:8083"
        },
        {
          "id": 5,
          "state": "RUNNING",
          "worker_id": "foobar1:8083"
        }
      ],
      "type": "source"
    }
  }
}
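To see the task distribution at a glance rather than reading the JSON, the response from `/connectors?expand=info&expand=status` can be tallied by worker_id. This is a minimal sketch that assumes the response has already been parsed into a dict (for example with `json.loads`); `tasks_per_worker` is a made-up helper name.

```python
from collections import Counter
from typing import Dict


def tasks_per_worker(connectors: Dict[str, dict]) -> Counter:
    """Count tasks per worker_id across all connectors in the REST response."""
    counts: Counter = Counter()
    for entry in connectors.values():
        # Each connector entry carries a "status" block with a "tasks" list.
        for task in entry.get("status", {}).get("tasks", []):
            counts[task["worker_id"]] += 1
    return counts
```

Note that this only distinguishes workers if their advertised host names differ; if every worker advertises the same name, all tasks collapse into a single bucket.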
Stop worker 3
worker 1:
Rebalance started (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator)
(Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
Successfully joined group with generation 7 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
Joined group at generation 7 and got assignment: Assignment{error=0, leader='connect-1-d16e56f8-7107-4d23-9f10-f5c70ab089ad', leaderUrl='http://foobar1:8083/', offset=8, connectorIds=[source-datagen-01], taskIds=[source-datagen-01-0, source-datagen-01-2, source-datagen-01-4, source-datagen-01-1, source-datagen-01-3, source-datagen-01-5], revokedConnectorIds=[], revokedTaskIds=[], delay=0} (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
Tasks reassigned to worker 1
Status topic:
Topic _kafka-connect-groupA-status[3], offset: 1, Headers: , key: status-connector-source-datagen-01, payload: 74 bytes: {"state":"RUNNING","trace":null,"worker_id":"foobar1:8083","generation":7}
% Reached end of topic _kafka-connect-groupA-status [3] at offset 2
Topic _kafka-connect-groupA-status[0], offset: 3, Headers: , key: status-task-source-datagen-01-4, payload: 74 bytes: {"state":"RUNNING","trace":null,"worker_id":"foobar1:8083","generation":7}
Topic _kafka-connect-groupA-status[0], offset: 4, Headers: , key: status-task-source-datagen-01-0, payload: 74 bytes: {"state":"RUNNING","trace":null,"worker_id":"foobar1:8083","generation":7}
% Reached end of topic _kafka-connect-groupA-status [0] at offset 5
Topic _kafka-connect-groupA-status[1], offset: 2, Headers: , key: status-task-source-datagen-01-2, payload: 74 bytes: {"state":"RUNNING","trace":null,"worker_id":"foobar1:8083","generation":7}
Connector status
curl -s "http://localhost:8083/connectors?expand=info&expand=status" | \
    jq '.'
[…]
    "status": {
      "name": "source-datagen-01",
      "connector": {
        "state": "RUNNING",
        "worker_id": "foobar1:8083"
      },
      "tasks": [
        {
          "id": 0,
          "state": "RUNNING",
          "worker_id": "foobar1:8083"
        },
        {
          "id": 1,
          "state": "RUNNING",
          "worker_id": "foobar1:8083"
        },
        {
          "id": 2,
          "state": "RUNNING",
          "worker_id": "foobar1:8083"
        },
        {
          "id": 3,
          "state": "RUNNING",
          "worker_id": "foobar1:8083"
        },
        {
          "id": 4,
          "state": "RUNNING",
          "worker_id": "foobar1:8083"
        },
        {
          "id": 5,
          "state": "RUNNING",
          "worker_id": "foobar1:8083"
        }
      ],
      "type": "source"
[…]
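tl;dr If the rest.advertised.host.name cannot be resolved by the other workers then you will have problems making config changes: a REST request sent to a non-leader worker fails because that worker cannot forward it to the leader, and only a request sent directly to the leader succeeds. It doesn’t seem to impact the execution of connectors though, since this is coordinated through the Kafka topics.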
localhost in the context of a worker will be the local machine; in the case of Docker this means each individual container, and has nothing to do with the host machine on which it runs.
Bounced the workers a few times to pick up the correct config. Note that the Kafka topics are new (groupB rather than the groupA topics of the previous scenario).
Worker 1
Rebalance started (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator)
(Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
(Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
Successfully joined group with generation 16 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
Joined group at generation 16 and got assignment: Assignment{error=0, leader='connect-1-410154a3-a7cc-4c60-adff-79ea9938b431', leaderUrl='http://localhost:8083/', offset=-1, connectorIds=[], taskIds=[], revokedConnectorIds=[], revokedTaskIds=[], delay=0} (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
Worker 2
Rebalance started (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator)
(Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
(Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
Successfully joined group with generation 16 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
Joined group at generation 16 and got assignment: Assignment{error=0, leader='connect-1-410154a3-a7cc-4c60-adff-79ea9938b431', leaderUrl='http://localhost:8083/', offset=-1, connectorIds=[], taskIds=[], revokedConnectorIds=[], revokedTaskIds=[], delay=0} (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
Worker 3
Rebalance started (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator)
(Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
(Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
Successfully joined group with generation 16 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
Joined group at generation 16 and got assignment: Assignment{error=0, leader='connect-1-410154a3-a7cc-4c60-adff-79ea9938b431', leaderUrl='http://localhost:8083/', offset=-1, connectorIds=[], taskIds=[], revokedConnectorIds=[], revokedTaskIds=[], delay=0} (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
Note that each one has the leaderUrl of localhost:8083.
No consumer groups
$ docker exec -it kafka kafka-consumer-groups \
    --bootstrap-server kafka:29092 \
    --list
$
Nothing in any of the KC topics.
$ kafkacat -b localhost:9092 -t _kafka-connect-groupB-configs -o beginning -f 'Topic %t[%p], offset: %o, Headers: %h, key: %k, payload: %S bytes: %s\n' -u -C
% Reached end of topic _kafka-connect-groupB-configs [0] at offset 0
$ kafkacat -b localhost:9092 -t _kafka-connect-groupB-offsets -o beginning -f 'Topic %t[%p], offset: %o, Headers: %h, key: %k, payload: %S bytes: %s\n' -u -C
% Reached end of topic _kafka-connect-groupB-offsets [0] at offset 0
% Reached end of topic _kafka-connect-groupB-offsets [1] at offset 0
% Reached end of topic _kafka-connect-groupB-offsets [2] at offset 0
% Reached end of topic _kafka-connect-groupB-offsets [3] at offset 0
% Reached end of topic _kafka-connect-groupB-offsets [4] at offset 0
% Reached end of topic _kafka-connect-groupB-offsets [5] at offset 0
% Reached end of topic _kafka-connect-groupB-offsets [6] at offset 0
% Reached end of topic _kafka-connect-groupB-offsets [7] at offset 0
% Reached end of topic _kafka-connect-groupB-offsets [8] at offset 0
% Reached end of topic _kafka-connect-groupB-offsets [9] at offset 0
% Reached end of topic _kafka-connect-groupB-offsets [10] at offset 0
% Reached end of topic _kafka-connect-groupB-offsets [11] at offset 0
% Reached end of topic _kafka-connect-groupB-offsets [12] at offset 0
% Reached end of topic _kafka-connect-groupB-offsets [13] at offset 0
% Reached end of topic _kafka-connect-groupB-offsets [14] at offset 0
% Reached end of topic _kafka-connect-groupB-offsets [15] at offset 0
% Reached end of topic _kafka-connect-groupB-offsets [16] at offset 0
% Reached end of topic _kafka-connect-groupB-offsets [17] at offset 0
% Reached end of topic _kafka-connect-groupB-offsets [18] at offset 0
% Reached end of topic _kafka-connect-groupB-offsets [19] at offset 0
% Reached end of topic _kafka-connect-groupB-offsets [20] at offset 0
% Reached end of topic _kafka-connect-groupB-offsets [21] at offset 0
% Reached end of topic _kafka-connect-groupB-offsets [22] at offset 0
% Reached end of topic _kafka-connect-groupB-offsets [23] at offset 0
% Reached end of topic _kafka-connect-groupB-offsets [24] at offset 0
$ kafkacat -b localhost:9092 -t _kafka-connect-groupB-status -o beginning -f 'Topic %t[%p], offset: %o, Headers: %h, key: %k, payload: %S bytes: %s\n' -u -C
% Reached end of topic _kafka-connect-groupB-status [0] at offset 0
% Reached end of topic _kafka-connect-groupB-status [1] at offset 0
% Reached end of topic _kafka-connect-groupB-status [2] at offset 0
% Reached end of topic _kafka-connect-groupB-status [3] at offset 0
% Reached end of topic _kafka-connect-groupB-status [4] at offset 0
Consumer offsets topic
$ kafkacat -b localhost:9092 -t __consumer_offsets -o beginning -f 'Topic %t / Partition %p / Offset: %o / Timestamp: %T\nHeaders: %h\nKey (%K bytes): %k\nPayload (%S bytes): %s\n--\n' -u -C
Most recent message
Topic __consumer_offsets / Partition 33 / Offset: 15 / Timestamp: 1574417598705 Headers: Key (24 bytes): kafka-connect-groupB Payload (855 bytes): connect compatible.connect-1-410154a3-a7cc-4c60-adff-79ea9938b431n��.connect-1-410154a3-a7cc-4c60-adff-79ea9938b431�� connect-1/172.28.0.5�`'�http://localhost:8083/��������`.connect-1-410154a3-a7cc-4c60-adff-79ea9938b431http://localhost:8083/��������`.connect-1-410154a3-a7cc-4c60-adff-79ea9938b431http://localhost:8083/��������.connect-1-79e9c006-5f50-4ed4-9d51-52a1bdc73637�� connect-1/172.28.0.7�`'&http://localhost:8083/������������`.connect-1-410154a3-a7cc-4c60-adff-79ea9938b431http://localhost:8083/��������.connect-1-9a6ad1d8-cd5e-4a04-9dc6-fad03fb005af�� connect-1/172.28.0.6�`'&http://localhost:8083/������������`.connect-1-410154a3-a7cc-4c60-adff-79ea9938b431http://localhost:8083/��������
Docker network IP of each worker
$ docker exec kafka-connect-01 bash -c "ip addr show eth0"
201: eth0@if202: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 qdisc noqueue state UP group default
    link/ether 02:42:ac:1c:00:07 brd ff:ff:ff:ff:ff:ff
    inet 172.28.0.7/16 brd 172.28.255.255 scope global eth0
       valid_lft forever preferred_lft forever
$ docker exec kafka-connect-02 bash -c "ip addr show eth0"
199: eth0@if200: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 qdisc noqueue state UP group default
    link/ether 02:42:ac:1c:00:06 brd ff:ff:ff:ff:ff:ff
    inet 172.28.0.6/16 brd 172.28.255.255 scope global eth0
       valid_lft forever preferred_lft forever
$ docker exec kafka-connect-03 bash -c "ip addr show eth0"
197: eth0@if198: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 qdisc noqueue state UP group default
    link/ether 02:42:ac:1c:00:05 brd ff:ff:ff:ff:ff:ff
    inet 172.28.0.5/16 brd 172.28.255.255 scope global eth0
       valid_lft forever preferred_lft forever
Send the config to worker 1:
curl -s -X PUT -H "Content-Type:application/json" http://localhost:8083/connectors/source-datagen-01/config \
    -d '{
    "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "kafka.topic": "item_details_01",
    "max.interval":250,
    "quickstart": "ratings",
    "tasks.max": 6
    }'
{"error_code":409,"message":"Cannot complete request because of a conflicting operation (e.g. worker rebalance)"}
Worker 1 logs:
kafka-connect-01 | [2019-11-22 10:43:54,923] INFO AbstractConfig values:
kafka-connect-01 | (org.apache.kafka.common.config.AbstractConfig)
kafka-connect-01 | [2019-11-22 10:43:54,936] INFO AbstractConfig values:
kafka-connect-01 | (org.apache.kafka.common.config.AbstractConfig)
kafka-connect-01 | [2019-11-22 10:43:54,948] INFO AbstractConfig values:
kafka-connect-01 | (org.apache.kafka.common.config.AbstractConfig)
Send it to Worker 2
curl -s -X PUT -H "Content-Type:application/json" http://localhost:18083/connectors/source-datagen-01/config \
    -d '{
    "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "kafka.topic": "item_details_01",
    "max.interval":250,
    "quickstart": "ratings",
    "tasks.max": 6
    }'
{"error_code":409,"message":"Cannot complete request because of a conflicting operation (e.g. worker rebalance)"}
Worker 2 logs:
kafka-connect-02 | [2019-11-22 10:44:34,443] INFO AbstractConfig values:
kafka-connect-02 | (org.apache.kafka.common.config.AbstractConfig)
kafka-connect-02 | [2019-11-22 10:44:34,566] INFO AbstractConfig values:
kafka-connect-02 | (org.apache.kafka.common.config.AbstractConfig)
kafka-connect-02 | [2019-11-22 10:44:34,579] INFO AbstractConfig values:
kafka-connect-02 | (org.apache.kafka.common.config.AbstractConfig)
Send it to Worker 3
curl -s -X PUT -H "Content-Type:application/json" http://localhost:28083/connectors/source-datagen-01/config \
    -d '{
    "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "kafka.topic": "item_details_01",
    "max.interval":250,
    "quickstart": "ratings",
    "tasks.max": 6
    }'
Connector gets created and executed across all three workers:
$ curl -s "http://localhost:8083/connectors?expand=info&expand=status" | \
    jq '. | to_entries[] | [ .value.info.type, .key, .value.status.connector.state,.value.status.tasks[].state,.value.info.config."connector.class"]|join(":|:")' | \
    column -s : -t| sed 's/\"//g'| sort
source | source-datagen-01 | RUNNING | RUNNING | RUNNING | RUNNING | RUNNING | RUNNING | RUNNING | io.confluent.kafka.connect.datagen.DatagenConnector
But in the status topic and in the REST API, workers are only identified by their advertised host and port. Since every worker here advertises localhost:8083, you can’t tell which worker is running which task:
$ kafkacat -b localhost:9092 -t _kafka-connect-groupB-status -o beginning -f 'Topic %t[%p], offset: %o, Headers: %h, key: %k, payload: %S bytes: %s\n' -u -C
% Reached end of topic _kafka-connect-groupB-status [0] at offset 0
% Reached end of topic _kafka-connect-groupB-status [1] at offset 0
% Reached end of topic _kafka-connect-groupB-status [2] at offset 0
% Reached end of topic _kafka-connect-groupB-status [3] at offset 0
% Reached end of topic _kafka-connect-groupB-status [4] at offset 0
Topic _kafka-connect-groupB-status[3], offset: 0, Headers: , key: status-connector-source-datagen-01, payload: 77 bytes: {"state":"RUNNING","trace":null,"worker_id":"localhost:8083","generation":17}
% Reached end of topic _kafka-connect-groupB-status [3] at offset 1
Topic _kafka-connect-groupB-status[0], offset: 0, Headers: , key: status-task-source-datagen-01-3, payload: 77 bytes: {"state":"RUNNING","trace":null,"worker_id":"localhost:8083","generation":18}
Topic _kafka-connect-groupB-status[0], offset: 1, Headers: , key: status-task-source-datagen-01-0, payload: 77 bytes: {"state":"RUNNING","trace":null,"worker_id":"localhost:8083","generation":18}
Topic _kafka-connect-groupB-status[0], offset: 2, Headers: , key: status-task-source-datagen-01-4, payload: 77 bytes: {"state":"RUNNING","trace":null,"worker_id":"localhost:8083","generation":18}
% Reached end of topic _kafka-connect-groupB-status [0] at offset 3
Topic _kafka-connect-groupB-status[1], offset: 0, Headers: , key: status-task-source-datagen-01-1, payload: 77 bytes: {"state":"RUNNING","trace":null,"worker_id":"localhost:8083","generation":18}
Topic _kafka-connect-groupB-status[1], offset: 1, Headers: , key: status-task-source-datagen-01-2, payload: 77 bytes: {"state":"RUNNING","trace":null,"worker_id":"localhost:8083","generation":18}
% Reached end of topic _kafka-connect-groupB-status [1] at offset 2
Topic _kafka-connect-groupB-status[4], offset: 0, Headers: , key: status-task-source-datagen-01-5, payload: 77 bytes: {"state":"RUNNING","trace":null,"worker_id":"localhost:8083","generation":18}
% Reached end of topic _kafka-connect-groupB-status [4] at offset 1
Checking each worker’s log shows that tasks are running on all three workers.
Offsets topic:
% Reached end of topic __consumer_offsets [33] at offset 17 Topic __consumer_offsets / Partition 33 / Offset: 17 / Timestamp: 1574419513301 Headers: Key (24 bytes): kafka-connect-groupB Payload (1167 bytes): connect compatible.connect-1-410154a3-a7cc-4c60-adff-79ea9938b431n��?�.connect-1-410154a3-a7cc-4c60-adff-79ea9938b431�� connect-1/172.28.0.5�`'�http://localhost:8083{.connect-1-410154a3-a7cc-4c60-adff-79ea9938b431http://localhost:8083/source-datagen-01����.connect-1-410154a3-a7cc-4c60-adff-79ea9938b431http://localhost:8083source-datagen-01.connect-1-79e9c006-5f50-4ed4-9d51-52a1bdc73637�� connect-1/172.28.0.7�`'�http://localhost:8083`.connect-1-410154a3-a7cc-4c60-adff-79ea9938b431http://localhost:8083/.connect-1-410154a3-a7cc-4c60-adff-79ea9938b431http://localhost:8083source-datagen-01.connect-1-9a6ad1d8-cd5e-4a04-9dc6-fad03fb005af�� connect-1/172.28.0.6�`'�http://localhost:8083`.connect-1-410154a3-a7cc-4c60-adff-79ea9938b431http://localhost:8083/.connect-1-410154a3-a7cc-4c60-adff-79ea9938b431http://localhost:8083source-datagen-01 --
Kill worker 3
% Reached end of topic __consumer_offsets [33] at offset 18 Topic __consumer_offsets / Partition 33 / Offset: 18 / Timestamp: 1574419726554 Headers: Key (24 bytes): kafka-connect-groupB Payload (847 bytes): connect compatible.connect-1-79e9c006-5f50-4ed4-9d51-52a1bdc73637n����.connect-1-79e9c006-5f50-4ed4-9d51-52a1bdc73637�� connect-1/172.28.0.7�`'�http://localhost:8083.connect-1-410154a3-a7cc-4c60-adff-79ea9938b431http://localhost:8083source-datagen-01.connect-1-79e9c006-5f50-4ed4-9d51-52a1bdc73637http://localhost:8083source-datagen-01����.connect-1-9a6ad1d8-cd5e-4a04-9dc6-fad03fb005af�� connect-1/172.28.0.6�`'�http://localhost:8083.connect-1-410154a3-a7cc-4c60-adff-79ea9938b431http://localhost:8083source-datagen-01{.connect-1-79e9c006-5f50-4ed4-9d51-52a1bdc73637http://localhost:8083source-datagen-01 --
Kill worker 2
% Reached end of topic __consumer_offsets [33] at offset 19 Topic __consumer_offsets / Partition 33 / Offset: 19 / Timestamp: 1574419816661 Headers: Key (24 bytes): kafka-connect-groupB Payload (446 bytes): connect compatible.connect-1-79e9c006-5f50-4ed4-9d51-52a1bdc73637n����.connect-1-79e9c006-5f50-4ed4-9d51-52a1bdc73637�� connect-1/172.28.0.7�`'�http://localhost:8083�.connect-1-79e9c006-5f50-4ed4-9d51-52a1bdc73637http://localhost:8083source-datagen-01����`.connect-1-79e9c006-5f50-4ed4-9d51-52a1bdc73637http://localhost:8083�� -- % Reached end of topic __consumer_offsets [33] at offset 20
Tasks are reassigned each time. The __consumer_offsets topic tracks which workers are still alive in the group.
tl;dr If the rest.advertised.host.name is set to localhost then REST requests sent to a worker that is not the leader will fail. Kafka Connect forwards these requests from non-leader workers to the leader worker, and if the advertised host name is localhost then the worker ends up forwarding the request to itself, which won’t work. It doesn’t seem to impact the execution of connectors though, since this is coordinated through the Kafka topics.
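In both scenarios the request only succeeded once it was sent to the worker that happened to be the leader. The manual "try each worker in turn" approach used above can be sketched as follows. This is a stopgap for experimenting, not a fix (the fix is a resolvable advertised host name). The helper names are invented for illustration, the ports are the ones mapped in this setup (8083, 18083, 28083), and connection errors are deliberately not handled.

```python
import json
import urllib.error
import urllib.request
from typing import Callable, Iterable, List, Optional, Tuple


def put_json(url: str, body: dict) -> Tuple[int, str]:
    """PUT a JSON body and return (HTTP status, response text)."""
    request = urllib.request.Request(
        url,
        data=json.dumps(body).encode("utf-8"),
        method="PUT",
        headers={"Content-Type": "application/json"},
    )
    try:
        with urllib.request.urlopen(request, timeout=10) as response:
            return response.status, response.read().decode("utf-8")
    except urllib.error.HTTPError as err:
        # 409 and 500 responses arrive here; return them instead of raising.
        return err.code, err.read().decode("utf-8")


def put_config_any_worker(
    base_urls: Iterable[str],
    name: str,
    config: dict,
    send: Callable[[str, dict], Tuple[int, str]] = put_json,
) -> Tuple[Optional[str], List[Tuple[str, int]]]:
    """Try each worker until one accepts the config.

    Returns the base URL that accepted the request (or None) together with
    the list of (base URL, status) attempts made along the way.
    """
    attempts = []
    for base in base_urls:
        status, _ = send(f"{base.rstrip('/')}/connectors/{name}/config", config)
        attempts.append((base, status))
        if 200 <= status < 300:
            return base, attempts
    return None, attempts
```

Called with `["http://localhost:8083", "http://localhost:18083", "http://localhost:28083"]`, the attempts list also tells you which worker is currently the leader, since that is the only one that will accept the request.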
One worker in groupX, two workers in groupY. All three using the same config/offset/status topics.
Offsets topic
$ kafkacat -b localhost:9092 -t __consumer_offsets -o beginning -f 'Topic %t / Partition %p / Offset: %o / Timestamp: %T\nHeaders: %h\nKey (%K bytes): %k\nPayload (%S bytes): %s\n--\n' -u -C
Topic __consumer_offsets / Partition 10 / Offset: 0 / Timestamp: 1574421190777 Headers: Key (24 bytes): kafka-connect-groupY Payload (563 bytes): connect compatible.connect-1-8b98ba23-bc91-4d8e-9f0d-745c75200743n���\.connect-1-edc46031-885a-4347-ad29-4da7c07f7fb9�� connect-1/172.29.0.7�`'-http://kafka-connect-02:8083/������������g.connect-1-8b98ba23-bc91-4d8e-9f0d-745c75200743http://kafka-connect-03:8083/��������.connect-1-8b98ba23-bc91-4d8e-9f0d-745c75200743�� connect-1/172.29.0.6�`'-http://kafka-connect-03:8083/������������g.connect-1-8b98ba23-bc91-4d8e-9f0d-745c75200743http://kafka-connect-03:8083/�������� -- Topic __consumer_offsets / Partition 11 / Offset: 0 / Timestamp: 1574421190665 Headers: Key (24 bytes): kafka-connect-groupX Payload (325 bytes): connect compatible.connect-1-dd81ede5-5b4b-49c7-98c6-817b3c91ec66n����.connect-1-dd81ede5-5b4b-49c7-98c6-817b3c91ec66�� connect-1/172.29.0.5�`'-http://kafka-connect-01:8083/������������g.connect-1-dd81ede5-5b4b-49c7-98c6-817b3c91ec66http://kafka-connect-01:8083/��������
Worker 1 log
Rebalance started (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator)
(Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
(Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
Successfully joined group with generation 1 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
Joined group at generation 1 and got assignment: Assignment{error=0, leader='connect-1-dd81ede5-5b4b-49c7-98c6-817b3c91ec66', leaderUrl='http://kafka-connect-01:8083/', offset=-1, connectorIds=[], taskIds=[], revokedConnectorIds=[], revokedTaskIds=[], delay=0} (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
Worker 2 log
Rebalance started (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator)
(Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
(Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
(Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
Successfully joined group with generation 1 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
Joined group at generation 1 and got assignment: Assignment{error=0, leader='connect-1-8b98ba23-bc91-4d8e-9f0d-745c75200743', leaderUrl='http://kafka-connect-03:8083/', offset=-1, connectorIds=[], taskIds=[], revokedConnectorIds=[], revokedTaskIds=[], delay=0} (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
Worker 3 log
Rebalance started (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator)
(Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
(Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
(Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
Successfully joined group with generation 1 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
Joined group at generation 1 and got assignment: Assignment{error=0, leader='connect-1-8b98ba23-bc91-4d8e-9f0d-745c75200743', leaderUrl='http://kafka-connect-03:8083/', offset=-1, connectorIds=[], taskIds=[], revokedConnectorIds=[], revokedTaskIds=[], delay=0} (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
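The three assignment lines can also be compared mechanically. The following is a minimal sketch, not part of the original test, that pulls `leaderUrl` out of each line and groups the workers by the leader they report. The names `ASSIGNMENTS`, `LEADER_URL` and `group_by_leader` are made up for this illustration; the assignment strings are copied from the logs above.

```python
import re

# Assignment strings copied from the three worker logs above.
ASSIGNMENTS = {
    "worker 1": (
        "Assignment{error=0, leader='connect-1-dd81ede5-5b4b-49c7-98c6-817b3c91ec66', "
        "leaderUrl='http://kafka-connect-01:8083/', offset=-1, connectorIds=[], "
        "taskIds=[], revokedConnectorIds=[], revokedTaskIds=[], delay=0}"
    ),
    "worker 2": (
        "Assignment{error=0, leader='connect-1-8b98ba23-bc91-4d8e-9f0d-745c75200743', "
        "leaderUrl='http://kafka-connect-03:8083/', offset=-1, connectorIds=[], "
        "taskIds=[], revokedConnectorIds=[], revokedTaskIds=[], delay=0}"
    ),
    "worker 3": (
        "Assignment{error=0, leader='connect-1-8b98ba23-bc91-4d8e-9f0d-745c75200743', "
        "leaderUrl='http://kafka-connect-03:8083/', offset=-1, connectorIds=[], "
        "taskIds=[], revokedConnectorIds=[], revokedTaskIds=[], delay=0}"
    ),
}

LEADER_URL = re.compile(r"leaderUrl='([^']*)'")


def group_by_leader(assignments):
    """Map each leaderUrl to the list of workers that reported it."""
    groups = {}
    for worker, line in assignments.items():
        match = LEADER_URL.search(line)
        if match:
            groups.setdefault(match.group(1), []).append(worker)
    return groups


clusters = group_by_leader(ASSIGNMENTS)
for leader_url, workers in sorted(clusters.items()):
    print(leader_url, workers)
```

Worker 1 reports itself as leader, while workers 2 and 3 both report worker 3, which is consistent with two separate groups (groupX and groupY).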
Send config to worker 1 (groupX)
curl -s -X PUT -H "Content-Type:application/json" http://localhost:8083/connectors/source-datagen-01/config \
    -d '{
    "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "kafka.topic": "item_details_01",
    "max.interval":250,
    "quickstart": "ratings",
    "tasks.max": 6
    }'
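The logs show that the tasks are started and still running across all three workers. Worker 1 (groupX) starts all six tasks, while workers 2 and 3 (both groupY) start three each, so every task runs twice: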
Worker 1:
[Worker clientId=connect-1, groupId=kafka-connect-groupX] Starting task source-datagen-01-0 (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
[Worker clientId=connect-1, groupId=kafka-connect-groupX] Starting task source-datagen-01-3 (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
[Worker clientId=connect-1, groupId=kafka-connect-groupX] Starting task source-datagen-01-4 (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
[Worker clientId=connect-1, groupId=kafka-connect-groupX] Starting task source-datagen-01-2 (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
[Worker clientId=connect-1, groupId=kafka-connect-groupX] Starting task source-datagen-01-1 (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
[Worker clientId=connect-1, groupId=kafka-connect-groupX] Starting task source-datagen-01-5 (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
…
WorkerSourceTask{id=source-datagen-01-5} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask)
WorkerSourceTask{id=source-datagen-01-3} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask)
WorkerSourceTask{id=source-datagen-01-4} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask)
WorkerSourceTask{id=source-datagen-01-2} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask)
WorkerSourceTask{id=source-datagen-01-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask)
WorkerSourceTask{id=source-datagen-01-1} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask)
Worker 2:
[Worker clientId=connect-1, groupId=kafka-connect-groupY] Starting task source-datagen-01-1 (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
[Worker clientId=connect-1, groupId=kafka-connect-groupY] Starting task source-datagen-01-3 (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
[Worker clientId=connect-1, groupId=kafka-connect-groupY] Starting task source-datagen-01-5 (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
…
WorkerSourceTask{id=source-datagen-01-3} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask)
WorkerSourceTask{id=source-datagen-01-1} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask)
WorkerSourceTask{id=source-datagen-01-5} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask)
Worker 3:
[Worker clientId=connect-1, groupId=kafka-connect-groupY] Starting task source-datagen-01-0 (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
[Worker clientId=connect-1, groupId=kafka-connect-groupY] Starting task source-datagen-01-4 (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
[Worker clientId=connect-1, groupId=kafka-connect-groupY] Starting task source-datagen-01-2 (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
…
WorkerSourceTask{id=source-datagen-01-2} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask)
WorkerSourceTask{id=source-datagen-01-4} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask)
WorkerSourceTask{id=source-datagen-01-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask)
Status topic
$ kafkacat -b localhost:9092 -t _kafka-connect-status -o beginning -f 'Topic %t[%p], offset: %o, Headers: %h, key: %k, payload: %S bytes: %s\n' -u -C
% Reached end of topic _kafka-connect-status [0] at offset 0
% Reached end of topic _kafka-connect-status [1] at offset 0
% Reached end of topic _kafka-connect-status [2] at offset 0
% Reached end of topic _kafka-connect-status [3] at offset 0
% Reached end of topic _kafka-connect-status [4] at offset 0
Topic _kafka-connect-status[3], offset: 0, Headers: , key: status-connector-source-datagen-01, payload: 83 bytes: {"state":"RUNNING","trace":null,"worker_id":"kafka-connect-03:8083","generation":2}
% Reached end of topic _kafka-connect-status [3] at offset 1
Topic _kafka-connect-status[3], offset: 1, Headers: , key: status-connector-source-datagen-01, payload: 83 bytes: {"state":"RUNNING","trace":null,"worker_id":"kafka-connect-01:8083","generation":2}
% Reached end of topic _kafka-connect-status [3] at offset 2
Topic _kafka-connect-status[0], offset: 0, Headers: , key: status-task-source-datagen-01-0, payload: 83 bytes: {"state":"RUNNING","trace":null,"worker_id":"kafka-connect-03:8083","generation":3}
Topic _kafka-connect-status[0], offset: 1, Headers: , key: status-task-source-datagen-01-4, payload: 83 bytes: {"state":"RUNNING","trace":null,"worker_id":"kafka-connect-03:8083","generation":3}
% Reached end of topic _kafka-connect-status [0] at offset 2
Topic _kafka-connect-status[1], offset: 0, Headers: , key: status-task-source-datagen-01-2, payload: 83 bytes: {"state":"RUNNING","trace":null,"worker_id":"kafka-connect-03:8083","generation":3}
% Reached end of topic _kafka-connect-status [1] at offset 1
Topic _kafka-connect-status[4], offset: 0, Headers: , key: status-task-source-datagen-01-5, payload: 83 bytes: {"state":"RUNNING","trace":null,"worker_id":"kafka-connect-02:8083","generation":3}
% Reached end of topic _kafka-connect-status [4] at offset 1
Topic _kafka-connect-status[1], offset: 1, Headers: , key: status-task-source-datagen-01-1, payload: 83 bytes: {"state":"RUNNING","trace":null,"worker_id":"kafka-connect-02:8083","generation":3}
Topic _kafka-connect-status[0], offset: 2, Headers: , key: status-task-source-datagen-01-3, payload: 83 bytes: {"state":"RUNNING","trace":null,"worker_id":"kafka-connect-02:8083","generation":3}
% Reached end of topic _kafka-connect-status [1] at offset 2
% Reached end of topic _kafka-connect-status [0] at offset 3
Topic _kafka-connect-status[1], offset: 2, Headers: , key: status-task-source-datagen-01-2, payload: 86 bytes: {"state":"UNASSIGNED","trace":null,"worker_id":"kafka-connect-03:8083","generation":3}
Topic _kafka-connect-status[0], offset: 3, Headers: , key: status-task-source-datagen-01-4, payload: 86 bytes: {"state":"UNASSIGNED","trace":null,"worker_id":"kafka-connect-03:8083","generation":3}
% Reached end of topic _kafka-connect-status [1] at offset 3
Topic _kafka-connect-status[0], offset: 4, Headers: , key: status-task-source-datagen-01-0, payload: 86 bytes: {"state":"UNASSIGNED","trace":null,"worker_id":"kafka-connect-03:8083","generation":3}
% Reached end of topic _kafka-connect-status [0] at offset 5
Topic _kafka-connect-status[4], offset: 1, Headers: , key: status-task-source-datagen-01-5, payload: 86 bytes: {"state":"UNASSIGNED","trace":null,"worker_id":"kafka-connect-02:8083","generation":3}
Topic _kafka-connect-status[1], offset: 3, Headers: , key: status-task-source-datagen-01-1, payload: 86 bytes: {"state":"UNASSIGNED","trace":null,"worker_id":"kafka-connect-02:8083","generation":3}
% Reached end of topic _kafka-connect-status [4] at offset 2
Topic _kafka-connect-status[0], offset: 5, Headers: , key: status-task-source-datagen-01-3, payload: 86 bytes: {"state":"UNASSIGNED","trace":null,"worker_id":"kafka-connect-02:8083","generation":3}
% Reached end of topic _kafka-connect-status [1] at offset 4
% Reached end of topic _kafka-connect-status [0] at offset 6
Topic _kafka-connect-status[0], offset: 6, Headers: , key: status-task-source-datagen-01-3, payload: 83 bytes: {"state":"RUNNING","trace":null,"worker_id":"kafka-connect-02:8083","generation":4}
% Reached end of topic _kafka-connect-status [0] at offset 7
Topic _kafka-connect-status[1], offset: 4, Headers: , key: status-task-source-datagen-01-2, payload: 83 bytes: {"state":"RUNNING","trace":null,"worker_id":"kafka-connect-03:8083","generation":4}
% Reached end of topic _kafka-connect-status [1] at offset 5
Topic _kafka-connect-status[0], offset: 7, Headers: , key: status-task-source-datagen-01-4, payload: 83 bytes: {"state":"RUNNING","trace":null,"worker_id":"kafka-connect-03:8083","generation":4}
Topic _kafka-connect-status[0], offset: 8, Headers: , key: status-task-source-datagen-01-0, payload: 83 bytes: {"state":"RUNNING","trace":null,"worker_id":"kafka-connect-03:8083","generation":4}
% Reached end of topic _kafka-connect-status [0] at offset 9
Topic _kafka-connect-status[1], offset: 5, Headers: , key: status-task-source-datagen-01-1, payload: 83 bytes: {"state":"RUNNING","trace":null,"worker_id":"kafka-connect-02:8083","generation":4}
Topic _kafka-connect-status[4], offset: 2, Headers: , key: status-task-source-datagen-01-5, payload: 83 bytes: {"state":"RUNNING","trace":null,"worker_id":"kafka-connect-02:8083","generation":4}
% Reached end of topic _kafka-connect-status [1] at offset 6
% Reached end of topic _kafka-connect-status [4] at offset 3
Topic _kafka-connect-status[4], offset: 3, Headers: , key: status-task-source-datagen-01-5, payload: 83 bytes: {"state":"RUNNING","trace":null,"worker_id":"kafka-connect-01:8083","generation":3}
% Reached end of topic _kafka-connect-status [4] at offset 4
Topic _kafka-connect-status[0], offset: 9, Headers: , key: status-task-source-datagen-01-3, payload: 83 bytes: {"state":"RUNNING","trace":null,"worker_id":"kafka-connect-01:8083","generation":3}
% Reached end of topic _kafka-connect-status [0] at offset 10
Topic _kafka-connect-status[1], offset: 6, Headers: , key: status-task-source-datagen-01-2, payload: 83 bytes: {"state":"RUNNING","trace":null,"worker_id":"kafka-connect-01:8083","generation":3}
Topic _kafka-connect-status[1], offset: 7, Headers: , key: status-task-source-datagen-01-1, payload: 83 bytes: {"state":"RUNNING","trace":null,"worker_id":"kafka-connect-01:8083","generation":3}
Topic _kafka-connect-status[0], offset: 10, Headers: , key: status-task-source-datagen-01-4, payload: 83 bytes: {"state":"RUNNING","trace":null,"worker_id":"kafka-connect-01:8083","generation":3}
Topic _kafka-connect-status[0], offset: 11, Headers: , key: status-task-source-datagen-01-0, payload: 83 bytes: {"state":"RUNNING","trace":null,"worker_id":"kafka-connect-01:8083","generation":3}
% Reached end of topic _kafka-connect-status [1] at offset 8
% Reached end of topic _kafka-connect-status [0] at offset 12
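The `-f` format string passed to kafkacat above writes each message as a single line, which makes the dump easy to turn back into structured records for further inspection. A minimal sketch, assuming one record per line as kafkacat emits it; `RECORD` and `parse_status_line` are hypothetical names for this illustration, and the sample line is copied from the output above.

```python
import json
import re

# Mirrors the kafkacat format string used above:
# 'Topic %t[%p], offset: %o, Headers: %h, key: %k, payload: %S bytes: %s\n'
RECORD = re.compile(
    r"^Topic (?P<topic>\S+)\[(?P<partition>\d+)\], offset: (?P<offset>\d+), "
    r"Headers: (?P<headers>.*?), key: (?P<key>\S+), "
    r"payload: (?P<size>\d+) bytes: (?P<payload>.*)$"
)


def parse_status_line(line):
    """Return a dict for a message line, or None for anything else
    (for example the '% Reached end of topic' notices)."""
    match = RECORD.match(line.strip())
    if match is None:
        return None
    return {
        "topic": match.group("topic"),
        "partition": int(match.group("partition")),
        "offset": int(match.group("offset")),
        "key": match.group("key"),
        "size": int(match.group("size")),
        "value": json.loads(match.group("payload")),
    }


SAMPLE = (
    'Topic _kafka-connect-status[0], offset: 11, Headers: , '
    'key: status-task-source-datagen-01-0, payload: 83 bytes: '
    '{"state":"RUNNING","trace":null,"worker_id":"kafka-connect-01:8083","generation":3}'
)

record = parse_status_line(SAMPLE)
print(record["key"], record["value"]["state"], record["value"]["worker_id"])
```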
Status from worker 1 REST API
$ curl -s "http://localhost:8083/connectors?expand=info&expand=status" | \
    jq '."source-datagen-01".status.tasks'
[
  { "id": 0, "state": "RUNNING", "worker_id": "kafka-connect-01:8083" },
  { "id": 1, "state": "RUNNING", "worker_id": "kafka-connect-01:8083" },
  { "id": 2, "state": "RUNNING", "worker_id": "kafka-connect-01:8083" },
  { "id": 3, "state": "RUNNING", "worker_id": "kafka-connect-01:8083" },
  { "id": 4, "state": "RUNNING", "worker_id": "kafka-connect-01:8083" },
  { "id": 5, "state": "RUNNING", "worker_id": "kafka-connect-01:8083" }
]
Status from worker 2 REST API
$ curl -s "http://localhost:18083/connectors?expand=info&expand=status" | \
    jq '."source-datagen-01".status.tasks'
[
  { "id": 0, "state": "RUNNING", "worker_id": "kafka-connect-01:8083" },
  { "id": 1, "state": "RUNNING", "worker_id": "kafka-connect-01:8083" },
  { "id": 2, "state": "RUNNING", "worker_id": "kafka-connect-01:8083" },
  { "id": 3, "state": "RUNNING", "worker_id": "kafka-connect-01:8083" },
  { "id": 4, "state": "RUNNING", "worker_id": "kafka-connect-01:8083" },
  { "id": 5, "state": "RUNNING", "worker_id": "kafka-connect-01:8083" }
]
Status from worker 3 REST API
$ curl -s "http://localhost:28083/connectors?expand=info&expand=status" | \
    jq '."source-datagen-01".status.tasks'
[
  { "id": 0, "state": "RUNNING", "worker_id": "kafka-connect-01:8083" },
  { "id": 1, "state": "RUNNING", "worker_id": "kafka-connect-01:8083" },
  { "id": 2, "state": "RUNNING", "worker_id": "kafka-connect-01:8083" },
  { "id": 3, "state": "RUNNING", "worker_id": "kafka-connect-01:8083" },
  { "id": 4, "state": "RUNNING", "worker_id": "kafka-connect-01:8083" },
  { "id": 5, "state": "RUNNING", "worker_id": "kafka-connect-01:8083" }
]
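i.e. all three workers report the tasks as running only on worker 1, even though the logs show them running on workers 2 and 3 as well. At a guess, this is because the status topic is compacted and read as a table: it is keyed on the task ID, so only the latest state written for each task (which happens to come from worker 1) is read.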
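To illustrate the guess above, here is a minimal sketch that replays the task status records from the kafkacat dump and keeps only the latest record per task. It models the status topic as a plain last-write-wins table, which is an assumption that matches the guess rather than a description of how Kafka Connect's status store is actually implemented. `RECORDS` and `materialize` are names made up for this sketch; the values are transcribed from the status topic output.

```python
# (partition, offset, task id, state, worker_id, generation), transcribed
# from the _kafka-connect-status dump. Each task key lives in one partition,
# so ordering by (partition, offset) preserves the per-key write order.
RECORDS = [
    (0, 0, 0, "RUNNING", "kafka-connect-03:8083", 3),
    (0, 1, 4, "RUNNING", "kafka-connect-03:8083", 3),
    (0, 2, 3, "RUNNING", "kafka-connect-02:8083", 3),
    (0, 3, 4, "UNASSIGNED", "kafka-connect-03:8083", 3),
    (0, 4, 0, "UNASSIGNED", "kafka-connect-03:8083", 3),
    (0, 5, 3, "UNASSIGNED", "kafka-connect-02:8083", 3),
    (0, 6, 3, "RUNNING", "kafka-connect-02:8083", 4),
    (0, 7, 4, "RUNNING", "kafka-connect-03:8083", 4),
    (0, 8, 0, "RUNNING", "kafka-connect-03:8083", 4),
    (0, 9, 3, "RUNNING", "kafka-connect-01:8083", 3),
    (0, 10, 4, "RUNNING", "kafka-connect-01:8083", 3),
    (0, 11, 0, "RUNNING", "kafka-connect-01:8083", 3),
    (1, 0, 2, "RUNNING", "kafka-connect-03:8083", 3),
    (1, 1, 1, "RUNNING", "kafka-connect-02:8083", 3),
    (1, 2, 2, "UNASSIGNED", "kafka-connect-03:8083", 3),
    (1, 3, 1, "UNASSIGNED", "kafka-connect-02:8083", 3),
    (1, 4, 2, "RUNNING", "kafka-connect-03:8083", 4),
    (1, 5, 1, "RUNNING", "kafka-connect-02:8083", 4),
    (1, 6, 2, "RUNNING", "kafka-connect-01:8083", 3),
    (1, 7, 1, "RUNNING", "kafka-connect-01:8083", 3),
    (4, 0, 5, "RUNNING", "kafka-connect-02:8083", 3),
    (4, 1, 5, "UNASSIGNED", "kafka-connect-02:8083", 3),
    (4, 2, 5, "RUNNING", "kafka-connect-02:8083", 4),
    (4, 3, 5, "RUNNING", "kafka-connect-01:8083", 3),
]


def materialize(records):
    """Read the records as a table: the last write for each task id wins."""
    table = {}
    for _partition, _offset, task, state, worker_id, generation in sorted(records):
        table[task] = (state, worker_id, generation)
    return table


table = materialize(RECORDS)
for task in sorted(table):
    print(task, *table[task])
```

Under that assumption every task resolves to a RUNNING entry for kafka-connect-01:8083, which matches what all three REST APIs return, and the groupY entries for workers 2 and 3 are simply overwritten.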