Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

consumer: auto.offset.store commits incorrect offsets #4341

Open
wants to merge 1 commit into
base: master
Choose a base branch
from

Conversation

iblislin
Copy link

@iblislin iblislin commented Jul 4, 2023

Sponsored-by: China Medical University Hospital

--
Hi,

I guess this issue is introduced by #4208, but I do not understand the details in that issue.

I inspected the partition offset via on_commit by the Python library.
W/o this patch, there are two case tried and I found the offsets committed are different:

case 1.

The consumer is configured with

c = Consumer({
    'bootstrap.servers': os.environ.get('BOOTSTRAP_SERVERS'),
    'group.id': 'magic',
    'auto.offset.reset': 'earliest',
    'enable.auto.offset.store': True,
    'enable.auto.commit': True,
    'on_commit': on_commit,
})

where the on_commit function is:

from confluent_kafka import  TopicPartition
                                          
def on_commit(err, partitions: list[TopicPartition]):
    for i in sorted(partitions, key=lambda x: x.partition):
        print(f'{i.partition}\t-> {i.offset}')

case 2

c = Consumer({
    'bootstrap.servers': os.environ.get('BOOTSTRAP_SERVERS'),
    'group.id': 'magic2',
    'auto.offset.reset': 'earliest',
    'enable.auto.offset.store': False,
    'enable.auto.commit': True,
    'on_commit': on_commit,
})

and then I invoked the c.store_offset(message=msg) manually, after processed a message.

results

I run case 1 and case 2 twice. The first run is for reading the topic to the end and waiting for the offsets committed.
The expected result in the second run is that the consumers should not get any messages, since there aren't any producers running at this point.

The case 2 works correctly in the second run.
The case 1 starts consuming the log from some unexpected point of offset in the second run.

This patch fixed case 1. But I'm not sure of the side effect of it.

Sponsored-by: China Medical University Hospital
@cla-assistant
Copy link

cla-assistant bot commented Aug 21, 2023

CLA assistant check
All committers have signed the CLA.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant