
Seek method returning incorrect messages on compressed topic when using max_poll_records #1214

Closed
88manpreet opened this issue Sep 13, 2017 · 5 comments


88manpreet commented Sep 13, 2017

While using the seek method of `kafka.consumer.group.KafkaConsumer` for a given partition and offset, we are seeing inconsistent behavior in the messages returned by the subsequent poll call.
The issue is easily reproducible for the given (compacted) topic.

Part of the workflow:

```python
from kafka import TopicPartition
from kafka.consumer.group import KafkaConsumer

topic_partition = TopicPartition(topic, 0)
consumer = KafkaConsumer(**consumer_config)  # consumer_config: dict of consumer settings
consumer.assign([topic_partition])
start_offset = 100  # example value: highwatermark - 10
consumer.seek(partition=topic_partition, offset=start_offset)
messages = consumer.poll(timeout_ms=1000, max_records=1)[topic_partition]
message = messages[0]
print('Offset found:', message.offset, 'Expected offset:', start_offset)
```

Sample output:

```
Offset found: 80 Expected offset: 100
```

Observation:

  • If the iterator interface is used instead of the poll interface, the issue no longer exists. My guess is that somewhere while polling for messages, the fetched offsets are not updated or the fetched messages are not skipped. It looks like the iterator method does not use the fetched_records API, which is why it works fine.

  • At times it does return the correct messages (especially when the given offset is close to the highwatermark).

Please let me know if any other details are required.


88manpreet commented Sep 13, 2017

On some debugging I found that the fetch_offset here is the given start_offset (100), so the actual offset 79 is skipped as expected in the first iteration. But in the second iteration the fetch_offset (part.offset) is the same as the current record.offset (80 in the case above), so the record is not skipped.
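A minimal sketch of the suspected failure mode (a hypothetical simplification; `buggy_take` and its arguments are invented for illustration and are not kafka-python's actual fetcher code): the skip threshold starts at the seeked position but is overwritten with a batch-derived offset mid-loop, so a record older than the seek position leaks through.

```python
# Hypothetical reconstruction of the suspected skip behavior, NOT the
# real fetcher.py code: the skip threshold (fetch_offset) is clobbered
# with the batch's outer offset while iterating the batch's records.

def buggy_take(records, seek_offset, batch_outer_offset):
    fetch_offset = seek_offset          # 100 in the reported case
    out = []
    for offset, value in records:
        if offset < fetch_offset:
            # first iteration: 79 < 100, skipped as expected...
            # ...but the threshold is then reset to the batch offset (80)
            fetch_offset = batch_outer_offset
            continue
        # second iteration: 80 is not < 80, so the record is NOT skipped
        out.append((offset, value))
    return out

print(buggy_take([(79, 'a'), (80, 'b')], seek_offset=100, batch_outer_offset=80))
# -> [(80, 'b')]: the record at offset 80 is wrongly returned after seek(100)
```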


dpkp commented Oct 2, 2017

Can you post debug logs? Are you sure you aren't accidentally joining a consumer group and fetching the last-committed offsets?

88manpreet commented

@dpkp yes, we are sure. I will try to get the relevant debug logs for the scenario above. However, the fix from #1222 seems to be working well for us so far. Please see my comment on that issue.

@dpkp dpkp changed the title Seek method returning incorrect messages Seek method returning incorrect messages on compressed topic Oct 5, 2017
@dpkp dpkp changed the title Seek method returning incorrect messages on compressed topic Seek method returning incorrect messages on compressed topic when using max_poll_records Oct 5, 2017

dpkp commented Oct 5, 2017

Got it -- thanks. The issue here is that compressed messages (at least in the v0 and v1 formats) are actually returned as single-message "batches" in the MessageSet format, and the batch offset is equal to the offset of the last message in the compressed batch.

When seeking to an offset that falls inside a compressed batch, we first register the outer batch message offset, which is the offset of the last message inside the batch. When we then take uncompressed messages from inside the batch, the individual messages have smaller offsets than the outer batch offset. The _append logic in fetcher.py assumes that each message has a larger offset than the messages that came before it. We should fix this and also document this difference in offset behavior for compressed messages.
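A minimal sketch of the batch layout described above (`take_from_compressed_batch` is a hypothetical helper for illustration, not kafka-python's actual code): because the outer offset of a v0/v1 compressed batch equals the offset of its last inner record, correct seek handling has to check each inner record against the fetch position rather than trusting the outer offset.

```python
# Hypothetical illustration of the v0/v1 compressed-batch offset layout
# described above; this is NOT kafka-python's actual fetcher code.

def take_from_compressed_batch(outer_offset, inner_payloads, fetch_position):
    """Assign absolute offsets to a batch's inner records and return only
    those at or beyond the requested fetch position.

    The outer batch offset equals the offset of the LAST inner record, so
    the first inner record sits at outer_offset - len(inner_payloads) + 1.
    """
    first = outer_offset - len(inner_payloads) + 1
    records = [(first + i, p) for i, p in enumerate(inner_payloads)]
    # Skipping by the outer offset alone would keep the whole batch;
    # each inner record must be compared against the fetch position.
    return [(off, p) for off, p in records if off >= fetch_position]

# A batch with outer offset 84 holds records 79..84. After seek(82) only
# records 82, 83, 84 should come back, even though 79..81 arrive too.
print(take_from_compressed_batch(84, ['m79', 'm80', 'm81', 'm82', 'm83', 'm84'], 82))
# -> [(82, 'm82'), (83, 'm83'), (84, 'm84')]
```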


dpkp commented Nov 17, 2017

#1239 caused a regression documented in #1290 so I've reverted it until I can find a better fix.
