diff --git a/kafka_exporter.go b/kafka_exporter.go
index 6dc35a2c..888ed7ff 100644
--- a/kafka_exporter.go
+++ b/kafka_exporter.go
@@ -362,17 +362,21 @@ func (e *Exporter) collect(ch chan<- prometheus.Metric) {
 		clusterBrokers, prometheus.GaugeValue, float64(len(e.client.Brokers())),
 	)
 
+	topicPartitionsMap := make(map[string][]int32)
 	offset := make(map[string]map[int32]int64)
+	// groupOffset records consumer group offsets per broker, keyed by group id, topic, and partition
+	// structure: groupOffset[broker.ID()][group.GroupId][topic][partition] = offsetFetchResponseBlock.Offset
+	// e.g. groupOffset[1]["test_group"]["test_topic"][0] = 1
+	groupOffset := make(map[int32]map[string]map[string]map[int32]int64)
+	// metadata refresh control
 	now := time.Now()
-
 	if now.After(e.nextMetadataRefresh) {
 		glog.V(DEBUG).Info("Refreshing client metadata")
 		if err := e.client.RefreshMetadata(); err != nil {
 			glog.Errorf("Cannot refresh topics, using cached data: %v", err)
 		}
-
 		e.nextMetadataRefresh = now.Add(e.metadataRefreshInterval)
 	}
 
@@ -382,7 +386,17 @@ func (e *Exporter) collect(ch chan<- prometheus.Metric) {
 		return
 	}
 
-	topicChannel := make(chan string)
+	getTopicPartitions := func(topic string) {
+		defer wg.Done()
+		partitions, err := e.client.Partitions(topic)
+		if err != nil {
+			glog.Errorf("Cannot get partitions of topic %s: %v", topic, err)
+		} else {
+			e.mu.Lock()
+			topicPartitionsMap[topic] = partitions
+			e.mu.Unlock()
+		}
+	}
 
 	getTopicMetrics := func(topic string) {
 		defer wg.Done()
@@ -478,6 +492,7 @@ func (e *Exporter) collect(ch chan<- prometheus.Metric) {
 			glog.Errorf("Cannot get consumer group %v", err)
 		}
+
 		for _, group := range ConsumerGroups {
 			offset, _ := group.FetchOffset(topic, partition)
 			if offset > 0 {
@@ -492,52 +507,21 @@ func (e *Exporter) collect(ch chan<- prometheus.Metric) {
 		}
 	}
 
-	loopTopics := func(id int) {
-		ok := true
-		for ok {
-			topic, open := <-topicChannel
-			ok = open
-			if open {
-				getTopicMetrics(topic)
-			}
-		}
-	}
-
-	minx := func(x int, y int) int {
-		if x < y {
-			return x
-		} else {
-			return y
-		}
-	}
-
-	N := len(topics)
-	if N > 1 {
-		N = minx(N/2, e.topicWorkers)
-	}
-
-	for w := 1; w <= N; w++ {
-		go loopTopics(w)
-	}
-
-	for _, topic := range topics {
-		if e.topicFilter.MatchString(topic) {
-			wg.Add(1)
-			topicChannel <- topic
-		}
-	}
-	close(topicChannel)
-
-	wg.Wait()
-
 	getConsumerGroupMetrics := func(broker *sarama.Broker) {
 		defer wg.Done()
+		glog.V(DEBUG).Infof("[%d] Fetching consumer group metrics", broker.ID())
+		e.mu.Lock()
+		if _, ok := groupOffset[broker.ID()]; !ok {
+			groupOffset[broker.ID()] = make(map[string]map[string]map[int32]int64)
+		}
+		e.mu.Unlock()
 		if err := broker.Open(e.client.Config()); err != nil && err != sarama.ErrAlreadyConnected {
 			glog.Errorf("Cannot connect to broker %d: %v", broker.ID(), err)
 			return
 		}
 		defer broker.Close()
+		glog.V(DEBUG).Infof("[%d]> listing groups", broker.ID())
 		groups, err := broker.ListGroups(&sarama.ListGroupsRequest{})
 		if err != nil {
 			glog.Errorf("Cannot get consumer group: %v", err)
@@ -550,12 +534,18 @@ func (e *Exporter) collect(ch chan<- prometheus.Metric) {
 			}
 		}
+		glog.V(DEBUG).Infof("[%d]> describing groups", broker.ID())
 		describeGroups, err := broker.DescribeGroups(&sarama.DescribeGroupsRequest{Groups: groupIds})
 		if err != nil {
 			glog.Errorf("Cannot get describe groups: %v", err)
 			return
 		}
 		for _, group := range describeGroups.Groups {
+			e.mu.Lock()
+			if _, ok := groupOffset[broker.ID()][group.GroupId]; !ok {
+				groupOffset[broker.ID()][group.GroupId] = make(map[string]map[int32]int64)
+			}
+			e.mu.Unlock()
 			offsetFetchRequest := sarama.OffsetFetchRequest{ConsumerGroup: group.GroupId, Version: 1}
 			if e.offsetShowAll {
 				for topic, partitions := range offset {
@@ -580,78 +570,149 @@ func (e *Exporter) collect(ch chan<- prometheus.Metric) {
 			ch <- prometheus.MustNewConstMetric(
 				consumergroupMembers, prometheus.GaugeValue, float64(len(group.Members)), group.GroupId,
 			)
-			offsetFetchResponse, err := broker.FetchOffset(&offsetFetchRequest)
-			if err != nil {
-				glog.Errorf("Cannot get offset of group %s: %v", group.GroupId, err)
-				continue
-			}
-			for topic, partitions := range offsetFetchResponse.Blocks {
-				// If the topic is not consumed by that consumer group, skip it
-				topicConsumed := false
-				for _, offsetFetchResponseBlock := range partitions {
-					// Kafka will return -1 if there is no offset associated with a topic-partition under that consumer group
-					if offsetFetchResponseBlock.Offset != -1 {
-						topicConsumed = true
-						break
+			start := time.Now()
+			glog.V(DEBUG).Infof("[%d][%s]> fetching group offsets", broker.ID(), group.GroupId)
+			if offsetFetchResponse, err := broker.FetchOffset(&offsetFetchRequest); err != nil {
+				glog.Errorf("Cannot get offset of group %s: %v", group.GroupId, err)
+			} else {
+				glog.V(DEBUG).Infof("[%d][%s] done fetching group offset in %s", broker.ID(), group.GroupId, time.Since(start).String())
+				for topic, partitions := range offsetFetchResponse.Blocks {
+					// Topic filter
+					if !e.topicFilter.MatchString(topic) {
+						continue
+					}
+					// If the topic is not consumed by that consumer group, skip it
+					topicConsumed := false
+					for _, offsetFetchResponseBlock := range partitions {
+						// Kafka will return -1 if there is no offset associated with a topic-partition under that consumer group
+						if offsetFetchResponseBlock.Offset != -1 {
+							topicConsumed = true
+							break
+						}
+					}
+					if topicConsumed {
+						e.mu.Lock()
+						if _, ok := groupOffset[broker.ID()][group.GroupId][topic]; !ok {
+							groupOffset[broker.ID()][group.GroupId][topic] = make(map[int32]int64)
+						}
+						e.mu.Unlock()
+						for partition, offsetFetchResponseBlock := range partitions {
+							err := offsetFetchResponseBlock.Err
+							if err != sarama.ErrNoError {
+								glog.Errorf("Error for partition %d: %v", partition, err.Error())
+								continue
+							}
+							e.mu.Lock()
+							groupOffset[broker.ID()][group.GroupId][topic][partition] = offsetFetchResponseBlock.Offset
+							e.mu.Unlock()
+						}
+					}
 				}
 			}
-				if !topicConsumed {
-					continue
-				}
+		}
+	}
+
+	minx := func(x int, y int) int {
+		if x < y {
+			return x
+		} else {
+			return y
+		}
+	}
+
+	// First, collect topic-partition information
+	glog.V(DEBUG).Info("Fetching topic-partition information")
+	for _, topic := range topics {
+		wg.Add(1)
+		go getTopicPartitions(topic)
+	}
+	wg.Wait()
+
+	// Second, collect consumer group offsets from each broker
+	glog.V(DEBUG).Info("Fetching consumer group metrics")
+	if len(e.client.Brokers()) > 0 {
+		for _, broker := range e.client.Brokers() {
+			wg.Add(1)
+			go getConsumerGroupMetrics(broker)
+		}
+		wg.Wait()
+	} else {
+		glog.Errorln("No valid broker, cannot get consumer group metrics")
+	}
+
+	// Then collect topic metrics
+	topicChannel := make(chan string)
+
+	loopTopics := func(id int) {
+		ok := true
+		for ok {
+			topic, open := <-topicChannel
+			ok = open
+			if open {
+				glog.V(DEBUG).Infof("Collecting metrics [%d] for topic %s", id, topic)
+				getTopicMetrics(topic)
+			}
+		}
+	}
+
+	// concurrency control: clamp to at least one worker so a single topic cannot deadlock the channel send
+	N := minx(len(topics)/2, e.topicWorkers)
+	if N < 1 {
+		N = 1
+	}
+	for w := 1; w <= N; w++ {
+		go loopTopics(w)
+	}
+
+	glog.V(DEBUG).Infoln("Fetching topic metrics")
+	for _, topic := range topics {
+		if e.topicFilter.MatchString(topic) {
+			wg.Add(1)
+			topicChannel <- topic
+		}
+	}
+	close(topicChannel)
+	wg.Wait()
+
-			var currentOffsetSum int64
+	// Finally, calculate consumer group lag from the recorded offsets
+	calculateConsumeGroupMetrics := func(groupOffsetMap map[string]map[string]map[int32]int64) {
+		defer wg.Done()
+		for group, topicPartitionOffset := range groupOffsetMap {
+			for topic, partitionOffsetMap := range topicPartitionOffset {
+				var groupCurrentOffsetSum int64
 				var lagSum int64
-				for partition, offsetFetchResponseBlock := range partitions {
-					err := offsetFetchResponseBlock.Err
-					if err != sarama.ErrNoError {
-						glog.Errorf("Error for partition %d :%v", partition, err.Error())
-						continue
-					}
-					currentOffset := offsetFetchResponseBlock.Offset
-					currentOffsetSum += currentOffset
-					ch <- prometheus.MustNewConstMetric(
-						consumergroupCurrentOffset, prometheus.GaugeValue, float64(currentOffset), group.GroupId, topic, strconv.FormatInt(int64(partition), 10),
-					)
-					e.mu.Lock()
-					if offset, ok := offset[topic][partition]; ok {
-						// If the topic is consumed by that consumer group, but no offset associated with the partition
-						// forcing lag to -1 to be able to alert on that
-						var lag int64
-						if offsetFetchResponseBlock.Offset == -1 {
-							lag = -1
-						} else {
-							lag = offset - offsetFetchResponseBlock.Offset
-							lagSum += lag
-						}
+				for partition, gOffset := range partitionOffsetMap {
+					cOffset, ok := offset[topic][partition]
+					if ok {
+						groupCurrentOffsetSum += gOffset
+						lag := cOffset - gOffset
+						lagSum += lag
+						ch <- prometheus.MustNewConstMetric(
+							consumergroupCurrentOffset, prometheus.GaugeValue, float64(gOffset), group, topic, strconv.FormatInt(int64(partition), 10),
+						)
 						ch <- prometheus.MustNewConstMetric(
-							consumergroupLag, prometheus.GaugeValue, float64(lag), group.GroupId, topic, strconv.FormatInt(int64(partition), 10),
+							consumergroupLag, prometheus.GaugeValue, float64(lag), group, topic, strconv.FormatInt(int64(partition), 10),
 						)
-					} else {
-						glog.Errorf("No offset of topic %s partition %d, cannot get consumer group lag", topic, partition)
 					}
-					e.mu.Unlock()
 				}
+
 				ch <- prometheus.MustNewConstMetric(
-					consumergroupCurrentOffsetSum, prometheus.GaugeValue, float64(currentOffsetSum), group.GroupId, topic,
+					consumergroupCurrentOffsetSum, prometheus.GaugeValue, float64(groupCurrentOffsetSum), group, topic,
 				)
 				ch <- prometheus.MustNewConstMetric(
-					consumergroupLagSum, prometheus.GaugeValue, float64(lagSum), group.GroupId, topic,
+					consumergroupLagSum, prometheus.GaugeValue, float64(lagSum), group, topic,
 				)
 			}
 		}
 	}
 
-	glog.V(DEBUG).Info("Fetching consumer group metrics")
-	if len(e.client.Brokers()) > 0 {
-		for _, broker := range e.client.Brokers() {
+	if len(groupOffset) > 0 {
+		glog.V(DEBUG).Infoln("Calculating consumer group lag")
+		for _, v := range groupOffset {
 			wg.Add(1)
-			go getConsumerGroupMetrics(broker)
+			go calculateConsumeGroupMetrics(v)
 		}
 		wg.Wait()
-	} else {
-		glog.Errorln("No valid broker, cannot get consumer group metrics")
 	}
+}
 
 func init() {
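For reference, here is a minimal, self-contained sketch (not part of the patch) of the two-phase lag calculation the diff introduces: committed offsets are first recorded into the nested `groupOffset`-style map, and only afterwards compared against the topic end offsets. The `endOffsets` name and all data values are hypothetical.

```go
package main

import "fmt"

// groupOffsets mirrors the patch's nested map:
// broker ID -> group -> topic -> partition -> committed offset.
type groupOffsets map[int32]map[string]map[string]map[int32]int64

func main() {
	// Hypothetical end offsets per topic/partition (the role of the patch's `offset` map).
	endOffsets := map[string]map[int32]int64{
		"test_topic": {0: 120, 1: 95},
	}

	// Hypothetical committed offsets recorded during the per-broker fetch phase.
	offsets := groupOffsets{
		1: {"test_group": {"test_topic": {0: 100, 1: 95}}},
	}

	// Lag phase: diff committed offsets against end offsets, summing per
	// group/topic the way the patch does before emitting the
	// consumergroup current-offset-sum and lag-sum metrics.
	for _, groups := range offsets {
		for group, topics := range groups {
			for topic, partitions := range topics {
				var currentSum, lagSum int64
				for partition, committed := range partitions {
					end, ok := endOffsets[topic][partition]
					if !ok {
						continue // no end offset known for this partition
					}
					lag := end - committed
					currentSum += committed
					lagSum += lag
					fmt.Printf("group=%s topic=%s partition=%d lag=%d\n", group, topic, partition, lag)
				}
				fmt.Printf("group=%s topic=%s offset_sum=%d lag_sum=%d\n", group, topic, currentSum, lagSum)
			}
		}
	}
}
```

The point of the restructuring is that the `broker.FetchOffset` round-trips can run per broker in parallel, while the lag arithmetic afterwards operates on plain maps with no network calls in the loop.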
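Likewise, a small standalone sketch of the bounded worker-pool pattern used for topic metrics, with hypothetical topic names. It mirrors the clamp applied above: starting at least one worker matters because `len(topics)/2` is zero for a single topic, and with no workers draining the unbuffered channel the send would block forever.

```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	topics := []string{"a", "b", "c"} // hypothetical topic names
	const topicWorkers = 10

	// Same bounded-worker pattern as the patch: N goroutines draining one channel.
	n := len(topics) / 2
	if n > topicWorkers {
		n = topicWorkers
	}
	if n < 1 {
		n = 1 // clamp: guarantees progress even with a single topic
	}

	var wg sync.WaitGroup
	topicChannel := make(chan string)
	for w := 1; w <= n; w++ {
		go func(id int) {
			for topic := range topicChannel {
				fmt.Printf("worker %d collecting %s\n", id, topic)
				wg.Done()
			}
		}(w)
	}

	for _, topic := range topics {
		wg.Add(1)
		topicChannel <- topic
	}
	close(topicChannel)
	wg.Wait() // all topics processed; workers exit when the channel closes
}
```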