can't calculate offset lag if consuming with KafkaConsumer #509

Closed
vershininm opened this issue Jan 14, 2016 · 14 comments

@vershininm

Hi, I've run into a strange issue while trying to calculate consumer offset lag. This is on Kafka 0.9.0.0.

How to reproduce:
Create a topic like this:

/opt/kafka/bin/kafka-topics.sh --create --topic topic_a  --partitions 5 --replication-factor 3 --zookeeper localhost

Then produce some messages:

from kafka import SimpleClient, SimpleProducer

TOPIC = 'topic_a'  # the topic created above

client = SimpleClient('localhost:9092,localhost:9093,localhost:9094')
producer = SimpleProducer(client=client, async=False)
for i in xrange(10):
    producer.send_messages(TOPIC, str(i))

On the other side, half of the messages were consumed with:

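from kafka import KafkaConsumer
GROUP = 'group_a'  # same group the offset checker is run against below
consumer = KafkaConsumer(TOPIC, bootstrap_servers='localhost:9092,localhost:9093,localhost:9094', group_id=GROUP)
print([next(consumer).value for _ in range(5)])
consumer.commit()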

Now the offset checker reports the correct total lag of 5:

/opt/kafka/bin/kafka-consumer-offset-checker.sh --group group_a --topic  topic_a --zookeeper localhost

Group    Topic    Pid  Offset  logSize  Lag  Owner
group_a  topic_a  0    2       2        0    none
group_a  topic_a  1    0       2        2    none
group_a  topic_a  2    2       2        0    none
group_a  topic_a  3    1       2        1    none
group_a  topic_a  4    0       2        2    none
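
For reference, the Lag column is logSize minus Offset for each partition, and the total is the sum over partitions. A minimal sketch of that arithmetic, with the rows above hard-coded as plain tuples (the variable names are illustrative and not part of any Kafka tooling):

```python
# (partition, committed offset, log size) copied from the checker output above
rows = [
    (0, 2, 2),
    (1, 0, 2),
    (2, 2, 2),
    (3, 1, 2),
    (4, 0, 2),
]

# Per-partition lag is logSize - Offset; the total lag is the sum.
lag_by_partition = {pid: log_size - offset for pid, offset, log_size in rows}
total_lag = sum(lag_by_partition.values())

print(lag_by_partition)
print('total lag: {}'.format(total_lag))
```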

I'm trying to get the same value with the following code:

from kafka import SimpleClient
from kafka.common import OffsetRequestPayload

client = SimpleClient('localhost:9092,localhost:9093,localhost:9094')
client.load_metadata_for_topics()

partitions = client.topic_partitions[TOPIC]
offset_requests = [OffsetRequestPayload(TOPIC, p, -1, 1) for p in partitions.keys()]

latest_offset_by_partition = {r.partition: r.offsets[0]
                              for r in client.send_offset_request(offset_requests)}
current_offset_by_partition = {r.partition: r.offset
                               for r in client.send_offset_fetch_request(GROUP, offset_requests)}
lag = 0
for part in partitions.keys():
    current = current_offset_by_partition.get(part, -1)
    latest = latest_offset_by_partition.get(part)
    lag += latest - current

print('lag: {}'.format(lag))

But I get UnknownTopicOrPartitionError: UnknownTopicOrPartitionError - 3 - This request is for a topic or partition that does not exist on this broker.

If I repeat the same steps (create, produce) but consume with:

client = SimpleClient('localhost:9092,localhost:9093,localhost:9094')
consumer = SimpleConsumer(client, group=GROUP, topic=TOPIC)
consumer.get_messages(5)
consumer.commit()

I get the correct lag with my code, and it matches what kafka-consumer-offset-checker reports.

This is a bit confusing. I'm probably doing something wrong, so please let me know if that's the case.

@saluker

saluker commented Jan 25, 2016

I was able to reproduce this with one modification: I had to try to consume with the group_id before producing. Other than that I get the same issue: bin/kafka-consumer-offset-checker.sh works when consuming with either SimpleConsumer or KafkaConsumer. But the in-code method (per @dribler code) only works with SimpleConsumer. Anything else I can try to help debug this?

@dpkp
Owner

dpkp commented Jan 25, 2016

Couple thoughts -- first, the send_offset_fetch_request call uses the v0 zookeeper-storage apis. If you are intending to use kafka-storage (v1), you can use send_offset_fetch_request_kafka.

I believe the broker throws UnknownTopicOrPartitionError on a v0 request if the group is unrecognized. Your code above works for me when the group already exists and has offsets committed. It raises an error if I try an unknown group.

Also, please note that the 'Simple' interfaces -- Client / Consumer / Producer -- are being deprecated. I'll put some thought into where lag monitoring fits. I believe the java client includes it as part of the metrics system?

@dpkp
Owner

dpkp commented Jan 25, 2016

Re new apis: I expect we'll follow KAFKA-2076 when it is finalized.

@saluker

saluker commented Jan 26, 2016

So is there any way to do lag monitoring now with v1 storage?

I get an exception when calling send_offset_fetch_request_kafka() from SimpleClient:

TypeError: encode_consumer_metadata_request() takes exactly 4 arguments (2 given)

@saluker

saluker commented Jan 30, 2016

@dpkp I realize lag monitoring will have to wait until KAFKA-2076. But is there any way to get lag with v1 storage in the new client?

We really want to upgrade to 0.9 and the new client, but not being able to monitor lag is the final hurdle.

@dpkp
Owner

dpkp commented Jan 31, 2016

At the lowest level you would do this:

In [1]: from kafka.protocol.commit import GroupCoordinatorRequest, OffsetFetchRequest_v1

In [2]: from kafka.client_async import KafkaClient

In [3]: cli = KafkaClient(bootstrap_servers=['localhost:57138'])

In [4]: while not cli.ready(0):
   ....:     pass
   ....:

In [5]: future = cli.send(0, GroupCoordinatorRequest('kafka-python-default-group'))

In [6]: cli.poll(future=future)
Out[6]: [GroupCoordinatorResponse(error_code=0, coordinator_id=0, host=u'127.0.0.1', port=57138)]

In [7]: coordinator = future.value.coordinator_id

In [8]: future = cli.send(coordinator, OffsetFetchRequest_v1('kafka-python-default-group', [('foobar', [0, 1])]))

In [9]: cli.poll(future=future)
Out[9]: [OffsetFetchResponse(topics=[(topic=u'foobar', partitions=[(partition=0, offset=46, metadata=u'', error_code=0), (partition=1, offset=54, metadata=u'', error_code=0)])])]

In [10]: future.value
Out[10]: OffsetFetchResponse(topics=[(topic=u'foobar', partitions=[(partition=0, offset=46, metadata=u'', error_code=0), (partition=1, offset=54, metadata=u'', error_code=0)])])

That's a little messy -- you have to manage keeping track of coordinator, dealing with api error codes, and parsing partition info. And the protocol classes aren't fully documented or even finalized (they should be easier to use).
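
To illustrate the "dealing with api error codes, and parsing partition info" part, here is a rough sketch that flattens a response shaped like the one in Out[10] into a dict. It uses plain tuples as a stand-in for the real OffsetFetchResponse object, so the helper name `committed_offsets` and the tuple layout are assumptions for illustration, not kafka-python API:

```python
def committed_offsets(topics):
    """Flatten [(topic, [(partition, offset, metadata, error_code), ...]), ...]
    into {(topic, partition): offset}.

    Raises RuntimeError on any non-zero error code. Offsets are returned
    unchanged, including -1, which typically means nothing was committed.
    """
    offsets = {}
    for topic, partitions in topics:
        for partition, offset, _metadata, error_code in partitions:
            if error_code != 0:
                raise RuntimeError(
                    'offset fetch failed for {}[{}]: error code {}'.format(
                        topic, partition, error_code))
            offsets[(topic, partition)] = offset
    return offsets


# Same values as the OffsetFetchResponse shown above, as plain tuples.
example = [('foobar', [(0, 46, '', 0), (1, 54, '', 0)])]
print(committed_offsets(example))
```

Whether the decoded response can be passed in directly (for example as `future.value.topics`) depends on the client version and is not verified here.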

In any event, I'll try to get something added to KafkaConsumer soon.

@dpkp
Owner

dpkp commented Feb 1, 2016

I've added a highwater() method to KafkaConsumer. From a consumer instance you can calculate a lag value for any partition as (highwater - position).

You will not be able to use this approach out of process because it depends on the consumer actually fetching messages. But it may be useful as a start.
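
A minimal sketch of the (highwater - position) calculation. It only assumes an object with `highwater(tp)` and `position(tp)` methods, and it treats a missing highwater as "unknown" because the value is only available once the consumer has fetched from that partition. The `FakeConsumer` stub and the namedtuple standing in for TopicPartition are hypothetical, there only so the example runs without a broker:

```python
from collections import namedtuple

# Stand-in for kafka.TopicPartition so the sketch runs without a broker.
TopicPartition = namedtuple('TopicPartition', ['topic', 'partition'])


def partition_lag(consumer, tp):
    """Return highwater - position for tp, or None if no highwater is known."""
    highwater = consumer.highwater(tp)
    if highwater is None:
        # Nothing has been fetched for this partition yet.
        return None
    return highwater - consumer.position(tp)


class FakeConsumer(object):
    """Hypothetical stub exposing only highwater() and position()."""

    def __init__(self, highwaters, positions):
        self._highwaters = highwaters
        self._positions = positions

    def highwater(self, tp):
        return self._highwaters.get(tp)

    def position(self, tp):
        return self._positions[tp]


tp0 = TopicPartition('topic_a', 0)
tp1 = TopicPartition('topic_a', 1)
consumer = FakeConsumer(highwaters={tp0: 2}, positions={tp0: 1, tp1: 0})

print(partition_lag(consumer, tp0))  # highwater 2, position 1
print(partition_lag(consumer, tp1))  # no highwater recorded yet
```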

@jeffwidman
Collaborator

jeffwidman commented Nov 12, 2016

> You will not be able to use this approach out of process because it depends on the consumer actually fetching messages. But it may be useful as a start.

Is there any way to use this to fetch the lag without actually fetching the message?

Actually, looks like what I'm really looking for is this: #633 (comment)

@dtest11

dtest11 commented Jan 4, 2017

@saluker did you resolve the problem?

@saluker

saluker commented Jan 4, 2017

@FizLBQ I was waiting for KAFKA-2076 and KIP-79 and then for it to be implemented in kafka-python to retest. Looks like KAFKA-2076 and KIP-79 are done as of a few months ago. @dpkp is that in kafka-python? If so happy to help test!

@dpkp
Owner

dpkp commented Jun 19, 2017

The way I typically calculate lag is with the highwater() method of KafkaConsumer:

for msg in consumer:
    tp = TopicPartition(msg.topic, msg.partition)
    highwater = consumer.highwater(tp)
    lag = (highwater - 1) - msg.offset
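
The `- 1` is there because the highwater mark is the offset the next produced message will receive, so the newest existing message sits at highwater - 1. A small sketch with made-up numbers (the function name is illustrative):

```python
def message_lag(highwater, message_offset):
    """Number of messages still unread after the message at message_offset."""
    return (highwater - 1) - message_offset


# A partition holding offsets 0..9 has a highwater mark of 10.
print(message_lag(10, 9))  # the newest message was just consumed
print(message_lag(10, 4))  # offsets 5..9 are still to come
```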

@dpkp
Owner

dpkp commented Jun 19, 2017

See #1036 re: KIP-79 support, which would allow accessing the highwater offset for a partition without having the partition assigned / during consumer loop.

dpkp closed this as completed Jun 19, 2017

@jeffwidman
Collaborator

Also relevant is #950 / KIP-88

@jeffwidman
Collaborator

See also #1643 which adds KafkaAdmin.list_consumer_group_offsets()
