KafkaConsumer stuck in infinite loop on connection error #1306

Closed
zackdever opened this issue Nov 21, 2017 · 24 comments

Comments

@zackdever
Collaborator

It seems to be stuck in a loop around this line of BrokerConnection.connect() in kafka/conn.py:

error = 'Unable to connect to any of the names for {0}:{1}'.format(

The consumer filled up ~1TB of logs over the course of 3 days but never raised an exception. Example logs:

kafka.cluster INFO     Group coordinator for my-group is BrokerMetadata(nodeId=102, host=u'kafka-2-broker.example.com', port=9092, rack=None)
kafka.cluster INFO     Group coordinator for my-group is BrokerMetadata(nodeId=102, host=u'kafka-2-broker.example.com', port=9092, rack=None)
kafka.conn   ERROR    Unable to connect to any of the names for kafka-4-broker.example.com:9092
kafka.conn   WARNING  <BrokerConnection node_id=104 host=kafka-4-broker.example.com/kafka-4-broker.example.com port=9092>: close() called on disconnected connection with error: ConnectionError: Unable to connect to any of the names for kafka-4-broker.example.com:9092
kafka.cluster INFO     Group coordinator for my-group is BrokerMetadata(nodeId=102, host=u'kafka-2-broker.example.com', port=9092, rack=None)
kafka.cluster INFO     Group coordinator for my-group is BrokerMetadata(nodeId=102, host=u'kafka-2-broker.example.com', port=9092, rack=None)
kafka.coordinator INFO     Discovered coordinator 102 for group my-group
kafka.conn   ERROR    Unable to connect to any of the names for kafka-1-broker.example.com:9092
kafka.coordinator INFO     Discovered coordinator 102 for group my-group
kafka.cluster INFO     Group coordinator for my-group is BrokerMetadata(nodeId=102, host=u'kafka-2-broker.example.com', port=9092, rack=None)
kafka.conn   ERROR    Unable to connect to any of the names for kafka-2-broker.example.com:9092
kafka.conn   WARNING  <BrokerConnection node_id=104 host=kafka-4-broker.example.com/kafka-4-broker.example.com port=9092>: close() called on disconnected connection with error: ConnectionError: Unable to connect to any of the names for kafka-4-broker.example.com:9092
kafka.cluster INFO     Group coordinator for my-group is BrokerMetadata(nodeId=102, host=u'kafka-2-broker.example.com', port=9092, rack=None)
kafka.conn   WARNING  <BrokerConnection node_id=101 host=kafka-1-broker.example.com/kafka-1-broker.example.com port=9092>: close() called on disconnected connection with error: ConnectionError: Unable to connect to any of the names for kafka-1-broker.example.com:9092
kafka.conn   ERROR    Unable to connect to any of the names for kafka-2-broker.example.com:9092
kafka.conn   ERROR    Unable to connect to any of the names for kafka-2-broker.example.com:9092
kafka.coordinator INFO     Discovered coordinator 102 for group my-group
kafka.conn   ERROR    Unable to connect to any of the names for kafka-3-broker.example.com:9092
kafka.coordinator INFO     Discovered coordinator 102 for group my-group
kafka.conn   WARNING  <BrokerConnection node_id=102 host=kafka-2-broker.example.com/kafka-2-broker.example.com port=9092>: close() called on disconnected connection with error: ConnectionError: Unable to connect to any of the names for kafka-2-broker.example.com:9092
kafka.conn   WARNING  <BrokerConnection node_id=103 host=kafka-3-broker.example.com/kafka-3-broker.example.com port=9092>: close() called on disconnected connection with error: ConnectionError: Unable to connect to any of the names for kafka-3-broker.example.com:9092
kafka.coordinator INFO     Discovered coordinator 102 for group my-group
kafka.coordinator INFO     Discovered coordinator 102 for group my-group
kafka.conn   WARNING  <BrokerConnection node_id=102 host=kafka-2-broker.example.com/kafka-2-broker.example.com port=9092>: close() called on disconnected connection with error: ConnectionError: Unable to connect to any of the names for kafka-2-broker.example.com:9092
@jeffwidman
Contributor

Are you running master or 1.3.5 or another version?

Also, I doubt it matters, but just in case: what broker version?

@zackdever
Collaborator Author

We're on 1.3.5 and the broker is 0.10.1.1.

@aptiko

aptiko commented Nov 24, 2017

Apparently I'm having the same problem. It usually (but not always) appears when Kafka goes down (disconnecting clients) and then comes back up. Sometimes clients reconnect fine; sometimes they get stuck in this loop. Here's a log extract with a traceback:

2017-11-24 13:31:11,192 - ERROR - conn.connect(289) Unable to connect to any of the names for kafka:9092
2017-11-24 13:31:11,192 - WARNING - conn.close(623) <BrokerConnection node_id=0 host=kafka/kafka port=9092>: close() called on disconnected connection with error: ConnectionError: Unable to connect to any of the names for kafka:9092
Stack (most recent call last):
  File "/usr/lib/python3.5/threading.py", line 882, in _bootstrap
    self._bootstrap_inner()
  File "/usr/lib/python3.5/threading.py", line 914, in _bootstrap_inner
    self.run()
  File "/tmp/virtualenv/lib/python3.5/site-packages/kafka/producer/sender.py", line 60, in run
    self.run_once()
  File "/tmp/virtualenv/lib/python3.5/site-packages/kafka/producer/sender.py", line 108, in run_once
    if not self._client.ready(node):
  File "/tmp/virtualenv/lib/python3.5/site-packages/kafka/client_async.py", line 385, in ready
    self._maybe_connect(node_id)
  File "/tmp/virtualenv/lib/python3.5/site-packages/kafka/client_async.py", line 371, in _maybe_connect
    conn.connect()
  File "/tmp/virtualenv/lib/python3.5/site-packages/kafka/conn.py", line 290, in connect
    self.close(Errors.ConnectionError(error))
  File "/tmp/virtualenv/lib/python3.5/site-packages/kafka/conn.py", line 623, in close
    log.warning('%s: close() called on disconnected connection with error: %s', self, error, stack_info=True)
2017-11-24 13:31:11,193 - ERROR - conn.connect(289) Unable to connect to any of the names for kafka:9092
2017-11-24 13:31:11,193 - WARNING - conn.close(623) <BrokerConnection node_id=0 host=kafka/kafka port=9092>: close() called on disconnected connection with error: ConnectionError: Unable to connect to any of the names for kafka:9092

This is 1.3.5 on Python 3.5.1 on Ubuntu 16.04 (on a Docker image, with Kafka on another Docker image).

@aptiko

aptiko commented Nov 24, 2017

It also happens on 1.3.4.1, but not on 1.3.3.

With 1.3.3, the behavior again doesn't seem to be deterministic: sometimes it reconnects, sometimes it raises socket.gaierror (likewise, 1.3.5 sometimes reconnects and sometimes goes into the endless loop).

@jeffwidman
Contributor

jeffwidman commented Dec 2, 2017

I hit this earlier today on one of our development environments with a KafkaProducer.

The Kafka cluster had completely crashed, and after the brokers were restarted, the producer never recovered. It just kept emitting these logs:

2017-12-01 21:40:49,873 ERROR           conn                    connect:289     4983    139743082212240 Unable to connect to any of the names for kafka03.anon.local:9092
2017-12-01 21:40:49,873 WARNING         conn                    close:625       4983    139743082212240 <BrokerConnection node_id=3 host=kafka03.anon.local/kafka03.anon.local port=9092>: close() called on disconnected connection with error: ConnectionError: Unable to connect to any of the names for kafka03.anon.local:9092
2017-12-01 21:40:49,875 ERROR           conn                    connect:289     4983    139743082212240 Unable to connect to any of the names for kafka03.anon.local:9092
2017-12-01 21:40:49,875 WARNING         conn                    close:625       4983    139743082212240 <BrokerConnection node_id=3 host=kafka03.anon.local/kafka03.anon.local port=9092>: close() called on disconnected connection with error: ConnectionError: Unable to connect to any of the names for kafka03.anon.local:9092

Given that this is an error in the BrokerConnection class, it's not really a surprise that it shows up with both KafkaConsumer and KafkaProducer.

I watched the TCP stream and verified that nothing was being sent over port 9092, so it's not even trying to talk to Kafka.

Once I restarted the producer it worked fine.

@jeffwidman
Contributor

@zackdever do you know if the broker had gone away when this was triggered?

@zackdever
Collaborator Author

Ah, I wish I knew for sure or had the logs to check, but I can't confirm. I think a broker had gone down when this happened.

@dpkp
Owner

dpkp commented Dec 6, 2017

I believe the error here is caused by cached DNS results that are not cleared on failure. My hunch is that this is triggered by dynamic DNS entries: a failure at the broker level leads to a DNS failure. I put up a PR with a unit test that should fix it (if my diagnosis is correct).
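The diagnosis above can be illustrated with a minimal sketch. It uses a hypothetical CachedAddressList class (not kafka-python's BrokerConnection or its internals) that resolves a hostname with socket.getaddrinfo, hands out one cached address per connection attempt, and re-resolves once the cache is empty. The stuck state described above corresponds to an empty cache that is never refreshed, so every later attempt fails immediately without touching the network.

```python
import socket


class CachedAddressList:
    """Hypothetical sketch, not kafka-python's actual code: caches
    getaddrinfo() results and hands them out one per connection attempt."""

    def __init__(self, host, port, resolver=socket.getaddrinfo):
        self.host = host
        self.port = port
        self._resolver = resolver  # injectable so tests need no real DNS
        self._cache = None         # None means "not resolved yet"

    def next_address(self):
        # Re-resolve when the cache is missing OR exhausted. Skipping the
        # "exhausted" case is the failure mode: an empty list is never
        # refreshed, so every later attempt fails immediately.
        if not self._cache:
            self._cache = list(self._resolver(self.host, self.port,
                                              socket.AF_UNSPEC,
                                              socket.SOCK_STREAM))
        if not self._cache:
            raise OSError('Unable to connect to any of the names for %s:%s'
                          % (self.host, self.port))
        # getaddrinfo entries are (family, type, proto, canonname, sockaddr)
        family, _, _, _, sockaddr = self._cache.pop(0)
        return family, sockaddr
```

A real client would also back off between attempts rather than re-resolving in a tight loop; this sketch only shows the cache-refresh decision.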

@jeffwidman
Contributor

That may be true. I suspect Kafka crashed in our case because the underlying VM was shut down and moved, so the IP address would have changed.

@aptiko

aptiko commented Dec 7, 2017

This might explain what I'm seeing. I'm actually using a group of Docker containers for an integration test, and I'm testing resilience by shutting down the Kafka container and bringing it up again. I'm not certain what Docker does in this case, but if it sometimes gives the restarted container the same (internal) IP address and sometimes a different one, that could explain why the behavior differs from run to run.

@jeffwidman
Contributor

Thanks @aptiko. Also, it sounds like you're the only person here with a reproducible setup.

  1. Would you mind pinning the IP of the Kafka containers in your Docker Compose setup to see if this goes away? I'd like to check whether anything other than DNS might be triggering this.
  2. Would you be willing to open-source some of your test setup? It would save time while cobbling things together for #1311 (Figure out a way to test against constantly cycling Kafka cluster). Even if it's very hacky, just stick it in a gist or something and add a link at #1311.

@dpkp
Owner

dpkp commented Dec 7, 2017

I actually think it is pretty easy to reproduce at the unit-test level. The log entry that gives it away is "Unable to connect to any of the names". To reproduce it in a REPL:

import logging
import socket
from kafka import BrokerConnection
logging.basicConfig(level='INFO')
# placeholders: point these at a broker that is actually reachable
some_valid_host = 'localhost'
some_valid_port = 9092
conn = BrokerConnection(some_valid_host, some_valid_port, socket.AF_UNSPEC)
while not conn.connected():  # connect() is non-blocking; repeat until connected
    conn.connect()
conn.close()
conn._gai = []   # private attribute: forces an empty DNS cache
conn.connect()   # this will fail and never recover

@jeffwidman
Contributor

Thanks Dana, that makes perfect sense.

@aptiko I'd still be interested in seeing your test setup for #1311.

@aptiko

aptiko commented Dec 8, 2017

I'm creating a minimal setup that reproduces the problem, and I think the problem occurs when the Kafka container is stopped, not when it is restarted. I'll let you know more when it's ready.

@aptiko

aptiko commented Dec 8, 2017

It is as I said; the problem occurs when the Kafka container is stopped. Here's the demo: https://github.com/aptiko/kafka-python-1306-demo

It's easy to run if you have Docker and docker-compose experience, harder if you don't. I may be a bit slow to respond to requests, but eventually I will.

@dpkp dpkp closed this as completed in #1312 Dec 8, 2017
@priscofarina

priscofarina commented Jan 22, 2018

Sorry, but I don't understand the solution. Let me explain:

In my Python code I use KafkaProducer and KafkaConsumer. When Kafka goes down, control never returns to my code, so I cannot handle exceptions and trigger my own internal logic.

With kafka-python 1.3.3 I was able to do what I just described; now it seems I no longer have that control.

Let's assume I am consuming messages this way:

from kafka import KafkaConsumer

while True:
    try:
        consumer = KafkaConsumer(bootstrap_servers=kafka_multi_hosts,
                                 auto_offset_reset='latest',
                                 enable_auto_commit=False,
                                 group_id=self.topics['customer']['group'],
                                 reconnect_backoff_ms=1,
                                 consumer_timeout_ms=1000)
        break
    except Exception as err:
        pass  # handle the error / retry

while True:
    try:
        for msg in consumer:
            pass  # do the work
    except Exception as err:
        pass  # manage exception! not possible in 1.3.5

How can I fix the problem?
I am stuck in a loop of the following log messages:

port=9092>: creating new socket
[ERROR] 01/19/2018 03:14:22 PM [conn.py:connect:289] Unable to connect to any of the names for kafka-1.kafka.kafka.svc.cluster.local:9092
[ERROR] 01/19/2018 03:14:22 PM [conn.py:connect:289] Unable to connect to any of the names for kafka-1.kafka.kafka.svc.cluster.local:9092
[WARNING] 01/19/2018 03:14:22 PM [conn.py:close:625] : close() called on disconnected connection with error: ConnectionError: Unable to connect to any of the names for kafka-1.kafka.kafka.svc.cluster.local:9092
[WARNING] 01/19/2018 03:14:22 PM [conn.py:close:625] : close() called on disconnected connection with error: ConnectionError: Unable to connect to any of the names for kafka-1.kafka.kafka.svc.cluster.local:9092
[WARNING] 01/19/2018 03:14:22 PM [base.py:call:663] Coordinator unknown during heartbeat -- will retry
[WARNING] 01/19/2018 03:14:22 PM [base.py:_handle_heartbeat_failure:694] Heartbeat failed ([Error 15] GroupCoordinatorNotAvailableError); retrying
[DEBUG] 01/19/2018 03:14:22 PM [client_async.py:_maybe_refresh_metadata:766] Initializing connection to node 1 for metadata request
[DEBUG] 01/19/2018 03:14:22 PM [conn.py:connect:257] : creating new socket
[ERROR] 01/19/2018 03:14:22 PM [conn.py:connect:289] Unable to connect to any of the names for kafka-1.kafka.kafka.svc.cluster.local:9092
[WARNING] 01/19/2018 03:14:22 PM [conn.py:close:625] : close() called on disconnected connection with error: ConnectionError: Unable to connect to any of the names for kafka-1.kafka.kafka.svc.cluster.local:9092
[DEBUG] 01/19/2018 03:14:22 PM [conn.py:connect:257] : creating new socket
[ERROR] 01/19/2018 03:14:22 PM [conn.py:connect:289] Unable to connect to any of the names for kafka-1.kafka.kafka.svc.cluster.local:9092
[WARNING] 01/19/2018 03:14:22 PM [conn.py:close:625] : close() called on disconnected connection with error: ConnectionError: Unable to connect to any of the names for kafka-1.kafka.kafka.svc.cluster.local:9092
[DEBUG] 01/19/2018 03:14:22 PM [client_async.py:_maybe_refresh_metadata:766] Initializing connection to node 1 for metadata request
[DEBUG] 01/19/2018 03:14:22 PM [conn.py:connect:257] : creating new socket
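With consumer_timeout_ms=1000 in the snippet above, kafka-python ends message iteration (StopIteration) after that long without a message, so the for loop does hand control back periodically even when no exception is raised. One application-level workaround, sketched here under that assumption, is to treat prolonged silence as a failure. SilenceWatchdog and consume_once are hypothetical helpers, not kafka-python APIs, and silence can also just mean the topic is idle, so the threshold is a judgment call.

```python
import time


class SilenceWatchdog:
    """Hypothetical helper (not part of kafka-python): reports when no
    message has been seen for longer than max_silence_s seconds."""

    def __init__(self, max_silence_s, clock=time.monotonic):
        self.max_silence_s = max_silence_s
        self._clock = clock          # injectable so it can be tested
        self._last_seen = clock()

    def record_message(self):
        self._last_seen = self._clock()

    def expired(self):
        return self._clock() - self._last_seen > self.max_silence_s


def consume_once(consumer, handle, watchdog):
    """Drain one pass of an iterable consumer. With consumer_timeout_ms set,
    a KafkaConsumer stops iterating after that long without messages, so
    control returns here and the watchdog can be checked."""
    for msg in consumer:
        watchdog.record_message()
        handle(msg)
    if watchdog.expired():
        raise RuntimeError('no messages for %ss; broker may be unreachable'
                           % watchdog.max_silence_s)
```

Calling consume_once(consumer, handle, watchdog) inside the existing while True loop turns a silent outage into an exception that the surrounding except block can act on.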

@aptiko

aptiko commented Jan 22, 2018

@darkprisco if you are hit by this bug, use 1.3.3 (or master); there is no release yet that contains this fix.

@dpkp
Owner

dpkp commented Jan 22, 2018

Apologies for not having this released yet. There are a few other lingering issues that I feel need to be addressed before pushing a new release. I have added a milestone and tagged those issues. If anyone has opinions about release priorities, though, I'm all ears!!

@unixeO

unixeO commented Sep 9, 2019

@dpkp Was the fix for this bug released in version 1.4? I'm using version 1.4.6, and when I shut down Kafka while a worker is using KafkaConsumer, the infinite loop still happens.

        try:
            cls._consumer = KafkaConsumer(
                ...
            )
        except KafkaError as err:
            raise FailedToConnect(f"{err}")

Because of the infinite loop, the except KafkaError branch is never reached.

Some logs:

kafka.conn - INFO - <BrokerConnection node_id=bootstrap-0 host=kafka:9092 <connecting> [IPv4 ('127.0.0.1', 9092)]>: connecting to kafka:9092 [('127.0.0.1', 9092) IPv4]
kafka.conn - ERROR - Connect attempt to <BrokerConnection node_id=bootstrap-0 host=kafka:9092 <connecting> [IPv4 ('127.0.0.1', 9092)]> returned error 111. Disconnecting.
kafka.conn - INFO - <BrokerConnection node_id=bootstrap-0 host=kafka:9092 <connecting> [IPv4 ('127.0.0.1', 9092)]>: Closing connection. KafkaConnectionError: 111 ECONNREFUSED

@dhatraknilam

I have observed the same issue. I am using kafka-python 1.4.6. Please reopen this issue.

@qiankunxienb

Same error on kafka-python==2.0.2.

@Leeiiie

Leeiiie commented Dec 2, 2020

I hit the same problem with kafka-python==2.0.1.
Is this a bug?
Looking forward to your reply, thanks @dpkp


@dpkp

@adityavikasd

I had a similar connection error when a Python Kafka app (the client) was running in Docker and the Kafka broker was running directly on my machine (not in Docker or any container), listening on localhost:9092. If you inspect the server config, you'll find this:

listeners=PLAINTEXT://localhost:9092

and nothing set for advertised.listeners. Since advertised.listeners isn't set, the broker advertises the same value as listeners.

So once a client (consumer or producer) connects to the broker using the right hostname and port (host.docker.internal and 9092 in my case), the broker sends back metadata telling it to connect to localhost:9092 to reach all brokers. But localhost inside a Docker container is not the same as localhost on your machine/workstation/laptop, because of how Docker's network abstraction works. The following broker config works:

listeners=PLAINTEXT://:9092,DOCKER://:19092
advertised.listeners=PLAINTEXT://localhost:9092,DOCKER://host.docker.internal:19092
listener.security.protocol.map=PLAINTEXT:PLAINTEXT,DOCKER:PLAINTEXT

Once this config is in place and the broker is restarted, any Docker app can connect to it using host.docker.internal:19092 (the DOCKER listener), while apps running locally can connect using localhost:9092, 127.0.0.1:9092, or <YOUR_LOCAL_IP>:9092.
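The listener logic described above can be captured in a toy model. This is a simplified sketch, not broker code: it assumes each listener binds a distinct port and matches the client's connection by port, whereas a real broker identifies the listener by the socket the connection arrived on. The function names are hypothetical.

```python
def parse_listeners(value):
    """Parse a listener string such as 'PLAINTEXT://:9092,DOCKER://:19092'
    into {listener_name: (host, port)}. An empty host means all interfaces."""
    result = {}
    for entry in value.split(','):
        name, _, address = entry.strip().partition('://')
        host, _, port = address.rpartition(':')
        result[name] = (host, int(port))
    return result


def advertised_address(listeners, advertised_listeners, connected_port):
    """Toy model: return the (host, port) a broker would tell a client to use
    after it connected on connected_port. When advertised_listeners is unset,
    fall back to listeners."""
    bound = parse_listeners(listeners)
    advertised = parse_listeners(advertised_listeners or listeners)
    for name, (_, port) in bound.items():
        if port == connected_port:
            return advertised[name]
    raise ValueError('no listener bound to port %d' % connected_port)
```

With the original config, a client connecting on 9092 is told to use ('localhost', 9092), which fails from inside a container. With the new config, connecting on 19092 yields ('host.docker.internal', 19092), while 9092 still yields ('localhost', 9092).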

For better understanding, please read this excellent blog post https://www.confluent.io/blog/kafka-listeners-explained/


10 participants