list_consumer_group_offsets_request support multiple consumer groups #1947


Open
cobolbaby opened this issue Mar 19, 2025 · 0 comments

cobolbaby commented Mar 19, 2025

Currently, list_consumer_group_offsets only supports querying a single consumer group per call, so fetching offsets for many groups requires looping, which is inefficient.

import logging

from confluent_kafka import (Consumer, ConsumerGroupTopicPartitions,
                             KafkaException, TopicPartition)
from confluent_kafka.admin import AdminClient

def get_kafka_lag_consumer_groups():
    """
    获取 Kafka 所有 consumer group 的 lag 信息(优化为批量查询)
    """
    try:
        admin_client = AdminClient({
            'bootstrap.servers': KAFKA_CONNECT_BOOTSTRAP_SERVERS
        })
        
        consumer = Consumer({
            'bootstrap.servers': KAFKA_CONNECT_BOOTSTRAP_SERVERS,
            'group.id': 'monitoring-consumer',
            'auto.offset.reset': 'latest',
            'enable.auto.commit': False
        })

        # List all consumer groups
        future_groups = admin_client.list_consumer_groups(request_timeout=10)
        groups = future_groups.result()
        
        consumer_group_ids = [ConsumerGroupTopicPartitions(g.group_id) for g in groups.valid 
                              if g.group_id.startswith("connect-")]
        if not consumer_group_ids:
            logging.warning("No consumer groups match the filter.")
            return []
        
        logging.info(f"Fetching offsets for {len(consumer_group_ids)} consumer groups...")

        high_lag_groups = []

        # TODO: the library does not support batch queries, so each group has
        # to be processed in a loop
        for consumer_group_id in consumer_group_ids:

            # Request the offsets for this consumer group
            future_offsets = admin_client.list_consumer_group_offsets([consumer_group_id])

            group_id = consumer_group_id.group_id
            offsets = future_offsets[group_id].result()
            
            logging.debug(f"Consumer Group: {group_id}")
            summedLag = 0
            for tp in offsets.topic_partitions:
                # Skip partitions with no committed offset (reported as a negative offset)
                if tp.offset < 0:
                    continue
                # Fetch the partition's latest offset (high watermark)
                high_watermark = consumer.get_watermark_offsets(tp)[1]  # (low, high)
                lag = high_watermark - tp.offset  # compute the lag
                summedLag += lag
                logging.debug(f"  Topic: {tp.topic}, Partition: {tp.partition}, Lag: {lag}")

            if summedLag > 100:
                logging.info(f"High lag consumer group found: {group_id}")
                high_lag_groups.append(group_id)
            
        return high_lag_groups
    except Exception as e:
        logging.error(f"Error fetching consumer groups: {e}")
        return []
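Until the admin API accepts multiple groups in one request, the per-group calls can at least be overlapped instead of run serially. A minimal scatter-gather sketch using only the standard library; `fetch_group_offsets` is a hypothetical stand-in for whatever performs the blocking single-group request (e.g. issuing `list_consumer_group_offsets` for one group and waiting on its future):

```python
from concurrent.futures import ThreadPoolExecutor

def fetch_all_offsets(fetch_group_offsets, group_ids, max_workers=8):
    """Run one blocking offsets request per group concurrently and return
    {group_id: result}. fetch_group_offsets(group_id) is a stand-in for the
    single-group request; the thread pool lets the round-trips overlap."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # Submit every request first, then collect results.
        futures = {gid: pool.submit(fetch_group_offsets, gid) for gid in group_ids}
        return {gid: fut.result() for gid, fut in futures.items()}
```

Since each `AdminClient.list_consumer_group_offsets` call already returns a future immediately, an equivalent approach is to issue all calls in a first loop, collect the futures, and only then call `.result()` on each, so the waits overlap.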