
default partition offset = 0 breaks when first log segment is rolled #175

Closed
dpkp opened this issue May 28, 2014 · 6 comments

@dpkp
Owner

dpkp commented May 28, 2014

When not using auto-commit, the consumer object will set the default offset to 0 for all partitions:
https://github.com/mumrah/kafka-python/blob/master/kafka/consumer.py#L119

This is an absolute offset and will only work so long as the first message is still in an active log segment. As soon as it gets rolled by the server, the default consumer will no longer be able to fetch messages without a manual seek.

I think we should probably query the server for the smallest valid offset rather than assuming offset 0 is still there.
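A minimal sketch of that idea, assuming the committed and earliest offsets have already been fetched (in kafka-python the earliest retained offset comes from an `OffsetRequest` with time `-2`; the helper name here is hypothetical):

```python
def resolve_start_offset(committed_offset, earliest_offset):
    """Pick a safe starting offset for a partition.

    committed_offset: the consumer group's stored offset, or None if
        nothing has been committed yet.
    earliest_offset: the smallest offset the broker still retains
        (e.g. from an OffsetRequest with time=-2).
    """
    if committed_offset is None or committed_offset < earliest_offset:
        # The committed (or default 0) offset has fallen out of
        # retention: start from the earliest message still available.
        return earliest_offset
    return committed_offset
```

This avoids the BrokerResponseError you get when defaulting to absolute offset 0 after the first log segment is rolled.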

@wizzat
Collaborator

wizzat commented May 29, 2014

I wonder how you'd go about writing a test for that. A unit test may be as good as we get on that one. But yeah, I totally agree. Arguably, the right answer is actually to start reading from the tail of the topic instead of the head.

@leporid

leporid commented Jun 27, 2014

I think I just ran into this, in a bad way. I have a log.retention.hours=168, and send_offset_request() reports back [OffsetResponse(topic='events', partition=0, error=0, offsets=(774096,))].

Of course, I'm getting BrokerResponseError on the first fetch of a MultiProcessConsumer restart after an unrelated exception -- and our system needs to be live -- so I'm going to try to debug this.

@dpkp
Owner Author

dpkp commented Jun 27, 2014

leporid -- the workaround is to call consumer.seek(0, 0) to jump to the first available offset after initializing your consumer, or consumer.seek(0, 2) to jump to the last available offset.
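For readers unfamiliar with the file-style whence semantics those two calls use, here is a rough illustration of what they resolve to, given known head and tail offsets (the helper is hypothetical, and whence=1, relative to the current offset, is omitted):

```python
def resolve_seek(offset, whence, head, tail):
    """Mimic the whence semantics of SimpleConsumer.seek():
    whence=0 is relative to the earliest offset (head of the log),
    whence=2 is relative to the latest offset (tail of the log)."""
    if whence == 0:
        return head + offset
    if whence == 2:
        return tail + offset
    raise ValueError("whence must be 0 or 2 in this sketch")
```

So seek(0, 0) lands on the earliest retained message rather than absolute offset 0, which is exactly what the default initialization gets wrong.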

@leporid

leporid commented Jun 28, 2014

Thanks for the hint.

I considered that, but, sensibly, seek() is only a method on SimpleConsumer, not MultiProcessConsumer.

My solution was this: At line 118 in consumer.py, in Consumer.__init__():

 for partition in partitions:
     req = OffsetFetchRequest(topic, partition)
     (commit_offset,) = self.client.send_offset_fetch_request(
         group, [req],
         callback=get_or_init_offset_callback,
         fail_on_error=False,
     )
     (earliest_offset,) = self.client.send_offset_request(
         [OffsetRequest(topic, partition, -2, 1)]
     )[0].offsets
     self.offsets[partition] = max(commit_offset, earliest_offset)

Does this make sense?

@dpkp dpkp added the consumer label Aug 22, 2014
@leporid

leporid commented Oct 8, 2014

My private monkey patch disappeared when I upgraded to 0.9.2, and this is still open. May I attempt to fix this and submit a PR? (The solution would be quite similar to the above, but I also noticed that auto_commit=False defaults the partition offsets to 0, so I can fix that too.)

@dpkp
Owner Author

dpkp commented Apr 6, 2015

Fixed in #296 by resetting partition offsets on offset-out-of-range errors.
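The shape of that fix can be sketched as catch-reset-retry. This is a simplified illustration, not the actual patch; the exception class is a stand-in for kafka-python's real error type, and the two callables are hypothetical:

```python
# Stand-in for the library's out-of-range error (assumption: the real
# fix catches kafka-python's own exception for this broker error code).
class OffsetOutOfRangeError(Exception):
    pass

def fetch_with_offset_reset(fetch, reset_offsets):
    """Try a fetch; if the requested offset is out of range, reset the
    partition offsets (e.g. to the broker's earliest retained offset)
    and retry once."""
    try:
        return fetch()
    except OffsetOutOfRangeError:
        reset_offsets()
        return fetch()
```

This removes the need for the manual seek(0, 0) workaround, since the consumer recovers on its own when the default offset has aged out of retention.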

@dpkp dpkp closed this as completed Apr 6, 2015
wbarnha added a commit to mkromer-tc/kafka-python that referenced this issue Mar 28, 2024
bradenneal1 pushed a commit to bradenneal1/kafka-python that referenced this issue May 16, 2024