
Getting lag from topics (KafkaAdmin and KafkaConsumer) #1673

Closed
PabloInnocenti opened this issue Dec 10, 2018 · 3 comments

Comments

@PabloInnocenti

PabloInnocenti commented Dec 10, 2018

Dear kafka-python team,

I'm trying to get the consumer lag for topics. I have read and tried the suggestions in the following issues:

#509
#1643

Based on this comment:

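```python
from kafka import TopicPartition

# `consumer` is a KafkaConsumer that is already subscribed to the topic(s)
for msg in consumer:
    tp = TopicPartition(msg.topic, msg.partition)
    highwater = consumer.highwater(tp)
    lag = (highwater - 1) - msg.offset
```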
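For the inline case, here is a minimal sketch of the same formula with a guard for the `None` that `highwater()` returns when no fetch response has arrived yet for the partition (the kafka-python docs note that highwater offsets come from FetchResponse messages). `inline_lag` and `consume_with_lag` are hypothetical helper names; the kafka-python import sits inside the generator only so the arithmetic can be used without the library installed.

```python
def inline_lag(highwater, msg_offset):
    """Messages still waiting after the one at msg_offset has been consumed.

    highwater is the offset the next produced message will get, so the newest
    available message is at highwater - 1. Returns None while highwater is
    unknown (no fetch response received for the partition yet).
    """
    if highwater is None:
        return None
    return (highwater - 1) - msg_offset


def consume_with_lag(consumer):
    """Yield (message, lag) pairs from an already-subscribed KafkaConsumer."""
    # Lazy import (hypothetical layout) so inline_lag has no kafka dependency.
    from kafka import TopicPartition

    for msg in consumer:
        tp = TopicPartition(msg.topic, msg.partition)
        yield msg, inline_lag(consumer.highwater(tp), msg.offset)
```

Because `position()` also refers to the next offset, `(highwater - 1) - msg.offset` is the same as `highwater - consumer.position(tp)` right after `msg` has been returned.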

I tried the following code (based on some other posts from Stack Overflow and issues):

```python
from kafka import KafkaConsumer, TopicPartition

self.consumer = KafkaConsumer(
    bootstrap_servers=self.bootstrap_servers,
    group_id='test',
    enable_auto_commit=False,
    consumer_timeout_ms=5000,
)

topics = self.consumer.topics()
for topic in topics:
    self.logger.info("Getting Metrics for topic {topic}".format(topic=topic))
    lag_in_seconds = 0.0
    offset = 0
    partitions = self.consumer.partitions_for_topic(topic)
    if partitions is not None:
        self.logger.info("Partitions {}".format(str(len(partitions))))
        for p in partitions:
            tp = TopicPartition(topic, p)
            self.consumer.assign([tp])
            # Last offset committed by group 'test' (None if never committed)
            committed = self.consumer.committed(tp)
            # Jump to the log end; position() then returns the end offset
            self.consumer.seek_to_end(tp)
            last_offset = self.consumer.position(tp)
            offset = offset + last_offset
            # highwater() is only filled in from fetch responses; nothing
            # has been fetched for this partition, so it stays None here
            highwater = self.consumer.highwater(tp)
            if highwater is None:
                lag = None
            else:
                lag = (highwater - 1) - last_offset
```

I always get highwater = None.

What is the correct way to get the lag?

Thanks,
Pablo.
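A sketch of one way to get the lag without `highwater()` (not code from this thread): compare the group's committed offset with the log end offset from `KafkaConsumer.end_offsets()`, which asks the brokers directly instead of waiting for a fetch. In the loop above, `last_offset` is already the end offset; the missing piece is subtracting `committed` from it. This assumes a kafka-python version that provides `end_offsets()`; `partition_lags` and `topic_lags` are hypothetical names.

```python
def partition_lags(end_offsets, committed):
    """Lag per partition: log end offset minus the group's committed offset.

    end_offsets maps partition -> offset the next produced message will get;
    committed maps partition -> committed offset, or None if never committed.
    Partitions without a committed offset are reported as None.
    """
    lags = {}
    for tp, end in end_offsets.items():
        done = committed.get(tp)
        lags[tp] = None if done is None else end - done
    return lags


def topic_lags(consumer, topic):
    """Lag of the consumer's group on every partition of topic.

    Assumes consumer is a kafka-python KafkaConsumer created with a group_id
    (committed() requires one). No assign() or poll() is needed here.
    """
    from kafka import TopicPartition  # kafka-python, imported lazily

    partitions = consumer.partitions_for_topic(topic) or set()
    tps = [TopicPartition(topic, p) for p in sorted(partitions)]
    if not tps:
        return {}
    ends = consumer.end_offsets(tps)  # {TopicPartition: end offset}
    committed = {tp: consumer.committed(tp) for tp in tps}
    return partition_lags(ends, committed)
```

Called as `topic_lags(self.consumer, topic)` inside the existing `for topic in topics:` loop, this would replace the per-partition `assign()` / `seek_to_end()` / `highwater()` sequence.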

@jeffwidman
Collaborator

jeffwidman commented Dec 12, 2018

Are you trying to get lag inline as you consume, or from a side channel?

If the latter, I just put up some code for doing that based on the KafkaAdminClient here: DataDog/integrations-core#2730

The methods should be able to be pulled out and re-used pretty easily for any monitoring system, not just DataDog.

And I do recommend fetching this data from a side channel, because if you fetch from the Consumer and for some reason it can't talk to the cluster or is deadlocked, it may not realize it's falling behind.
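The linked PR's code is not reproduced here. The following is a rough sketch of the same side-channel idea, assuming a kafka-python release where `KafkaAdminClient.list_consumer_group_offsets(group_id)` is available and returns `{TopicPartition: OffsetAndMetadata}`. A separate `KafkaConsumer` without a `group_id` supplies the end offsets, so the monitor never joins or commits for the group it watches. `group_lag` and `lag_from_offsets` are hypothetical names.

```python
def lag_from_offsets(group_offsets, end_offsets):
    """Per-partition lag from committed group offsets and log end offsets.

    group_offsets: {partition: committed offset} for one consumer group
    end_offsets:   {partition: log end offset}
    Partitions missing from end_offsets are skipped.
    """
    return {tp: end_offsets[tp] - committed
            for tp, committed in group_offsets.items()
            if tp in end_offsets}


def group_lag(bootstrap_servers, group_id):
    """Side-channel lag check for group_id, independent of its consumers."""
    from kafka import KafkaAdminClient, KafkaConsumer  # kafka-python

    admin = KafkaAdminClient(bootstrap_servers=bootstrap_servers)
    consumer = KafkaConsumer(bootstrap_servers=bootstrap_servers)  # no group_id
    try:
        offsets = admin.list_consumer_group_offsets(group_id)
        # Skip partitions reported without a real committed offset (-1)
        committed = {tp: meta.offset for tp, meta in offsets.items()
                     if meta.offset >= 0}
        if not committed:
            return {}
        ends = consumer.end_offsets(list(committed))
        return lag_from_offsets(committed, ends)
    finally:
        consumer.close()
        admin.close()
```

Because this runs in its own process, it keeps reporting growing lag even when the group's consumers are stuck or cannot reach the cluster, which is the point of the side-channel approach.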

@PabloInnocenti
Author

Thanks, Jeff! I will wait for the upcoming releases that include this new feature (KafkaAdminClient).

@KeithTt

KeithTt commented Mar 25, 2021

How do I use KafkaAdminClient to compute the consumer lag now?


3 participants