
kafka-python waits forever when a Kafka node goes down (kafka-python 1.3.3/1.3.4/1.3.5) #1354

Closed
priscofarina opened this issue Jan 24, 2018 · 19 comments



priscofarina commented Jan 24, 2018

In my Python code I have:

group = 'mytest'
while True:
    try:
        consumer = KafkaConsumer(
            bootstrap_servers=kafka_multi_hosts,
            auto_offset_reset='latest',
            enable_auto_commit=False,
            group_id=self.topics['customer']['group'],
            reconnect_backoff_ms=1,
            consumer_timeout_ms=1000)
        break
    except Exception as err:
        pass  # retry until the consumer can be created

while True:
    try:
        for msg in consumer:
            pass  # do the work
    except Exception as err:
        pass  # manage exception -- not possible in 1.3.5

My consumer is trying to read all messages within group = mytest. My Kafka cluster consists of 3 ZooKeeper and 3 Kafka nodes.

kubectl get pods -n kafka
NAME      READY     STATUS    RESTARTS   AGE
kafka-0   1/1       Running   3          7m
kafka-1   1/1       Running   0          4m
kafka-2   1/1       Running   0          3m
zk-0      1/1       Running   0          49m
zk-1      1/1       Running   0          7m
zk-2      1/1       Running   0          1h

What I am observing is that if I restart one node (e.g. kafka-0), my Python consumer can no longer read messages sent to Kafka.

My error output follows:

2018-01-24 14:35:49,057.57.5590133667:kafka.client:140283959863040:INFO:1:Bootstrapping cluster metadata from [('kafka-1.kafka.kafka', 9092, 0), ('kafka-2.kafka.kafka', 9092, 0), ('kafka-0.kafka.kafka', 9092, 0)]
2018-01-24 14:35:49,060.60.6129169464:kafka.conn:140283959863040:INFO:1:<BrokerConnection node_id=bootstrap host=kafka-1.kafka.kafka/10.244.3.3 port=9092>: connecting to 10.244.3.3:9092
2018-01-24 14:35:49,067.67.920923233:kafka.client:140283959863040:INFO:1:Bootstrap succeeded: found 3 brokers and 6 topics.
2018-01-24 14:35:49,068.68.0420398712:kafka.conn:140283959863040:INFO:1:<BrokerConnection node_id=bootstrap host=kafka-1.kafka.kafka/10.244.3.3 port=9092>: Closing connection. 
2018-01-24 14:35:49,073.73.9550590515:kafka.conn:140283959863040:INFO:1:<BrokerConnection node_id=2 host=kafka-2.kafka.kafka.svc.cluster.local/10.244.0.2 port=9092>: connecting to 10.244.0.2:9092
2018-01-24 14:35:49,179.179.658889771:kafka.conn:140283959863040:INFO:1:Broker version identifed as 0.10.2
2018-01-24 14:35:49,180.180.321931839:kafka.conn:140283959863040:INFO:1:Set configuration api_version=(0, 10, 2) to skip auto check_version requests on startup
2018-01-24 14:35:49,182.182.070016861:kafka.consumer.subscription_state:140283959863040:INFO:1:Updating subscribed topics to: ['customer']
2018-01-24 14:35:49,185.185.465097427:kafka.conn:140283959863040:INFO:1:<BrokerConnection node_id=1 host=kafka-1.kafka.kafka.svc.cluster.local/10.244.3.3 port=9092>: connecting to 10.244.3.3:9092
2018-01-24 14:35:49,186.186.050891876:kafka.cluster:140283959863040:INFO:1:Group coordinator for testGroup is BrokerMetadata(nodeId=1, host=u'kafka-1.kafka.kafka.svc.cluster.local', port=9092, rack=None)
2018-01-24 14:35:49,186.186.160087585:kafka.coordinator:140283959863040:INFO:1:Discovered coordinator 1 for group testGroup
2018-01-24 14:35:49,186.186.475992203:kafka.coordinator.consumer:140283959863040:INFO:1:Revoking previously assigned partitions set([]) for group testGroup
2018-01-24 14:35:49,186.186.72990799:kafka.coordinator:140283959863040:INFO:1:(Re-)joining group testGroup
2018-01-24 14:35:49,189.189.13602829:kafka.coordinator:140283959863040:INFO:1:Joined group 'testGroup' (generation 1506) with member_id kafka-python-1.3.5-67ae96c1-ed20-463b-ba0e-ea7f2aa20cb9
2018-01-24 14:35:49,189.189.275026321:kafka.coordinator:140283959863040:INFO:1:Elected group leader -- performing partition assignments using range
2018-01-24 14:35:49,196.196.614027023:kafka.coordinator:140283959863040:INFO:1:Successfully joined group testGroup with generation 1506
2018-01-24 14:35:49,196.196.995019913:kafka.consumer.subscription_state:140283959863040:INFO:1:Updated partition assignment: [TopicPartition(topic=u'customer', partition=0), TopicPartition(topic=u'customer', partition=1), TopicPartition(topic=u'customer', partition=2), TopicPartition(topic=u'customer', partition=3), TopicPartition(topic=u'customer', partition=4), TopicPartition(topic=u'customer', partition=5), TopicPartition(topic=u'customer', partition=6), TopicPartition(topic=u'customer', partition=7), TopicPartition(topic=u'customer', partition=8), TopicPartition(topic=u'customer', partition=9), TopicPartition(topic=u'customer', partition=10), TopicPartition(topic=u'customer', partition=11), TopicPartition(topic=u'customer', partition=12), TopicPartition(topic=u'customer', partition=13), TopicPartition(topic=u'customer', partition=14), TopicPartition(topic=u'customer', partition=15), TopicPartition(topic=u'customer', partition=16), TopicPartition(topic=u'customer', partition=17), TopicPartition(topic=u'customer', partition=18), TopicPartition(topic=u'customer', partition=19), TopicPartition(topic=u'customer', partition=20), TopicPartition(topic=u'customer', partition=21), TopicPartition(topic=u'customer', partition=22), TopicPartition(topic=u'customer', partition=23), TopicPartition(topic=u'customer', partition=24), TopicPartition(topic=u'customer', partition=25), TopicPartition(topic=u'customer', partition=26), TopicPartition(topic=u'customer', partition=27), TopicPartition(topic=u'customer', partition=28), TopicPartition(topic=u'customer', partition=29)]
2018-01-24 14:35:49,197.197.153091431:kafka.coordinator.consumer:140283959863040:INFO:1:Setting newly assigned partitions set([TopicPartition(topic=u'customer', partition=1), TopicPartition(topic=u'customer', partition=21), TopicPartition(topic=u'customer', partition=26), TopicPartition(topic=u'customer', partition=15), TopicPartition(topic=u'customer', partition=19), TopicPartition(topic=u'customer', partition=24), TopicPartition(topic=u'customer', partition=13), TopicPartition(topic=u'customer', partition=17), TopicPartition(topic=u'customer', partition=6), TopicPartition(topic=u'customer', partition=11), TopicPartition(topic=u'customer', partition=4), TopicPartition(topic=u'customer', partition=9), TopicPartition(topic=u'customer', partition=29), TopicPartition(topic=u'customer', partition=2), TopicPartition(topic=u'customer', partition=22), TopicPartition(topic=u'customer', partition=27), TopicPartition(topic=u'customer', partition=0), TopicPartition(topic=u'customer', partition=20), TopicPartition(topic=u'customer', partition=25), TopicPartition(topic=u'customer', partition=14), TopicPartition(topic=u'customer', partition=18), TopicPartition(topic=u'customer', partition=7), TopicPartition(topic=u'customer', partition=12), TopicPartition(topic=u'customer', partition=16), TopicPartition(topic=u'customer', partition=5), TopicPartition(topic=u'customer', partition=10), TopicPartition(topic=u'customer', partition=3), TopicPartition(topic=u'customer', partition=8), TopicPartition(topic=u'customer', partition=23), TopicPartition(topic=u'customer', partition=28)]) for group testGroup
2018-01-24 14:35:49,201.201.680898666:kafka.conn:140283959863040:INFO:1:<BrokerConnection node_id=0 host=kafka-0.kafka.kafka.svc.cluster.local/10.244.2.12 port=9092>: connecting to 10.244.2.12:9092
2018-01-24 14:35:49,313.313.872098923:kafka.coordinator:140283959863040:INFO:1:Leaving consumer group (testGroup).
2018-01-24 14:35:49,318.318.325042725:kafka.coordinator:140283959863040:INFO:1:LeaveGroup request succeeded
2018-01-24 14:35:49,318.318.53890419:kafka.conn:140283959863040:INFO:1:<BrokerConnection node_id=0 host=kafka-0.kafka.kafka.svc.cluster.local/10.244.2.12 port=9092>: Closing connection. 
2018-01-24 14:35:49,318.318.893909454:kafka.conn:140283959863040:INFO:1:<BrokerConnection node_id=1 host=kafka-1.kafka.kafka.svc.cluster.local/10.244.3.3 port=9092>: Closing connection. 
2018-01-24 14:35:49,319.319.184064865:kafka.conn:140283959863040:INFO:1:<BrokerConnection node_id=2 host=kafka-2.kafka.kafka.svc.cluster.local/10.244.0.2 port=9092>: Closing connection.

However, if I launch another, completely identical consumer in a different group (group = mytest2), it is able to read new messages.

Could you please help me?

My Kafka broker is started with the following parameters:

        - "exec kafka-server-start.sh /opt/kafka/config/server.properties --override broker.id=${HOSTNAME##*-} \
          --override listeners=PLAINTEXT://:9092 \
          --override zookeeper.connect=zk-0.zk-svc.kafka.svc.cluster.local:2181,zk-1.zk-svc.kafka.svc.cluster.local:2181,zk-2.zk-svc.kafka.svc.cluster.local:2181 \
          --override log.dir=/var/lib/kafka \
          --override default.replication.factor=3 \
          --override offsets.topic.replication.factor=3 \
          --override min.insync.replicas=2 \
          --override auto.create.topics.enable=false "

(Note: from the CLI I am able to see these messages.)


dpkp commented Jan 24, 2018

Something is causing your consumer to leave the group:

... Leaving consumer group (testGroup).

Can you post debug logs?


priscofarina commented Jan 24, 2018

[WARNING] 01/24/2018 06:23:49 PM [base.py:__call__:661] [] Coordinator unknown during heartbeat -- will retry
[WARNING] 01/24/2018 06:23:49 PM [base.py:_handle_heartbeat_failure:692] [] Heartbeat failed ([Error 15] GroupCoordinatorNotAvailableError); retrying
[WARNING] 01/24/2018 06:23:49 PM [base.py:__call__:661] [] Coordinator unknown during heartbeat -- will retry
[WARNING] 01/24/2018 06:23:49 PM [base.py:_handle_heartbeat_failure:692] [] Heartbeat failed ([Error 15] GroupCoordinatorNotAvailableError); retrying
[WARNING] 01/24/2018 06:23:49 PM [base.py:__call__:661] [] Coordinator unknown during heartbeat -- will retry
[WARNING] 01/24/2018 06:23:49 PM [base.py:_handle_heartbeat_failure:692] [] Heartbeat failed ([Error 15] GroupCoordinatorNotAvailableError); retrying
[WARNING] 01/24/2018 06:23:49 PM [base.py:__call__:661] [] Coordinator unknown during heartbeat -- will retry
^C[WARNING] 01/24/2018 06:23:49 PM [base.py:_handle_heartbeat_failure:692] [] Heartbeat failed ([Error 15] GroupCoordinatorNotAvailableError); retrying
[WARNING] 01/24/2018 06:23:49 PM [base.py:__call__:661] [] Coordinator unknown during heartbeat -- will retry
[WARNING] 01/24/2018 06:23:49 PM [base.py:_handle_heartbeat_failure:692] [] Heartbeat failed ([Error 15] GroupCoordinatorNotAvailableError); retrying
[WARNING] 01/24/2018 06:23:49 PM [base.py:__call__:661] [] Coordinator unknown during heartbeat -- will retry
[WARNING] 01/24/2018 06:23:49 PM [base.py:_handle_heartbeat_failure:692] [] Heartbeat failed ([Error 15] GroupCoordinatorNotAvailableError); retrying
[WARNING] 01/24/2018 06:23:49 PM [base.py:__call__:661] [] Coordinator unknown during heartbeat -- will retry
[WARNING] 01/24/2018 06:23:49 PM [base.py:_handle_heartbeat_failure:692] [] Heartbeat failed ([Error 15] GroupCoordinatorNotAvailableError); retrying
[WARNING] 01/24/2018 06:23:49 PM [base.py:__call__:661] [] Coordinator unknown during heartbeat -- will retry
[WARNING] 01/24/2018 06:23:49 PM [base.py:_handle_heartbeat_failure:692] [] Heartbeat failed ([Error 15] GroupCoordinatorNotAvailableError); retrying
[WARNING] 01/24/2018 06:23:49 PM [base.py:__call__:661] [] Coordinator unknown during heartbeat -- will retry
[WARNING] 01/24/2018 06:23:49 PM [base.py:_handle_heartbeat_failure:692] [] Heartbeat failed ([Error 15] GroupCoordinatorNotAvailableError); retrying
[WARNING] 01/24/2018 06:23:50 PM [base.py:__call__:661] [] Coordinator unknown during heartbeat -- will retry
[WARNING] 01/24/2018 06:23:50 PM [base.py:_handle_heartbeat_failure:692] [] Heartbeat failed ([Error 15] GroupCoordinatorNotAvailableError); retrying
[WARNING] 01/24/2018 06:23:50 PM [base.py:__call__:661] [] Coordinator unknown during heartbeat -- will retry
[WARNING] 01/24/2018 06:23:50 PM [base.py:_handle_heartbeat_failure:692] [] Heartbeat failed ([Error 15] GroupCoordinatorNotAvailableError); retrying
[WARNING] 01/24/2018 06:23:50 PM [base.py:__call__:661] [] Coordinator unknown during heartbeat -- will retry
[WARNING] 01/24/2018 06:23:50 PM [base.py:_handle_heartbeat_failure:692] [] Heartbeat failed ([Error 15] GroupCoordinatorNotAvailableError); retrying
[WARNING] 01/24/2018 06:23:50 PM [base.py:__call__:661] [] Coordinator unknown during heartbeat -- will retry
[WARNING] 01/24/2018 06:23:50 PM [base.py:_handle_heartbeat_failure:692] [] Heartbeat failed ([Error 15] GroupCoordinatorNotAvailableError); retrying
[WARNING] 01/24/2018 06:23:50 PM [base.py:__call__:661] [] Coordinator unknown during heartbeat -- will retry
[WARNING] 01/24/2018 06:23:50 PM [base.py:_handle_heartbeat_failure:692] [] Heartbeat failed ([Error 15] GroupCoordinatorNotAvailableError); retrying
[WARNING] 01/24/2018 06:23:50 PM [base.py:__call__:661] [] Coordinator unknown during heartbeat -- will retry
[WARNING] 01/24/2018 06:23:50 PM [base.py:_handle_heartbeat_failure:692] [] Heartbeat failed ([Error 15] GroupCoordinatorNotAvailableError); retrying
[WARNING] 01/24/2018 06:23:50 PM [base.py:__call__:661] [] Coordinator unknown during heartbeat -- will retry
[WARNING] 01/24/2018 06:23:50 PM [base.py:_handle_heartbeat_failure:692] [] Heartbeat failed ([Error 15] GroupCoordinatorNotAvailableError); retrying
[WARNING] 01/24/2018 06:23:50 PM [base.py:__call__:661] [] Coordinator unknown during heartbeat -- will retry
[WARNING] 01/24/2018 06:23:50 PM [base.py:_handle_heartbeat_failure:692] [] Heartbeat failed ([Error 15] GroupCoordinatorNotAvailableError); retrying
[WARNING] 01/24/2018 06:23:50 PM [base.py:__call__:661] [] Coordinator unknown during heartbeat -- will retry
[WARNING] 01/24/2018 06:23:50 PM [base.py:_handle_heartbeat_failure:692] [] Heartbeat failed ([Error 15] GroupCoordinatorNotAvailableError); retrying
[WARNING] 01/24/2018 06:23:50 PM [base.py:__call__:661] [] Coordinator unknown during heartbeat -- will retry
[WARNING] 01/24/2018 06:23:50 PM [base.py:_handle_heartbeat_failure:692] [] Heartbeat failed ([Error 15] GroupCoordinatorNotAvailableError); retrying
[WARNING] 01/24/2018 06:23:51 PM [base.py:__call__:661] [] Coordinator unknown during heartbeat -- will retry
[WARNING] 01/24/2018 06:23:51 PM [base.py:_handle_heartbeat_failure:692] [] Heartbeat failed ([Error 15] GroupCoordinatorNotAvailableError); retrying
[WARNING] 01/24/2018 06:23:51 PM [base.py:__call__:661] [] Coordinator unknown during heartbeat -- will retry

This was after one node went down. The node is now up and running again, but I still have the problem:

NAME      READY     STATUS    RESTARTS   AGE
kafka-0   1/1       Running   0          9m
kafka-1   1/1       Running   0          2h
kafka-2   1/1       Running   0          2h
zk-0      1/1       Running   0          2h
zk-1      1/1       Running   0          9m
zk-2      1/1       Running   0          2h


dpkp commented Jan 24, 2018

Looks like you are running kafka-python 1.3.3. There have been a number of improvements to this logic in more recent releases, as well as some large changes in master. Have you tried running the code in master?


priscofarina commented Jan 24, 2018

I haven't tried master yet.
I have tried the latest official release, 1.3.5, but I had some problems (link).

@dpkp, do you see any problem in the configuration I am using for Kafka on K8s, listed above?


dpkp commented Jan 24, 2018

No. I think this is a dup of #1306

@priscofarina

Is "pip install kafka-python" enough to use the release you are talking about?


dpkp commented Jan 24, 2018

to install from master you'll need to clone the git repo and install from there:

git clone https://github.com/dpkp/kafka-python
cd kafka-python
pip install .

(that's from memory, but I think it is correct)
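For reference, pip can also install straight from a git URL in one step (assuming git and pip are both on the PATH):

```shell
pip install git+https://github.com/dpkp/kafka-python.git
```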


priscofarina commented Jan 24, 2018

Is there any Kafka configuration (YAML) for K8s you can suggest, @dpkp? I mean, one you have used to verify that the old bugs are now solved.
Also, my Kafka consumer/producer classes in Python just open a connection to read and write messages on a topic; any suggestion for the KafkaConsumer(...) args there too?
Thanks for the great help :)


priscofarina commented Jan 25, 2018

Anyhow, using the new code from master as you suggested, @dpkp, I see the following logs:

2018-01-25 09:53:12,789.789.495944977:kafka.client:139861288605440:INFO:1:Bootstrapping cluster metadata from [('kafka-2.kafka.kafka', 9092, 0), ('kafka-0.kafka.kafka', 9092, 0), ('kafka-1.kafka.kafka', 9092, 0)]
2018-01-25 09:53:12,793.793.054103851:kafka.conn:139861288605440:INFO:1:<BrokerConnection node_id=bootstrap host=kafka-2.kafka.kafka/10.244.2.20 port=9092>: connecting to 10.244.2.20:9092
2018-01-25 09:53:12,799.799.653053284:kafka.client:139861288605440:INFO:1:Bootstrap succeeded: found 3 brokers and 6 topics.
2018-01-25 09:53:12,799.799.79300499:kafka.conn:139861288605440:INFO:1:<BrokerConnection node_id=bootstrap host=kafka-2.kafka.kafka/10.244.2.20 port=9092>: Closing connection. 
2018-01-25 09:53:12,804.804.583072662:kafka.conn:139861288605440:INFO:1:<BrokerConnection node_id=2 host=kafka-2.kafka.kafka.svc.cluster.local/10.244.2.20 port=9092>: connecting to 10.244.2.20:9092
2018-01-25 09:53:13,962.962.577104568:kafka.conn:139861288605440:INFO:1:Broker version identifed as 0.10.2
2018-01-25 09:53:13,962.962.798118591:kafka.conn:139861288605440:INFO:1:Set configuration api_version=(0, 10, 2) to skip auto check_version requests on startup
2018-01-25 09:53:13,964.964.041948318:kafka.consumer.subscription_state:139861288605440:INFO:1:Updating subscribed topics to: ['customer']
2018-01-25 09:53:13,966.966.608047485:kafka.conn:139861288605440:INFO:1:<BrokerConnection node_id=0 host=kafka-0.kafka.kafka.svc.cluster.local/10.244.0.3 port=9092>: connecting to 10.244.0.3:9092
2018-01-25 09:53:13,967.967.216014862:kafka.cluster:139861288605440:INFO:1:Group coordinator for testGrouppri is BrokerMetadata(nodeId=0, host=u'kafka-0.kafka.kafka.svc.cluster.local', port=9092, rack=None)
2018-01-25 09:53:13,967.967.319011688:kafka.coordinator:139861288605440:INFO:1:Discovered coordinator 0 for group testGrouppri
2018-01-25 09:53:13,967.967.559099197:kafka.coordinator.consumer:139861288605440:INFO:1:Revoking previously assigned partitions set([]) for group testGrouppri
2018-01-25 09:53:13,967.967.653989792:kafka.coordinator:139861288605440:INFO:1:Starting new heartbeat thread
2018-01-25 09:53:13,968.968.173027039:kafka.coordinator:139861288605440:INFO:1:(Re-)joining group testGrouppri

2018-01-25 09:53:41,337.337.631940842:kafka.coordinator:139861288605440:INFO:1:Elected group leader -- performing partition assignments using range
2018-01-25 09:53:41,342.342.85402298:kafka.coordinator:139861288605440:INFO:1:Successfully joined group testGrouppri with generation 27
2018-01-25 09:53:41,343.343.455076218:kafka.consumer.subscription_state:139861288605440:INFO:1:Updated partition assignment: [TopicPartition(topic=u'customer', partition=0), TopicPartition(topic=u'customer', partition=1), TopicPartition(topic=u'customer', partition=2), TopicPartition(topic=u'customer', partition=3), TopicPartition(topic=u'customer', partition=4), TopicPartition(topic=u'customer', partition=5), TopicPartition(topic=u'customer', partition=6), TopicPartition(topic=u'customer', partition=7), TopicPartition(topic=u'customer', partition=8), TopicPartition(topic=u'customer', partition=9), TopicPartition(topic=u'customer', partition=10), TopicPartition(topic=u'customer', partition=11), TopicPartition(topic=u'customer', partition=12), TopicPartition(topic=u'customer', partition=13), TopicPartition(topic=u'customer', partition=14)]
2018-01-25 09:53:41,343.343.652009964:kafka.coordinator.consumer:139861288605440:INFO:1:Setting newly assigned partitions set([TopicPartition(topic=u'customer', partition=1), TopicPartition(topic=u'customer', partition=0), TopicPartition(topic=u'customer', partition=14), TopicPartition(topic=u'customer', partition=13), TopicPartition(topic=u'customer', partition=7), TopicPartition(topic=u'customer', partition=12), TopicPartition(topic=u'customer', partition=6), TopicPartition(topic=u'customer', partition=11), TopicPartition(topic=u'customer', partition=5), TopicPartition(topic=u'customer', partition=10), TopicPartition(topic=u'customer', partition=4), TopicPartition(topic=u'customer', partition=9), TopicPartition(topic=u'customer', partition=3), TopicPartition(topic=u'customer', partition=8), TopicPartition(topic=u'customer', partition=2)]) for group testGrouppri
2018-01-25 09:53:41,348.348.352909088:kafka.conn:139861288605440:INFO:1:<BrokerConnection node_id=1 host=kafka-1.kafka.kafka.svc.cluster.local/10.244.3.23 port=9092>: connecting to 10.244.3.23:9092
2018-01-25 09:53:44,360.360.621929169:kafka.client:139861288605440:INFO:1:Bootstrapping cluster metadata from [('kafka-0.kafka.kafka', 9092, 0), ('kafka-1.kafka.kafka', 9092, 0), ('kafka-2.kafka.kafka', 9092, 0)]
2018-01-25 09:53:44,363.363.965034485:kafka.conn:139861288605440:INFO:1:<BrokerConnection node_id=bootstrap host=kafka-0.kafka.kafka/10.244.0.3 port=9092>: connecting to 10.244.0.3:9092
2018-01-25 09:53:44,369.369.103908539:kafka.client:139861288605440:INFO:1:Bootstrap succeeded: found 3 brokers and 6 topics.
2018-01-25 09:53:44,369.369.271993637:kafka.conn:139861288605440:INFO:1:<BrokerConnection node_id=bootstrap host=kafka-0.kafka.kafka/10.244.0.3 port=9092>: Closing connection. 
2018-01-25 09:53:44,371.371.920108795:kafka.conn:139861288605440:INFO:1:<BrokerConnection node_id=2 host=kafka-2.kafka.kafka.svc.cluster.local/10.244.2.20 port=9092>: connecting to 10.244.2.20:9092
2018-01-25 09:53:45,529.529.648065567:kafka.conn:139861288605440:INFO:1:Broker version identifed as 0.10.2
2018-01-25 09:53:45,529.529.823064804:kafka.conn:139861288605440:INFO:1:Set configuration api_version=(0, 10, 2) to skip auto check_version requests on startup
2018-01-25 09:53:45,531.531.38589859:kafka.consumer.subscription_state:139861288605440:INFO:1:Updating subscribed topics to: ['customer']
2018-01-25 09:53:45,535.535.149097443:kafka.conn:139861288605440:INFO:1:<BrokerConnection node_id=0 host=kafka-0.kafka.kafka.svc.cluster.local/10.244.0.3 port=9092>: connecting to 10.244.0.3:9092
2018-01-25 09:53:45,536.536.257982254:kafka.cluster:139861288605440:INFO:1:Group coordinator for testGrouppri is BrokerMetadata(nodeId=0, host=u'kafka-0.kafka.kafka.svc.cluster.local', port=9092, rack=None)
2018-01-25 09:53:45,536.536.464929581:kafka.coordinator:139861288605440:INFO:1:Discovered coordinator 0 for group testGrouppri
2018-01-25 09:53:45,536.536.781072617:kafka.coordinator.consumer:139861288605440:INFO:1:Revoking previously assigned partitions set([]) for group testGrouppri
2018-01-25 09:53:45,537.537.097930908:kafka.coordinator:139861288605440:INFO:1:Starting new heartbeat thread
2018-01-25 09:53:45,537.537.656068802:kafka.coordinator:139861288605440:INFO:1:(Re-)joining group testGrouppri
2018-01-25 09:53:47,372.372.895956039:kafka.coordinator:139861270435584:WARNING:1:Heartbeat failed for group testGrouppri because it is rebalancing
2018-01-25 09:53:50,443.443.5069561:kafka.coordinator:139861270435584:WARNING:1:Heartbeat failed for group testGrouppri because it is rebalancing
2018-01-25 09:53:53,555.555.71603775:kafka.coordinator:139861270435584:WARNING:1:Heartbeat failed for group testGrouppri because it is rebalancing
2018-01-25 09:53:56,567.567.375898361:kafka.coordinator:139861270435584:WARNING:1:Heartbeat failed for group testGrouppri because it is rebalancing
2018-01-25 09:53:59,632.632.883071899:kafka.coordinator:139861270435584:WARNING:1:Heartbeat failed for group testGrouppri because it is rebalancing
2018-01-25 09:54:02,645.645.275115967:kafka.coordinator:139861270435584:WARNING:1:Heartbeat failed for group testGrouppri because it is rebalancing
2018-01-25 09:54:05,560.560.647010803:kafka.coordinator:139861270435584:WARNING:1:Heartbeat failed for group testGrouppri because it is rebalancing
2018-01-25 09:54:08,733.733.959913254:kafka.coordinator:139861270435584:WARNING:1:Heartbeat failed for group testGrouppri because it is rebalancing
2018-01-25 09:54:11,645.645.489931107:kafka.coordinator:139861270435584:WARNING:1:Heartbeat failed for group testGrouppri because it is rebalancing
2018-01-25 09:54:14,757.757.486104965:kafka.coordinator:139861270435584:WARNING:1:Heartbeat failed for group testGrouppri because it is rebalancing

Then, if I shut down one Kafka node (e.g. kafka-0), the consumer never fails over to another node from the list set in the KafkaConsumer:

My KafkaConsumer:

 consumer = KafkaConsumer(
     bootstrap_servers="kafka-0.kafka.kafka:9092,kafka-1.kafka.kafka:9092,kafka-2.kafka.kafka:9092",
     auto_offset_reset='latest',
     group_id=group,
     session_timeout_ms=10000,
     max_poll_interval_ms=10000)
After taking kafka-0 down:

2018-01-25 10:19:22,651.651.091098785:kafka.coordinator:140401087997696:WARNING:1:Marking the coordinator dead (node 0) for group testGrouppri: Heartbeat session expired.
2018-01-25 10:19:22,653.653.601884842:kafka.cluster:140401106167552:INFO:1:Group coordinator for testGrouppri is BrokerMetadata(nodeId=0, host=u'kafka-0.kafka.kafka.svc.cluster.local', port=9092, rack=None)
2018-01-25 10:19:22,653.653.717041016:kafka.coordinator:140401106167552:INFO:1:Discovered coordinator 0 for group testGrouppri
2018-01-25 10:19:25,172.172.274112701:kafka.coordinator:140401106167552:ERROR:1:Error sending OffsetCommitRequest_v2 to node 0 [TooManyInFlightRequests: <BrokerConnection node_id=0 host=kafka-0.kafka.kafka.svc.cluster.local/10.244.0.3 port=9092>]
2018-01-25 10:19:25,177.177.380084991:kafka.coordinator.consumer:140401106167552:WARNING:1:Auto offset commit failed for group testGrouppri: TooManyInFlightRequests: <BrokerConnection node_id=0 host=kafka-0.kafka.kafka.svc.cluster.local/10.244.0.3 port=9092>
2018-01-25 10:19:25,679.679.177045822:kafka.coordinator:140401106167552:ERROR:1:Error sending OffsetCommitRequest_v2 to node 0 [TooManyInFlightRequests: <BrokerConnection node_id=0 host=kafka-0.kafka.kafka.svc.cluster.local/10.244.0.3 port=9092>]
2018-01-25 10:19:25,684.684.012889862:kafka.coordinator.consumer:140401106167552:WARNING:1:Auto offset commit failed for group testGrouppri: TooManyInFlightRequests: <BrokerConnection node_id=0 host=kafka-0.kafka.kafka.svc.cluster.local/10.244.0.3 port=9092>
2018-01-25 10:19:26,186.186.141967773:kafka.coordinator:140401106167552:ERROR:1:Error sending OffsetCommitRequest_v2 to node 0 [TooManyInFlightRequests: <BrokerConnection node_id=0 host=kafka-0.kafka.kafka.svc.cluster.local/10.244.0.3 port=9092>]
2018-01-25 10:19:26,210.210.643053055:kafka.coordinator.consumer:140401106167552:WARNING:1:Auto offset commit failed for group testGrouppri: TooManyInFlightRequests: <BrokerConnection node_id=0 host=kafka-0.kafka.kafka.svc.cluster.local/10.244.0.3 port=9092>
2018-01-25 10:19:26,712.712.781906128:kafka.coordinator:140401106167552:ERROR:1:Error sending OffsetCommitRequest_v2 to node 0 [TooManyInFlightRequests: <BrokerConnection node_id=0 host=kafka-0.kafka.kafka.svc.cluster.local/10.244.0.3 port=9092>]
2018-01-25 10:19:26,715.715.682029724:kafka.coordinator.consumer:140401106167552:WARNING:1:Auto offset commit failed for group testGrouppri: TooManyInFlightRequests: <BrokerConnection node_id=0 host=kafka-0.kafka.kafka.svc.cluster.local/10.244.0.3 port=9092>
2018-01-25 10:19:27,218.218.116044998:kafka.coordinator:140401106167552:ERROR:1:Error sending OffsetCommitRequest_v2 to node 0 [TooManyInFlightRequests: <BrokerConnection node_id=0 host=kafka-0.kafka.kafka.svc.cluster.local/10.244.0.3 port=9092>]

2018-01-25 10:21:24,173.173.549890518:kafka.conn:140401106167552:ERROR:1:<BrokerConnection node_id=0 host=kafka-0.kafka.kafka.svc.cluster.local/10.244.0.3 port=9092>: Error receiving network data closing socket
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/site-packages/kafka/conn.py", line 757, in _recv
    data = self._sock.recv(self.config['sock_chunk_bytes'])
error: [Errno 110] Connection timed out
2018-01-25 10:21:24,174.174.321889877:kafka.conn:140401106167552:INFO:1:<BrokerConnection node_id=0 host=kafka-0.kafka.kafka.svc.cluster.local/10.244.0.3 port=9092>: Closing connection. ConnectionError: [Errno 110] Connection timed out
2018-01-25 10:21:24,174.174.499988556:kafka.client:140401106167552:WARNING:1:Node 0 connection failed -- refreshing metadata
2018-01-25 10:21:24,174.174.679040909:kafka.consumer.fetcher:140401106167552:ERROR:1:Fetch to node 0 failed: ConnectionError: [Errno 110] Connection timed out
2018-01-25 10:21:24,174.174.763917923:kafka.coordinator:140401106167552:ERROR:1:Error sending HeartbeatRequest_v0 to node 0 [ConnectionError: [Errno 110] Connection timed out]

In conclusion, I would like to verify that if one node goes down, my consumer (which currently waits forever) is able to switch to another server from the bootstrap_servers list passed to KafkaConsumer, as in:

KafkaConsumer(bootstrap_servers=["kafka-0.kafka.kafka.svc.cluster.local:9092","kafka-1.kafka.kafka.svc.cluster.local:9092","kafka-2.kafka.kafka.svc.cluster.local:9092"] , ....)
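As a workaround while the hang exists, one pattern is to tear down and rebuild the consumer whenever iteration raises, with a capped backoff between attempts. This is only a sketch under my own assumptions (the helper names backoff_seconds, consume_forever, and handle are mine, not part of kafka-python), not the library's recommended recovery mechanism:

```python
import time


def backoff_seconds(attempt, base=0.5, cap=30.0):
    """Capped exponential backoff before recreating the consumer."""
    return min(cap, base * (2 ** attempt))


def consume_forever(bootstrap_servers, group, handle):
    """Iterate a consumer; on any error, close it, back off, and build a
    fresh one so a dead coordinator cannot hang the process forever."""
    from kafka import KafkaConsumer  # deferred: needs kafka-python installed
    attempt = 0
    consumer = None
    while True:
        try:
            if consumer is None:
                consumer = KafkaConsumer(
                    bootstrap_servers=bootstrap_servers,
                    group_id=group,
                    auto_offset_reset='latest',
                    consumer_timeout_ms=10000,  # return from iteration when idle
                )
            for msg in consumer:
                handle(msg)
                attempt = 0  # messages are flowing again; reset the backoff
            # idle timeout expired: fall through and iterate the same consumer
        except Exception:
            if consumer is not None:
                consumer.close()
            consumer = None
            attempt += 1
            time.sleep(backoff_seconds(attempt))
```

Whether this actually recovers still depends on the broker side (e.g. __consumer_offsets being replicated so a coordinator can be re-elected), so it complements rather than replaces a fix in the library.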


geoff-va commented Jan 25, 2018

I am having what seems to be a similar issue. (kafka-python 1.3.5)

I have two kafka brokers running and both are in the broker list.

If I kill one of them, I get the following:

<KILL ONE KAFKA BROKER>
2018-01-25 17:43:40,352::kafka.conn::ERROR::<BrokerConnection node_id=0 host=server.domain.com/XXX.XXX.XXX.XXX port=9094>: socket disconnected
2018-01-25 17:43:40,353::kafka.conn::INFO::<BrokerConnection node_id=0 host=server.domain.com/XXX.XXX.XXX.XXX port=9094>: Closing connection. ConnectionError: socket disconnected
2018-01-25 17:43:40,353::kafka.client::WARNING::Node 0 connection failed -- refreshing metadata
2018-01-25 17:43:40,353::kafka.coordinator::ERROR::Error sending OffsetCommitRequest_v2 to node 0 [ConnectionError: socket disconnected]
2018-01-25 17:43:40,353::kafka.coordinator::WARNING::Marking the coordinator dead (node 0) for group demo-manager-grp: ConnectionError: socket disconnected.
2018-01-25 17:43:40,354::kafka.conn::INFO::<BrokerConnection node_id=0 host=server.domain.com/XXX.XXX.XXX.XXX port=9094>: Closing connection. ConnectionError: Socket EVENT_READ without in-flight-requests
2018-01-25 17:43:40,354::kafka.client::WARNING::Node 0 connection failed -- refreshing metadata
2018-01-25 17:43:40,378::kafka.conn::INFO::<BrokerConnection node_id=1 host=other.server.domain.com/XXX.XXX.XXX.XXX port=9094>: Loading SSL CA from /code/kafka/kafka_consumer/CARoot.pem
2018-01-25 17:43:40,379::kafka.conn::INFO::<BrokerConnection node_id=1 host=other.server.domain.com/XXX.XXX.XXX.XXX port=9094>: connecting to XXX.XXX.XXX.XXX:9094
2018-01-25 17:43:40,436::kafka.conn::INFO::<BrokerConnection node_id=1 host=other.server.domain.com/XXX.XXX.XXX.XXX port=9094>: Authenticated as demoManager
2018-01-25 17:43:40,846::kafka.cluster::INFO::Group coordinator for demo-manager-grp is BrokerMetadata(nodeId=0, host='server.domain.com', port=9094, rack=None)
2018-01-25 17:43:40,846::kafka.coordinator::INFO::Discovered coordinator 0 for group demo-manager-grp
2018-01-25 17:43:40,846::kafka.client::INFO::Broker metadata change detected for node 0 from XXX.XXX.XXX.XXX:9094 to server.domain.com:9094
2018-01-25 17:43:40,881::kafka.conn::INFO::<BrokerConnection node_id=0 host=server.domain.com/XXX.XXX.XXX.XXX port=9094>: Loading SSL CA from /code/kafka/kafka_consumer/CARoot.pem
2018-01-25 17:43:40,881::kafka.conn::INFO::<BrokerConnection node_id=0 host=server.domain.com/XXX.XXX.XXX.XXX port=9094>: connecting to XXX.XXX.XXX.XXX:9094
2018-01-25 17:43:40,882::kafka.coordinator::ERROR::Error sending OffsetCommitRequest_v2 to node 0 [NodeNotReadyError: 0]
2018-01-25 17:43:40,882::kafka.conn::ERROR::Connect attempt to <BrokerConnection node_id=0 host=server.domain.com/XXX.XXX.XXX.XXX port=9094> returned error 111. Disconnecting.
2018-01-25 17:43:40,882::kafka.conn::INFO::<BrokerConnection node_id=0 host=server.domain.com/XXX.XXX.XXX.XXX port=9094>: Closing connection. ConnectionError: 111
…..<REPEAT ABOVE MANY TIMES>
2018-01-25 17:43:47,334::kafka.client::WARNING::Node 0 connection failed -- refreshing metadata
2018-01-25 17:43:47,435::kafka.client::INFO::Broker metadata change detected for node 0 from XXX.XXX.XXX.XXX:9094 to server.domain.com:9094
2018-01-25 17:43:49,254::kafka.coordinator::WARNING::Coordinator unknown during heartbeat -- will retry
2018-01-25 17:43:49,255::kafka.coordinator::WARNING::Heartbeat failed ([Error 15] GroupCoordinatorNotAvailableError); retrying
2018-01-25 17:43:49,356::kafka.coordinator::WARNING::Coordinator unknown during heartbeat -- will retry
2018-01-25 17:43:49,356::kafka.coordinator::WARNING::Heartbeat failed ([Error 15] GroupCoordinatorNotAvailableError); retrying
……<REPEAT ABOVE FOREVER>

<RESTART THE KAFKA BROKER>
2018-01-25 17:43:57,236::kafka.coordinator::ERROR::Error sending OffsetCommitRequest_v2 to node 0 [NodeNotReadyError: 0]
2018-01-25 17:43:57,236::kafka.coordinator::ERROR::Error sending OffsetCommitRequest_v2 to node 0 [NodeNotReadyError: 0]
2018-01-25 17:43:57,236::kafka.coordinator::ERROR::Error sending OffsetCommitRequest_v2 to node 0 [NodeNotReadyError: 0]
2018-01-25 17:43:57,237::kafka.coordinator::ERROR::Error sending OffsetCommitRequest_v2 to node 0 [NodeNotReadyError: 0]
2018-01-25 17:43:57,238::kafka.conn::INFO::<BrokerConnection node_id=0 host=server.domain.com/XXX.XXX.XXX.XXX port=9094>: Authenticated as demoManager
2018-01-25 17:43:57,263::kafka.coordinator::WARNING::Marking the coordinator dead (node 0) for group demo-manager-grp: [Error 16] NotCoordinatorForGroupError.
2018-01-25 17:43:57,321::kafka.cluster::INFO::Group coordinator for demo-manager-grp is BrokerMetadata(nodeId=0, host='server.domain.com', port=9094, rack=None)
2018-01-25 17:43:57,321::kafka.coordinator::INFO::Discovered coordinator 0 for group demo-manager-grp
2018-01-25 17:43:57,368::kafka.coordinator::WARNING::Marking the coordinator dead (node 0) for group demo-manager-grp: [Error 16] NotCoordinatorForGroupError.
2018-01-25 17:43:57,376::kafka.cluster::INFO::Group coordinator for demo-manager-grp is BrokerMetadata(nodeId=0, host='server.domain.com', port=9094, rack=None)
2018-01-25 17:43:57,376::kafka.coordinator::INFO::Discovered coordinator 0 for group demo-manager-grp
……<REPEAT ABOVE MANY TIMES>

There are two Kafka brokers running, so I would expect it to switch over to the other broker.

I also noticed something that looks odd, at least to me:

2018-01-25 17:43:47,435::kafka.client::INFO::Broker metadata change detected for node 0 from XXX.XXX.XXX.XXX:9094 to server.domain.com:9094

It looks like it's saying the metadata changed from an IP address to the DNS server name?

One other note: if I kill the other broker (with the first one still up), it looks like it properly switches brokers:

2018-01-25 18:13:27,774::kafka.conn::ERROR::<BrokerConnection node_id=1 host=other.server.domain.com/XXX.XXX.XXX.XXX port=9094>: socket disconnected
2018-01-25 18:13:27,774::kafka.conn::INFO::<BrokerConnection node_id=1 host=other.server.domain.com/XXX.XXX.XXX.XXX port=9094>: Closing connection. ConnectionError: socket disconnected
2018-01-25 18:13:27,774::kafka.client::WARNING::Node 1 connection failed -- refreshing metadata
2018-01-25 18:13:27,774::kafka.consumer.fetcher::ERROR::Fetch to node 1 failed: ConnectionError: socket disconnected
2018-01-25 18:13:27,774::kafka.client::INFO::Broker metadata change detected for node 1 from XXX.XXX.XXX.XXX:9094 to other.server.domain.com:9094
2018-01-25 18:13:27,775::kafka.conn::INFO::<BrokerConnection node_id=1 host=other.server.domain.com/XXX.XXX.XXX.XXX port=9094>: Closing connection. ConnectionError: Socket EVENT_READ without in-flight-requests
2018-01-25 18:13:27,776::kafka.client::WARNING::Node 1 connection failed -- refreshing metadata
2018-01-25 18:13:27,798::kafka.conn::INFO::<BrokerConnection node_id=1 host=other.server.domain.com/XXX.XXX.XXX.XXX port=9094>: Loading SSL CA from /code/kafka/kafka_consumer/CARoot.pem
2018-01-25 18:13:27,799::kafka.conn::INFO::<BrokerConnection node_id=0 host=server.domain.com/XXX.XXX.XXX.XXX port=9094>: Loading SSL CA from /code/kafka/kafka_consumer/CARoot.pem
2018-01-25 18:13:27,799::kafka.conn::INFO::<BrokerConnection node_id=1 host=other.server.domain.com/XXX.XXX.XXX.XXX port=9094>: connecting to XXX.XXX.XXX.XXX:9094
2018-01-25 18:13:27,799::kafka.conn::INFO::<BrokerConnection node_id=0 host=server.domain.com/XXX.XXX.XXX.XXX port=9094>: connecting to XXX.XXX.XXX.XXX:9094
2018-01-25 18:13:27,799::kafka.conn::ERROR::Connect attempt to <BrokerConnection node_id=1 host=other.server.domain.com/XXX.XXX.XXX.XXX port=9094> returned error 111. Disconnecting.
2018-01-25 18:13:27,800::kafka.conn::INFO::<BrokerConnection node_id=1 host=other.server.domain.com/XXX.XXX.XXX.XXX port=9094>: Closing connection. ConnectionError: 111
2018-01-25 18:13:27,800::kafka.client::WARNING::Node 1 connection failed -- refreshing metadata
2018-01-25 18:13:27,800::kafka.client::INFO::Broker metadata change detected for node 1 from XXX.XXX.XXX.XXX:9094 to other.server.domain.com:9094
2018-01-25 18:13:27,824::kafka.conn::INFO::<BrokerConnection node_id=1 host=other.server.domain.com/XXX.XXX.XXX.XXX port=9094>: Loading SSL CA from /code/kafka/kafka_consumer/CARoot.pem
2018-01-25 18:13:27,824::kafka.conn::INFO::<BrokerConnection node_id=1 host=other.server.domain.com/XXX.XXX.XXX.XXX port=9094>: connecting to XXX.XXX.XXX.XXX:9094
2018-01-25 18:13:27,824::kafka.conn::ERROR::Connect attempt to <BrokerConnection node_id=1 host=other.server.domain.com/XXX.XXX.XXX.XXX port=9094> returned error 111. Disconnecting.
2018-01-25 18:13:27,824::kafka.conn::INFO::<BrokerConnection node_id=1 host=other.server.domain.com/XXX.XXX.XXX.XXX port=9094>: Closing connection. ConnectionError: 111
2018-01-25 18:13:27,824::kafka.client::WARNING::Node 1 connection failed -- refreshing metadata
2018-01-25 18:13:28,491::kafka.conn::INFO::<BrokerConnection node_id=0 host=server.domain.com/XXX.XXX.XXX.XXX port=9094>: Authenticated as demoManager

@geoff-va

I think I found part of the issue in my case -

I needed to replicate the __consumer_offsets topic and its partitions across all of my nodes. This only occurred when I killed the only node that contained the sole copy of that topic.

Now that I've increased the replication, the NotCoordinatorForGroupError goes away and it looks like it's properly switching.
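For anyone hitting the same thing: the replication factor of the internal offsets topic can be inspected with the standard Kafka CLI tools. This is a sketch, not a recipe for this exact cluster — the script path and the ZooKeeper address are placeholders for your own setup:

```shell
# Inspect the replica count for each __consumer_offsets partition
bin/kafka-topics.sh --describe --topic __consumer_offsets --zookeeper zk-0:2181

# For new clusters, set the default in config/server.properties
# BEFORE the topic is first auto-created:
#   offsets.topic.replication.factor=3
```

Note that `offsets.topic.replication.factor` only affects topic auto-creation; an already-created single-replica `__consumer_offsets` has to be reassigned to more replicas by hand.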

@dpkp
Owner

dpkp commented Feb 13, 2018

I believe this issue is fixed in 1.4+. Please reopen and add details if you're able to duplicate the issue on the latest release.

@dpkp dpkp closed this as completed Feb 13, 2018
@ssgaur

ssgaur commented Apr 27, 2018

@dpkp I am using version 1.4.2 and I am still observing the same issue.

Initially my connection to the Kafka broker was up and running. Once I stopped the broker, I get continuous kafka.conn errors, and they keep coming; I can't find where they originate, and there is no way to stop them with exception handling or by catching a connection error. Also, when I restart the broker, my Python server is effectively dead: all requests time out.

.Kafka_Connection:Internal Kafka Connection Initiated on 127.0.0.1:9092
.Kafka_Connection:Internal Kafka Connection Completed on 127.0.0.1:9092
WARNING:kafka.client:Node 0 connection failed -- refreshing metadata
ERROR:kafka.conn:Connect attempt to <BrokerConnection node_id=0 host=INCT-Shailendras:9092 [IPv4 ('127.0.1.1', 9092)]> returned error 111. Disconnecting.
WARNING:kafka.client:Node 0 connection failed -- refreshing metadata
ERROR:kafka.conn:Connect attempt to <BrokerConnection node_id=0 host=INCT-Shailendras:9092 [IPv4 ('127.0.1.1', 9092)]> returned error 111. Disconnecting.
……<REPEAT ABOVE MANY TIMES>
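One way to keep a process from spinning on these reconnect errors indefinitely is to bound the retries yourself and surface the failure. This is a generic sketch, not kafka-python API: `factory` here would be a lambda constructing the `KafkaConsumer`, and in practice the exception you would typically see from kafka-python is `kafka.errors.NoBrokersAvailable`.

```python
import time


def connect_with_retries(factory, retries=5, backoff_s=1.0):
    """Call factory() up to `retries` times with exponential backoff.

    Instead of letting the client loop forever, re-raise the last error
    so the application can alert, fail over, or exit cleanly.
    """
    last_err = None
    for attempt in range(retries):
        try:
            return factory()
        except Exception as err:
            last_err = err
            time.sleep(backoff_s * (2 ** attempt))
    raise last_err
```

Usage would look like `consumer = connect_with_retries(lambda: KafkaConsumer(...), retries=5)`, so a dead cluster turns into one catchable exception after a bounded delay.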

@mddubey

mddubey commented Jul 19, 2018

Hello,

We are using version 1.4.2 and facing a similar issue. These are our last log lines; there is no log output after this:

Marking the coordinator dead (node 0) for group GROUP_NAME: Heartbeat session expired.
Fetch to node 0 failed: Cancelled: <BrokerConnection node_id=0 host=X.X.X.X:port <disconnected> [IPv4 ('X.X.X.X', port)
Error sending HeartbeatRequest_v1 to node 0 [Cancelled: <BrokerConnection node_id=0 host=X.X.X.X:port <disconnected> [IPv4 ('X.X.X.X', port)]>]

It fails for the first node in the list, but our assumption was that even if one node fails, the client should continue with the next node specified in the list.
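Until client-side failover is reliable, one defensive pattern is to drive the consumer with `poll()` — which in kafka-python takes a `timeout_ms` and returns a dict of partition to records — plus a watchdog, so the application notices a stall instead of blocking forever. A minimal sketch with the loop factored out so it can be shown without a live broker; `poll_fn` stands in for `consumer.poll` and `handle_fn` for your message handler:

```python
import time


def consume_with_watchdog(poll_fn, handle_fn, max_idle_s=30.0, poll_timeout_ms=1000):
    """Poll until no records have arrived for max_idle_s seconds, then return.

    poll_fn is expected to behave like KafkaConsumer.poll(timeout_ms=...):
    return a dict mapping partitions to record lists (empty when idle).
    Returning, rather than blocking forever, lets the caller rebuild the
    consumer, alert, or fail over to another bootstrap server.
    """
    last_progress = time.monotonic()
    while True:
        records = poll_fn(timeout_ms=poll_timeout_ms)
        if records:
            last_progress = time.monotonic()
            for batch in records.values():
                for msg in batch:
                    handle_fn(msg)
        elif time.monotonic() - last_progress > max_idle_s:
            return False  # stalled: caller should recreate the consumer
```

In practice you would call `consume_with_watchdog(consumer.poll, handle)` and, whenever it returns, close the `KafkaConsumer` and reconstruct it from the full bootstrap list.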

@progovoy

progovoy commented Sep 2, 2019

Hi,
We also have this same issue in 1.4.6. Any fixes planned?
@oded-zahavi

@piperck

piperck commented Oct 15, 2019

Same as @progovoy in 1.4.6, but it's sporadic.
@dpkp

@murdercdh

murdercdh commented Jun 16, 2020

In my python code I have:

group = 'mytest'
try:
    consumer = KafkaConsumer(
        bootstrap_servers=kafka_multi_hosts,
        auto_offset_reset='latest',
        enable_auto_commit=False,
        group_id=self.topics['customer']['group'],
        reconnect_backoff_ms=1,
        consumer_timeout_ms=1000)
except Exception as err:
    pass  # handle the connection failure

while True:
    try:
        for msg in consumer:
            pass  # do the work
    except Exception as err:
        pass  # manage the exception -- not possible in 1.3.5

My consumer is trying to read all messages within group = mytest. My Kafka cluster consists of 3 ZooKeeper nodes and 3 Kafka nodes.

kubectl get pods -n kafka
NAME      READY     STATUS    RESTARTS   AGE
kafka-0   1/1       Running   3          7m
kafka-1   1/1       Running   0          4m
kafka-2   1/1       Running   0          3m
zk-0      1/1       Running   0          49m
zk-1      1/1       Running   0          7m
zk-2      1/1       Running   0          1h

I have the same issue when using similar code. My Sentry log alerts me every day.

@sanvir10

sanvir10 commented Sep 23, 2020

I have the same problem. Any update?

BrokerConnection node_id=0 host=kafka-cluster-kafka-0.kafka-cluster-kafka-brokers.default.svc:9092 <connected> [IPv4 ('10.12.2.6', 9092)]> Request 1184961: FetchRequest_v2(replica_id=-1, max_wait_time=500, min_bytes=1, topics=[(topic='test', partitions=[(partition=0, offset=438, max_bytes=1048576)])])

@vadikko2

@murdercdh @priscofarina

Hi! Did you solve the problem?
Could you share your experience?

I have the same problem with kafka-python==2.0.2
