Unprocessed records due to wrong offset in on_assign callback in consumer.subscribe() #1329

Closed
jindalshivam09 opened this issue Apr 25, 2022 · 7 comments

@jindalshivam09

Description

The TopicPartition list I receive after consumer rebalancing (in the on_assign callback) has the offset set to -1001 for every partition.

Timeline:

10:22:05 partition x was revoked
10:22:15 some messages were produced to partition x
10:22:17 the in-flight commit failed due to rebalancing (KafkaError.REBALANCE_IN_PROGRESS)
10:22:17 partition x was re-assigned to a consumer

That's it: the new consumer never read the messages produced during the rebalance.

Notable config:

'auto.offset.reset': 'latest'
'auto.commit.enable': False

'confluent_kafka.version()': cp-kafka:5.0.0-1
OS: linux

I checked the logs and saw that the on_assign callback receives a list of topic partitions with the offset set to -1001. Since this is an out-of-range offset, my guess is that the consumer falls back to 'latest', hence the missing messages.
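For context, a minimal sketch of the subscription described above, assuming the confluent-kafka-python API (broker address, group id, and topic name are placeholders):

```python
from confluent_kafka import Consumer

consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',  # placeholder
    'group.id': 'my-group',                 # placeholder
    'auto.offset.reset': 'latest',
    'enable.auto.commit': False,
})

def on_assign(consumer, partitions):
    # After the rebalance, every TopicPartition arrives with offset == -1001
    for tp in partitions:
        print('assigned %s [%d] @ %d' % (tp.topic, tp.partition, tp.offset))

consumer.subscribe(['my-topic'], on_assign=on_assign)
```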

How to reproduce

Checklist

Please provide the following information:

  • confluent-kafka-python and librdkafka version (confluent_kafka.version() and confluent_kafka.libversion()):
  • Apache Kafka broker version:
  • Client configuration: {...}
  • Operating system:
  • Provide client logs (with 'debug': '..' as necessary)
  • Provide broker log excerpts
  • Critical issue
@edenhill
Contributor

-1001 is the unset offset, i.e., no value.
Since you only get a list of partitions in your rebalance callback, this is expected. If you pass partitions with offset -1001 to assign(), it will first try to fetch the committed offsets for those partitions, and if that fails it will resort to auto.offset.reset. You may also change the offset to a logical offset (BEGINNING, END) or an absolute offset (>= 0), as sketched below.
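For illustration, a hedged sketch of overriding the unset offsets before calling assign(), using confluent-kafka-python's exported offset constants:

```python
from confluent_kafka import OFFSET_BEGINNING, OFFSET_INVALID

def on_assign(consumer, partitions):
    for tp in partitions:
        if tp.offset == OFFSET_INVALID:    # -1001: no offset set
            tp.offset = OFFSET_BEGINNING   # or OFFSET_END, or an absolute offset >= 0
    consumer.assign(partitions)            # replaces the default assignment
```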

If you have auto-commit disabled and fail to commit on rebalance (because the partition is no longer owned), then there will be no committed offset to resume from, so the consumer will employ auto.offset.reset, which you've set to latest, thus skipping messages.
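A sketch of that failure mode, assuming a synchronous commit attempt in the revoke callback (error handling simplified):

```python
from confluent_kafka import KafkaException

def on_revoke(consumer, partitions):
    # Try to commit processed offsets before ownership is lost. If the group
    # is already rebalancing, this can fail (e.g. REBALANCE_IN_PROGRESS),
    # leaving no committed offset to resume from on re-assignment.
    try:
        consumer.commit(asynchronous=False)
    except KafkaException as e:
        print('commit on revoke failed: %s' % e)
```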

Do note, though, that you are setting auto.commit.enable, which is a legacy property that should not be used (you should see a warning on startup); set enable.auto.commit instead.

A recommended approach to control what is committed is to set enable.auto.offset.store=false, leave enable.auto.commit=true (the default) as is, and then call store_offsets() after processing a message.
This ensures that only processed messages will be committed, while the actual committing of offsets to the broker is taken care of automatically.
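A sketch of that pattern (broker address, topic, and the processing function are placeholders):

```python
from confluent_kafka import Consumer

def process(msg):
    pass  # application logic (placeholder)

consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',  # placeholder
    'group.id': 'my-group',                 # placeholder
    'enable.auto.commit': True,             # default: commits happen automatically...
    'enable.auto.offset.store': False,      # ...but only for offsets we explicitly store
})
consumer.subscribe(['my-topic'])

while True:
    msg = consumer.poll(1.0)
    if msg is None or msg.error():
        continue
    process(msg)
    consumer.store_offsets(message=msg)  # eligible for commit only after processing
```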

@jindalshivam09
Author

jindalshivam09 commented Apr 26, 2022

> If you have auto-commit disabled...

Even if one commit fails, there might be earlier successful commits by other consumers; shouldn't it use those? Or could there be no earlier committed offset in this case (which seems highly unlikely)?

Also, we don't want to set the offset to -2 (BEGINNING) in assign() (as mentioned here), as it would lead to re-processing all the messages.

> Do note, though, that you are setting auto.commit.enable...

My bad, we are actually setting both.

> A recommended approach to control...

What's the added benefit here?

@jindalshivam09
Author

@edenhill ping..

@edenhill
Contributor

edenhill commented May 3, 2022

> Even if one commit fails, there might be earlier successful commits by other consumers; shouldn't it use those? Or could there be no earlier committed offset in this case (which seems highly unlikely)?

It will use a previously committed offset first, if available.
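A sketch of how one might verify this from the client, using the Consumer.committed() API:

```python
def on_assign(consumer, partitions):
    # Ask the broker for the committed offsets of the assigned partitions;
    # partitions with no committed offset come back with offset == -1001.
    committed = consumer.committed(partitions, timeout=10.0)
    for tp in committed:
        print('%s [%d] committed @ %d' % (tp.topic, tp.partition, tp.offset))
```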

> What's the added benefit here?

At-least-once delivery and fine-grained offset commit control.

@jindalshivam09
Author

Thanks @edenhill for answering the questions.

@jindalshivam09
Author

I have noticed that every entry in the TopicPartition list we receive as part of consumer reassignment in the on_assign event has its offset set to -1001. For instance: TopicPartition{topic=<topic_name>,partition=2,offset=-1001,error=None}

This happens regardless of whether a previously committed offset is present. I looked in server.log on the broker side but didn't find anything other than:

[2022-05-13 22:12:11,255] INFO [GroupCoordinator 1]: Preparing to rebalance group <consumer_id> with old generation 0 (__consumer_offsets-32) (kafka.coordinator.group.GroupCoordinator)
[2022-05-13 22:12:14,256] INFO [GroupCoordinator 1]: Stabilized group <consumer_id> generation 1 (__consumer_offsets-32) (kafka.coordinator.group.GroupCoordinator)
[2022-05-13 22:12:24,256] INFO [GroupCoordinator 1]: Member <id> in group <consumer_id> has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2022-05-13 22:12:24,256] INFO [GroupCoordinator 1]: Preparing to rebalance group <consumer_id> with old generation 1 (__consumer_offsets-32) (kafka.coordinator.group.GroupCoordinator)
[2022-05-13 22:12:24,256] INFO [GroupCoordinator 1]: Group <consumer_id> with generation 2 is now empty (__consumer_offsets-32) (kafka.coordinator.group.GroupCoordinator)
[2022-05-13 22:15:09,399] INFO [GroupMetadataManager brokerId=1] Group <consumer_id> transitioned to Dead in generation 2 (kafka.coordinator.group.GroupMetadataManager)

We are also seeing continuous rebalancing (assignments and revokes).

@sjindal-moveworks

sjindal-moveworks commented May 16, 2022

> Since you only get a list of partitions in your rebalance callback, this is expected.

Oh, this explains the -1001 behavior.
