KafkaConsumer using group_id with manually assigned partitions can raise unexpected IllegalStateError #1112

Closed
dcrosta opened this issue May 30, 2017 · 6 comments

dcrosta commented May 30, 2017

Kafka 0.10.0.1
Kafka-Python 1.3.3

I create a consumer like:

from kafka import KafkaConsumer, TopicPartition

consumer = KafkaConsumer(
    group_id="the-group",
    bootstrap_servers=[...],
    consumer_timeout_ms=30000,
)

# Manually assign only the even-numbered partitions of the topic.
partitions = [
    TopicPartition("the-topic", partition_num)
    for partition_num in consumer.partitions_for_topic("the-topic")
    if partition_num % 2 == 0
]
consumer.assign(partitions)

And then consume from it using for message in consumer: in the usual way.
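
A minimal sketch of the partition-selection step, written so that it runs without a broker. The TopicPartition namedtuple below is a local stand-in that mirrors the field names of kafka.TopicPartition, and even_partitions is a hypothetical helper name. It assumes partitions_for_topic() may return None or an empty set when metadata for the topic is not yet available, which seems worth guarding against before calling assign():

```python
from collections import namedtuple

# Local stand-in mirroring kafka.TopicPartition's fields (assumption: used
# here only so the sketch runs without kafka-python installed).
TopicPartition = namedtuple("TopicPartition", ["topic", "partition"])


def even_partitions(topic, partition_ids):
    """Return TopicPartition entries for the even-numbered partitions of topic.

    partition_ids is whatever the metadata lookup returned; None or an empty
    collection is treated as "metadata not available yet".
    """
    if not partition_ids:
        raise ValueError("no partition metadata available for %r" % (topic,))
    return [
        TopicPartition(topic, partition_num)
        for partition_num in sorted(partition_ids)
        if partition_num % 2 == 0
    ]
```

With a real consumer, the stand-in would be swapped for `from kafka import TopicPartition` and the result passed to consumer.assign(), using the same topic name for the lookup and the assignment.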

Some time later (a few seconds after beginning consuming in the for loop), I get:

Traceback (most recent call last):
  File "/usr/local/opt/pypy/site-packages/kafka/future.py", line 79, in _call_backs
    f(value)
  File "/usr/local/opt/pypy/lib_pypy/_functools.py", line 45, in __call__
    return self._func(*(self._args + fargs), **fkeywords)
  File "/usr/local/opt/pypy/site-packages/kafka/coordinator/consumer.py", line 547, in _handle_offset_commit_response
    self._subscription.mark_for_reassignment()
  File "/usr/local/opt/pypy/site-packages/kafka/consumer/subscription_state.py", line 172, in mark_for_reassignment
    raise IllegalStateError(self._SUBSCRIPTION_EXCEPTION_MESSAGE)
IllegalStateError: IllegalStateError: You must choose only one way to configure your consumer: (1) subscribe to specific topics by name, (2) subscribe to topics matching a regex pattern, (3) assign itself specific topic-partitions.

I don't call subscribe() anywhere, nor assign() anywhere else or again. This looks like it's coming from _handle_offset_commit_response asynchronously, but I don't know why it would be.

Mostly I want to understand whether I am supposed to do anything about this error and, given that it happens asynchronously, whether I even can. Will it bubble up into a try: block anywhere? I suspect not if, for instance, it is raised on another thread.

Author

dcrosta commented May 30, 2017

I should be clear, I'm seeing that in the output of a logging.error call:

2017-05-30 19:17:00,136 [kafka.future:ERROR] Error processing callback
Traceback (most recent call last):
...

I've just restarted the process again, and did not observe this error logged again. From a glance at the code in https://github.com/dpkp/kafka-python/blob/master/kafka/coordinator/consumer.py#L540-L559, it looks like this error is being 'handled' by saying, "it happened, not much we can do about it", and moving on with our lives. Presumably future offset commits will happen at their regularly scheduled intervals.
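
The traceback passes through _call_backs in kafka/future.py and the log line reads "Error processing callback", which suggests the callback runner catches and logs exceptions rather than re-raising them. A minimal sketch of that pattern follows; MiniFuture, failing_callback, and run_demo are hypothetical names for illustration and are not kafka-python's actual implementation:

```python
import logging

log = logging.getLogger("sketch.future")


class MiniFuture:
    """Hypothetical, simplified future that logs callback errors."""

    def __init__(self):
        self._callbacks = []

    def add_callback(self, func):
        self._callbacks.append(func)
        return self

    def success(self, value):
        # Run every callback; an exception in one is logged, not re-raised,
        # so the caller of success() never sees it.
        for func in self._callbacks:
            try:
                func(value)
            except Exception:
                log.exception("Error processing callback")


def failing_callback(value):
    raise RuntimeError("raised inside callback")


def run_demo():
    future = MiniFuture()
    seen = []
    future.add_callback(failing_callback)
    future.add_callback(seen.append)
    try:
        future.success("commit-response")
        escaped = False
    except RuntimeError:
        escaped = True
    # escaped reports whether the callback's exception reached the caller;
    # seen lists the values delivered to the second callback.
    return escaped, seen
```

Under this pattern a try: block around the consuming loop would not catch the error; it would only show up in the log output.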

Owner

dpkp commented May 30, 2017

This looks like a bug caused by using a group_id while assigning partitions manually. Our test suite doesn't cover that use case particularly well yet. I suspect there are a few edge cases here and there that need to get handled in this scenario. Nonetheless, it should be fine to move on with your lives in this case :)

Author

dcrosta commented May 31, 2017

OK thanks -- let me know if I can help here at all.

dpkp changed the title from "Unexpected IllegalStateError" to "KafkaConsumer using group_id with manually assigned partitions can raise unexpected IllegalStateError" on Jun 17, 2017

Owner

dpkp commented Oct 22, 2017

I believe this is fixed by #1266

ryanjmccall commented Dec 12, 2017

I have a similar issue where I have some processes consuming a topic in subscribe mode. Then I stop those processes (invoking close()). Shortly afterwards, if I try to advance the offsets (in a separate script) on the same topic I get the IllegalStateError asynchronously and the process hangs. I am basically doing the following:

    consumer.assign(topicPartitions)
    consumer.seek_to_end(*topicPartitions)
    for tp in topicPartitions:
        consumer.commit({tp: OffsetAndMetadata(consumer.position(tp), None)})

The issue disappears after several minutes.
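
For reference, the offset-advancing steps above can be sketched as a single function that commits all partitions in one commit() call instead of one call per partition. This is only an illustration of the procedure and is not claimed to avoid the IllegalStateError. advance_to_end, make_offset, and RecordingConsumer are hypothetical names; RecordingConsumer is an in-memory stand-in so the sketch can be dry-run without a broker:

```python
def advance_to_end(consumer, partitions, make_offset):
    """Seek each partition to its end and commit those positions together."""
    consumer.assign(partitions)
    consumer.seek_to_end(*partitions)
    offsets = {tp: make_offset(consumer.position(tp)) for tp in partitions}
    consumer.commit(offsets)
    return offsets


class RecordingConsumer:
    """Hypothetical in-memory stand-in; not part of kafka-python."""

    def __init__(self, end_offsets):
        self._end_offsets = end_offsets
        self._positions = {}
        self.committed = None

    def assign(self, partitions):
        self._positions = {tp: 0 for tp in partitions}

    def seek_to_end(self, *partitions):
        for tp in partitions:
            self._positions[tp] = self._end_offsets[tp]

    def position(self, tp):
        return self._positions[tp]

    def commit(self, offsets):
        # Record what would have been sent to the group coordinator.
        self.committed = dict(offsets)
```

With a real KafkaConsumer, make_offset would be something like `lambda pos: OffsetAndMetadata(pos, None)`, matching the snippet above.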

Owner

dpkp commented Dec 29, 2019

Fixed in #1364

@dpkp dpkp closed this as completed Dec 29, 2019