From 287b937d18226222ae0f233246a97ef6d6b37285 Mon Sep 17 00:00:00 2001 From: Barry Dobson Date: Thu, 4 Apr 2024 15:29:01 +0100 Subject: [PATCH 1/3] feat: add new metric kafka_consumergroup_topicmembers --- kafka_exporter.go | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/kafka_exporter.go b/kafka_exporter.go index 36021149..a16b366a 100644 --- a/kafka_exporter.go +++ b/kafka_exporter.go @@ -56,6 +56,7 @@ var ( consumergroupLagSum *prometheus.Desc consumergroupLagZookeeper *prometheus.Desc consumergroupMembers *prometheus.Desc + consumergroupTopicMembers *prometheus.Desc ) // Exporter collects Kafka stats from the given server and exports them using @@ -590,9 +591,25 @@ func (e *Exporter) collect(ch chan<- prometheus.Metric) { } } } + ch <- prometheus.MustNewConstMetric( consumergroupMembers, prometheus.GaugeValue, float64(len(group.Members)), group.GroupId, ) + + uniqueTopics := make(map[string]int) + for _, member := range group.Members { + meta, _ := member.GetMemberMetadata() + for _, topic := range meta.Topics { + uniqueTopics[topic]++ + } + } + + for topic, count := range uniqueTopics { + ch <- prometheus.MustNewConstMetric( + consumergroupTopicMembers, prometheus.GaugeValue, float64(count), group.GroupId, topic, + ) + } + offsetFetchResponse, err := broker.FetchOffset(&offsetFetchRequest) if err != nil { klog.Errorf("Cannot get offset of group %s: %v", group.GroupId, err) @@ -892,6 +909,12 @@ func setup( []string{"consumergroup"}, labels, ) + consumergroupTopicMembers = prometheus.NewDesc( + prometheus.BuildFQName(namespace, "consumergroup", "topicmembers"), + "Amount of members in a consumer group against a topic", + []string{"consumergroup", "topic"}, labels, + ) + if logSarama { sarama.Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags) } From 053f4e51dc00c861dc7e68892ac3697cb3d7ce1f Mon Sep 17 00:00:00 2001 From: Barry Dobson Date: Fri, 7 Jun 2024 11:33:02 +0100 Subject: [PATCH 2/3] fix: error handling when no metadata --- kafka_exporter.go | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/kafka_exporter.go b/kafka_exporter.go index eb8a40b0..ef5fe80b 100644 --- a/kafka_exporter.go +++ b/kafka_exporter.go @@ -602,7 +602,15 @@ func (e *Exporter) collect(ch chan<- prometheus.Metric) { uniqueTopics := make(map[string]int) for _, member := range group.Members { - meta, _ := member.GetMemberMetadata() + meta, err := member.GetMemberMetadata() + if err != nil { + klog.Errorf("Cannot get metadata for member %s: %v", member.MemberId, err) + continue + } + if meta == nil { + klog.Warningf("No metadata for member %s", member.MemberId) + continue + } for _, topic := range meta.Topics { uniqueTopics[topic]++ } From ba91c504442a39db7aa3274498e4fd696c898265 Mon Sep 17 00:00:00 2001 From: Barry Dobson Date: Fri, 7 Jun 2024 12:47:10 +0100 Subject: [PATCH 3/3] fix: better error handling --- .gitignore | 2 ++ .promu.yml | 6 ++++++ Makefile | 10 ++++++---- kafka_exporter.go | 12 ++++-------- 4 files changed, 18 insertions(+), 12 deletions(-) diff --git a/.gitignore b/.gitignore index 58622166..f7ddd709 100644 --- a/.gitignore +++ b/.gitignore @@ -24,3 +24,5 @@ kafka_exporter # Test configuration test/ .DS_Store + +*.strimzi \ No newline at end of file diff --git a/.promu.yml b/.promu.yml index 54bc02cd..6ba6dce3 100644 --- a/.promu.yml +++ b/.promu.yml @@ -11,3 +11,9 @@ build: tarball: files: - LICENSE +crossbuild: + platforms: + - linux/amd64 + - linux/s390x + - linux/arm64 + - linux/ppc64le \ No newline at end of file diff --git a/Makefile b/Makefile index a99dc580..a00d75cd 100644 --- a/Makefile +++ b/Makefile @@ -48,10 +48,12 @@ docker: build @docker build -t "$(DOCKER_IMAGE_NAME):$(DOCKER_IMAGE_TAG)" --build-arg BIN_DIR=. . push: crossbuild - @echo ">> building and pushing multi-arch docker images, $(DOCKER_USERNAME),$(DOCKER_IMAGE_NAME),$(GIT_TAG_NAME)" +ifneq (, $(DOCKER_PASSWORD)) @docker login -u $(DOCKER_USERNAME) -p $(DOCKER_PASSWORD) +endif + @echo ">> building and pushing multi-arch docker images, $(DOCKER_USERNAME),$(DOCKER_IMAGE_NAME),$(GIT_TAG_NAME)" @docker buildx create --use - @docker buildx build -t "$(DOCKER_USERNAME)/$(DOCKER_IMAGE_NAME):$(GIT_TAG_NAME)" \ + @docker buildx build -t "$(DOCKER_IMAGE_NAME):$(DOCKER_IMAGE_TAG)" \ --output "$(PUSHTAG)" \ --platform "$(DOCKER_PLATFORMS)" \ . @@ -106,8 +108,8 @@ lint: golangci-lint golangci-lint: ifeq (, $(shell which golangci-lint)) @GOOS=$(shell uname -s | tr A-Z a-z) \ - GOARCH=$(subst x86_64,amd64,$(patsubst i%86,386,$(shell uname -m))) \ - $(GO) install github.com/golangci/golangci-lint/cmd/golangci-lint@v1.52.2 + GOARCH=$(subst x86_64,amd64,$(patsubst i%86,386,$(shell uname -m))) \ + $(GO) install github.com/golangci/golangci-lint/cmd/golangci-lint@v1.52.2 GOLANG_LINT=$(shell go env GOPATH)/bin/golangci-lint else GOLANG_LINT=$(shell which golangci-lint) diff --git a/kafka_exporter.go b/kafka_exporter.go index ef5fe80b..afdcb9b5 100644 --- a/kafka_exporter.go +++ b/kafka_exporter.go @@ -602,16 +602,12 @@ func (e *Exporter) collect(ch chan<- prometheus.Metric) { uniqueTopics := make(map[string]int) for _, member := range group.Members { - meta, err := member.GetMemberMetadata() - if err != nil { - klog.Errorf("Cannot get metadata for member %s: %v", member.MemberId, err) - continue - } - if meta == nil { - klog.Warningf("No metadata for member %s", member.MemberId) + assignment, err := member.GetMemberAssignment() + if err != nil || assignment == nil { + klog.Errorf("Cannot get GetMemberAssignment of group member %v : %v", member, err) continue } - for _, topic := range meta.Topics { + for topic := range assignment.Topics { uniqueTopics[topic]++ } }