Infinite loop when consumer doesn't have leaders for all partitions #1204
Comments
Thanks for the report. There are no easy solutions here, as the general assumption is that a partition without a leader indicates a cluster in the middle of failover, which is typically fairly transient, so retries make sense. I would not immediately throw an error, because that would require every service to know enough about Kafka internals to handle it, which sounds error-prone and tedious. I would also prefer to keep the default timeout at infinite: if a partition has a few hours of downtime, I don't want to also have to track down and restart every consumer once that partition is fixed. That said, I do see how in specific situations it'd be nice to be able to override the infinite timeout. I'm curious how the Java consumer handles this case. PS: I edited your question slightly to update the formatting w/o changing the message contents.
Thanks for the response! Sorry for the delay in responding; this got away from me slightly. My main issue with the current behaviour is that it's not possible to get any information out of a consumer while it's blocked like this. As you said, a cluster in the middle of a failover could take several hours to recover, and in the meantime the consumer is stuck in an infinite loop, so it can't output metrics or logs explaining why it's not processing anything. Changing the default behaviour seems like a bad idea, but maybe an optional timeout would be a decent compromise? I'll check what the Java library does, but it may take me a few days to try that out with my current commitments. BTW, I updated the links in my initial message to refer to specific commits.
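For anyone hitting the same "blocked and silent" problem, here is a rough sketch of one possible workaround on the service side: run the blocking call in a worker thread and log from the main thread while it has not returned. This is not part of kafka-python. `run_with_heartbeat`, `blocking_call`, `on_wait`, and the logger name are hypothetical names made up for illustration, and `blocking_call` stands in for whatever call is blocking (for example, creating the consumer).

```python
import concurrent.futures
import logging

log = logging.getLogger("consumer-startup")  # hypothetical logger name


def run_with_heartbeat(blocking_call, poll_s=5.0, max_wait_s=None, on_wait=None):
    """Run blocking_call in a worker thread, reporting while it is still blocked.

    blocking_call is any zero-argument callable (hypothetical stand-in for the
    call that blocks). on_wait, if given, is called with the seconds waited so
    far, e.g. to emit a metric. If max_wait_s is set, TimeoutError is raised
    once that long has passed.
    """
    executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
    future = executor.submit(blocking_call)
    waited = 0.0
    try:
        while True:
            # wait() returns the set of finished futures, empty on timeout.
            done, _ = concurrent.futures.wait([future], timeout=poll_s)
            if done:
                # Returns the value, or re-raises whatever the call raised.
                return future.result()
            waited += poll_s
            log.warning("still blocked after %.1f s", waited)
            if on_wait is not None:
                on_wait(waited)
            if max_wait_s is not None and waited >= max_wait_s:
                raise TimeoutError("gave up after %.1f s" % waited)
    finally:
        # Do not block here; the worker thread is NOT cancelled.
        executor.shutdown(wait=False)
```

One caveat with this approach: Python cannot kill the worker thread, so after giving up the underlying call is still running in the background, and the interpreter will wait for it at exit. It only buys visibility, not a real timeout inside the library.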
I believe the Java client exposes a timeout and raises if that timeout elapses: https://github.com/apache/kafka/blob/d0e7c6b9304b23ced046934c799df0cba39c28e5/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L430
I believe this is a dup of #686
closing as duplicate
If a partition doesn't have a leader, a consumer will block and loop indefinitely on creation. Ideally this would throw an error so the service can handle that case and do something sensible.
The problem is that Fetcher._reset_offsets is called by Fetcher._reset_offset without a timeout:
kafka-python/kafka/consumer/fetcher.py, line 230 in b1ae45c
kafka-python/kafka/consumer/fetcher.py, line 263 in b1ae45c
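To illustrate the difference a timeout makes, here is a minimal sketch of a retry loop whose timeout defaults to infinity, assuming the wait is unbounded by default as described in this thread. It is not kafka-python's code: `retry_with_timeout`, `attempt`, and `LeaderWaitTimeout` are hypothetical names, and `attempt` stands in for a single lookup that returns None while the partition has no leader.

```python
import time


class LeaderWaitTimeout(Exception):
    """Hypothetical error raised when no result arrives before the deadline."""


def retry_with_timeout(attempt, timeout_ms=float("inf"), backoff_ms=100):
    """Call attempt() until it returns a non-None value or timeout_ms elapses.

    With the default of infinity the loop never gives up, which matches the
    blocking behaviour described in this issue. A finite timeout_ms turns the
    endless wait into an exception the caller can handle.
    """
    start = time.monotonic()
    while True:
        result = attempt()
        if result is not None:
            return result
        elapsed_ms = (time.monotonic() - start) * 1000.0
        # Never true when timeout_ms is infinity, so the loop keeps retrying.
        if elapsed_ms >= timeout_ms:
            raise LeaderWaitTimeout(
                "no result after %.0f ms (limit %.0f ms)" % (elapsed_ms, timeout_ms)
            )
        time.sleep(backoff_ms / 1000.0)
```

A caller that passes no timeout keeps the retry-forever behaviour, while a service that wants to react could pass something like `timeout_ms=30000` and catch the exception to log, emit a metric, or exit.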