Get Partition Sizes
atrbgithub authored May 13, 2022
1 parent 83de727 commit 5923381
Showing 1 changed file with 64 additions and 2 deletions.
66 changes: 64 additions & 2 deletions kafka_exporter.go
@@ -36,6 +36,7 @@ var (
topicOldestOffset *prometheus.Desc
topicPartitionLeader *prometheus.Desc
topicPartitionReplicas *prometheus.Desc
topicPartitionSize *prometheus.Desc
topicPartitionInSyncReplicas *prometheus.Desc
topicPartitionUsesPreferredReplica *prometheus.Desc
topicUnderReplicatedPartition *prometheus.Desc
@@ -61,6 +62,8 @@ type Exporter struct {
topicWorkers int
allowConcurrent bool
sgMutex sync.Mutex
sgPartitionsMutex sync.Mutex
sgPartitionLeadersMutex sync.Mutex
sgWaitCh chan struct{}
sgChans []chan<- prometheus.Metric
consumerGroupFetchAll bool
@@ -249,6 +252,7 @@ func (e *Exporter) Describe(ch chan<- *prometheus.Desc) {
ch <- topicPartitions
ch <- topicPartitionLeader
ch <- topicPartitionReplicas
ch <- topicPartitionSize
ch <- topicPartitionInSyncReplicas
ch <- topicPartitionUsesPreferredReplica
ch <- topicUnderReplicatedPartition
@@ -316,6 +320,8 @@ func (e *Exporter) collect(ch chan<- prometheus.Metric) {
)

offset := make(map[string]map[int32]int64)
partitionLeaders := make(map[int32]map[string][]int32, len(e.client.Brokers()))
partitionSizes := make(map[string]map[int32]int64)

now := time.Now()

@@ -334,9 +340,11 @@ func (e *Exporter) collect(ch chan<- prometheus.Metric) {
plog.Errorf("Cannot get topics: %v", err)
return
}
for _, broker := range e.client.Brokers() {
partitionLeaders[broker.ID()] = make(map[string][]int32, len(topics))
}

topicChannel := make(chan string)

getTopicMetrics := func(topic string) {
defer wg.Done()
plog.Debugf("Fetching metrics for \"%s\"", topic)
@@ -350,12 +358,16 @@ func (e *Exporter) collect(ch chan<- prometheus.Metric) {
)
e.mu.Lock()
offset[topic] = make(map[int32]int64, len(partitions))
partitionSizes[topic] = make(map[int32]int64, len(partitions))
e.mu.Unlock()
for _, partition := range partitions {
broker, err := e.client.Leader(topic, partition)
if err != nil {
plog.Errorf("Cannot get leader of topic %s partition %d: %v", topic, partition, err)
} else {
e.sgPartitionLeadersMutex.Lock()
partitionLeaders[broker.ID()][topic] = append(partitionLeaders[broker.ID()][topic], partition)
e.sgPartitionLeadersMutex.Unlock()
ch <- prometheus.MustNewConstMetric(
topicPartitionLeader, prometheus.GaugeValue, float64(broker.ID()), topic, strconv.FormatInt(int64(partition), 10),
)
@@ -484,7 +496,6 @@ func (e *Exporter) collect(ch chan<- prometheus.Metric) {
plog.Errorf("Cannot connect to broker %d: %v", broker.ID(), err)
return
}
defer broker.Close()

plog.Debugf("[%d]> listing groups", broker.ID())
groups, err := broker.ListGroups(&sarama.ListGroupsRequest{})
@@ -581,6 +592,33 @@ func (e *Exporter) collect(ch chan<- prometheus.Metric) {
}
}

getPartitionSizeMetrics := func(broker *sarama.Broker, brokerPartitions map[string][]int32, locPartitionSizes map[string]map[int32]int64) {
defer wg.Done()
if err := broker.Open(e.client.Config()); err != nil && err != sarama.ErrAlreadyConnected {
plog.Errorf("Cannot connect to broker %d: %v", broker.ID(), err)
return
}
defer broker.Close()

partitionArr := []sarama.DescribeLogDirsRequestTopic{}
for topic, partitions := range brokerPartitions {
partitionArr = append(partitionArr, sarama.DescribeLogDirsRequestTopic{Topic: topic, PartitionIDs: partitions})
}
describeLogDirs, err := broker.DescribeLogDirs(&sarama.DescribeLogDirsRequest{Version: 1, DescribeTopics: partitionArr})
if err != nil {
plog.Errorf("Cannot describe log dirs of broker %d: %v", broker.ID(), err)
return
}
e.sgPartitionsMutex.Lock()
for _, logDir := range describeLogDirs.LogDirs {
for _, topic4size := range logDir.Topics {
for _, partition4size := range topic4size.Partitions {
locPartitionSizes[topic4size.Topic][partition4size.PartitionID] = partition4size.Size
}
}
}
e.sgPartitionsMutex.Unlock()
}

plog.Info("Fetching consumer group metrics")
if len(e.client.Brokers()) > 0 {
for _, broker := range e.client.Brokers() {
@@ -591,6 +629,24 @@ func (e *Exporter) collect(ch chan<- prometheus.Metric) {
} else {
plog.Errorln("No valid broker, cannot get consumer group metrics")
}

plog.Info("Fetching partition size metrics")
if len(e.client.Brokers()) > 0 {
for _, broker := range e.client.Brokers() {
wg.Add(1)
go getPartitionSizeMetrics(broker, partitionLeaders[broker.ID()], partitionSizes)
}
wg.Wait()
for topic, partitions := range partitionSizes {
for partition, size := range partitions {
ch <- prometheus.MustNewConstMetric(
topicPartitionSize, prometheus.GaugeValue, float64(size), topic, strconv.FormatInt(int64(partition), 10),
)
}
}
} else {
plog.Errorln("No valid broker, cannot get partition size metrics")
}
}

func init() {
@@ -680,6 +736,12 @@ func main() {
[]string{"topic", "partition"}, labels,
)

topicPartitionSize = prometheus.NewDesc(
prometheus.BuildFQName(namespace, "topic", "partition_size"),
"Size for this Topic Partition",
[]string{"topic", "partition"}, labels,
)

topicPartitionInSyncReplicas = prometheus.NewDesc(
prometheus.BuildFQName(namespace, "topic", "partition_in_sync_replica"),
"Number of In-Sync Replicas for this Topic/Partition",
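For reference, the new getPartitionSizeMetrics path works by grouping each topic's partitions under their leader broker and then issuing a single DescribeLogDirs request per broker, reading the Size of every partition in the response. The sketch below is a minimal standalone illustration of that flow against sarama, not code from this commit; the broker address, the protocol version, and the error handling are assumptions.

// Standalone sketch (not from this commit): fetch per-partition log sizes the
// same way the new getPartitionSizeMetrics path does, grouping partitions by
// leader broker and issuing one DescribeLogDirs request per broker.
// The broker address and Kafka protocol version below are assumptions.
package main

import (
    "fmt"
    "log"

    "github.com/Shopify/sarama"
)

func main() {
    config := sarama.NewConfig()
    config.Version = sarama.V2_0_0_0 // assumed; DescribeLogDirs needs a reasonably recent broker

    client, err := sarama.NewClient([]string{"localhost:9092"}, config) // placeholder address
    if err != nil {
        log.Fatalf("cannot create client: %v", err)
    }
    defer client.Close()

    topics, err := client.Topics()
    if err != nil {
        log.Fatalf("cannot list topics: %v", err)
    }

    // Leader broker ID -> topic -> partitions led by that broker.
    leaders := map[int32]map[string][]int32{}
    for _, topic := range topics {
        partitions, err := client.Partitions(topic)
        if err != nil {
            log.Fatalf("cannot list partitions of %s: %v", topic, err)
        }
        for _, partition := range partitions {
            broker, err := client.Leader(topic, partition)
            if err != nil {
                continue // partition currently has no reachable leader
            }
            if leaders[broker.ID()] == nil {
                leaders[broker.ID()] = map[string][]int32{}
            }
            leaders[broker.ID()][topic] = append(leaders[broker.ID()][topic], partition)
        }
    }

    // One DescribeLogDirs request per broker, covering only the partitions it leads.
    for _, broker := range client.Brokers() {
        led := leaders[broker.ID()]
        if len(led) == 0 {
            continue
        }
        if err := broker.Open(client.Config()); err != nil && err != sarama.ErrAlreadyConnected {
            log.Printf("cannot connect to broker %d: %v", broker.ID(), err)
            continue
        }
        request := []sarama.DescribeLogDirsRequestTopic{}
        for topic, partitions := range led {
            request = append(request, sarama.DescribeLogDirsRequestTopic{Topic: topic, PartitionIDs: partitions})
        }
        response, err := broker.DescribeLogDirs(&sarama.DescribeLogDirsRequest{Version: 1, DescribeTopics: request})
        if err != nil {
            log.Printf("cannot describe log dirs of broker %d: %v", broker.ID(), err)
            continue
        }
        for _, logDir := range response.LogDirs {
            for _, t := range logDir.Topics {
                for _, p := range t.Partitions {
                    fmt.Printf("%s[%d] on broker %d: %d bytes\n", t.Topic, p.PartitionID, broker.ID(), p.Size)
                }
            }
        }
    }
}

With the topicPartitionSize Desc registered in main(), each reported size is then emitted as a gauge labelled by topic and partition (kafka_topic_partition_size, assuming the exporter's usual kafka metric namespace).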
