Skip to content

Commit

Permalink
Add Prometheus Metrics Exporter
Browse files Browse the repository at this point in the history
  • Loading branch information
mwain committed Apr 28, 2020
1 parent d896a3f commit 0a6968d
Show file tree
Hide file tree
Showing 5 changed files with 348 additions and 0 deletions.
2 changes: 2 additions & 0 deletions core/internal/httpserver/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,8 @@ func (hc *Coordinator) Configure() {
// This is a healthcheck URL. Please don't change it
hc.router.GET("/burrow/admin", hc.handleAdmin)

hc.router.Handler(http.MethodGet, "/metrics", hc.handlePrometheusMetrics())

// All valid paths go here
hc.router.GET("/v3/kafka", hc.handleClusterList)
hc.router.GET("/v3/kafka/:cluster", hc.handleClusterDetail)
Expand Down
182 changes: 182 additions & 0 deletions core/internal/httpserver/prometheus.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
package httpserver

import (
"net/http"
"strconv"

"github.com/prometheus/client_golang/prometheus"

"github.com/linkedin/Burrow/core/protocol"

"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/client_golang/prometheus/promhttp"
)

var (
consumerTotalLagGauge = promauto.NewGaugeVec(
prometheus.GaugeOpts{
Name: "burrow_kafka_consumer_lag_total",
Help: "The sum of all partition current lag values for the group",
},
[]string{"cluster", "consumer_group"},
)

consumerStatusGauge = promauto.NewGaugeVec(
prometheus.GaugeOpts{
Name: "burrow_kafka_consumer_status",
Help: "The status of the consumer group. It is calculated from the highest status for the individual partitions. Statuses are an index list from NOTFOUND, OK, WARN, ERR, STOP, STALL, REWIND",
},
[]string{"cluster", "consumer_group"},
)

consumerPartitionCurrentOffset = promauto.NewGaugeVec(
prometheus.GaugeOpts{
Name: "burrow_kafka_consumer_current_offset",
Help: "Latest offset that Burrow is storing for this partition",
},
[]string{"cluster", "consumer_group", "topic", "partition"},
)

consumerPartitionLagGauge = promauto.NewGaugeVec(
prometheus.GaugeOpts{
Name: "burrow_kafka_consumer_partition_lag",
Help: "Number of messages the consumer group is behind by for a partition as reported by Burrow",
},
[]string{"cluster", "consumer_group", "topic", "partition"},
)

topicPartitionOffsetGauge = promauto.NewGaugeVec(
prometheus.GaugeOpts{
Name: "burrow_kafka_topic_partition_offset",
Help: "Latest offset the topic that Burrow is storing for this partition",
},
[]string{"cluster", "topic", "partition"},
)
)

func (hc *Coordinator) handlePrometheusMetrics() http.HandlerFunc {
promHandler := promhttp.Handler()

return http.HandlerFunc(func(resp http.ResponseWriter, req *http.Request) {
for _, cluster := range listClusters(hc.App) {
for _, consumer := range listConsumers(hc.App, cluster) {
consumerStatus := getFullConsumerStatus(hc.App, cluster, consumer)

if consumerStatus == nil ||
consumerStatus.Status == protocol.StatusNotFound ||
consumerStatus.Complete < 1.0 {
continue
}

labels := map[string]string{
"cluster": cluster,
"consumer_group": consumer,
}

consumerTotalLagGauge.With(labels).Set(float64(consumerStatus.TotalLag))
consumerStatusGauge.With(labels).Set(float64(consumerStatus.Status))

for _, partition := range consumerStatus.Partitions {
if partition.Complete < 1.0 {
continue
}

labels := map[string]string{
"cluster": cluster,
"consumer_group": consumer,
"topic": partition.Topic,
"partition": strconv.FormatInt(int64(partition.Partition), 10),
}

consumerPartitionCurrentOffset.With(labels).Set(float64(partition.End.Offset))
consumerPartitionLagGauge.With(labels).Set(float64(partition.CurrentLag))
}
}

// Topics
for _, topic := range listTopics(hc.App, cluster) {
for partitionNumber, offset := range getTopicDetail(hc.App, cluster, topic) {
topicPartitionOffsetGauge.With(map[string]string{
"cluster": cluster,
"topic": topic,
"partition": strconv.FormatInt(int64(partitionNumber), 10),
}).Set(float64(offset))
}
}
}

promHandler.ServeHTTP(resp, req)
})
}

func listClusters(app *protocol.ApplicationContext) []string {
request := &protocol.StorageRequest{
RequestType: protocol.StorageFetchClusters,
Reply: make(chan interface{}),
}
app.StorageChannel <- request
response := <-request.Reply
if response == nil {
return []string{}
}

return response.([]string)
}

func listConsumers(app *protocol.ApplicationContext, cluster string) []string {
request := &protocol.StorageRequest{
RequestType: protocol.StorageFetchConsumers,
Cluster: cluster,
Reply: make(chan interface{}),
}
app.StorageChannel <- request
response := <-request.Reply
if response == nil {
return []string{}
}

return response.([]string)
}

func getFullConsumerStatus(app *protocol.ApplicationContext, cluster, consumer string) *protocol.ConsumerGroupStatus {
request := &protocol.EvaluatorRequest{
Cluster: cluster,
Group: consumer,
ShowAll: true,
Reply: make(chan *protocol.ConsumerGroupStatus),
}
app.EvaluatorChannel <- request
response := <-request.Reply
return response
}

func listTopics(app *protocol.ApplicationContext, cluster string) []string {
request := &protocol.StorageRequest{
RequestType: protocol.StorageFetchTopics,
Cluster: cluster,
Reply: make(chan interface{}),
}
app.StorageChannel <- request
response := <-request.Reply
if response == nil {
return []string{}
}

return response.([]string)
}

func getTopicDetail(app *protocol.ApplicationContext, cluster, topic string) []int64 {
request := &protocol.StorageRequest{
RequestType: protocol.StorageFetchTopic,
Cluster: cluster,
Topic: topic,
Reply: make(chan interface{}),
}
app.StorageChannel <- request
response := <-request.Reply
if response == nil {
return []int64{}
}

return response.([]int64)
}
156 changes: 156 additions & 0 deletions core/internal/httpserver/prometheus_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
package httpserver

import (
"net/http"
"net/http/httptest"
"testing"

"github.com/stretchr/testify/assert"

"github.com/linkedin/Burrow/core/protocol"
)

func TestHttpServer_handlePrometheusMetrics(t *testing.T) {
coordinator := fixtureConfiguredCoordinator()

// Respond to the expected storage requests
go func() {
request := <-coordinator.App.StorageChannel
assert.Equalf(t, protocol.StorageFetchClusters, request.RequestType, "Expected request of type StorageFetchClusters, not %v", request.RequestType)
request.Reply <- []string{"testcluster"}
close(request.Reply)

// List of consumers
request = <-coordinator.App.StorageChannel
assert.Equalf(t, protocol.StorageFetchConsumers, request.RequestType, "Expected request of type StorageFetchConsumers, not %v", request.RequestType)
assert.Equalf(t, "testcluster", request.Cluster, "Expected request Cluster to be testcluster, not %v", request.Cluster)
request.Reply <- []string{"testgroup", "testgroup2"}
close(request.Reply)

// List of topics
request = <-coordinator.App.StorageChannel
assert.Equalf(t, protocol.StorageFetchTopics, request.RequestType, "Expected request of type StorageFetchTopics, not %v", request.RequestType)
assert.Equalf(t, "testcluster", request.Cluster, "Expected request Cluster to be testcluster, not %v", request.Cluster)
request.Reply <- []string{"testtopic", "testtopic1"}
close(request.Reply)

// Topic details
request = <-coordinator.App.StorageChannel
assert.Equalf(t, protocol.StorageFetchTopic, request.RequestType, "Expected request of type StorageFetchTopic, not %v", request.RequestType)
assert.Equalf(t, "testcluster", request.Cluster, "Expected request Cluster to be testcluster, not %v", request.Cluster)
assert.Equalf(t, "testtopic", request.Topic, "Expected request Topic to be testtopic, not %v", request.Topic)
request.Reply <- []int64{6556, 5566}
close(request.Reply)

request = <-coordinator.App.StorageChannel
assert.Equalf(t, protocol.StorageFetchTopic, request.RequestType, "Expected request of type StorageFetchTopic, not %v", request.RequestType)
assert.Equalf(t, "testcluster", request.Cluster, "Expected request Cluster to be testcluster, not %v", request.Cluster)
assert.Equalf(t, "testtopic1", request.Topic, "Expected request Topic to be testtopic, not %v", request.Topic)
request.Reply <- []int64{54}
close(request.Reply)
}()

// Respond to the expected evaluator requests
go func() {
// testgroup happy paths
request := <-coordinator.App.EvaluatorChannel
assert.Equalf(t, "testcluster", request.Cluster, "Expected request Cluster to be testcluster, not %v", request.Cluster)
assert.Equalf(t, "testgroup", request.Group, "Expected request Group to be testgroup, not %v", request.Group)
assert.True(t, request.ShowAll, "Expected request ShowAll to be True")
response := &protocol.ConsumerGroupStatus{
Cluster: request.Cluster,
Group: request.Group,
Status: protocol.StatusOK,
Complete: 1.0,
Partitions: []*protocol.PartitionStatus{
{
Topic: "testtopic",
Partition: 0,
Status: protocol.StatusOK,
CurrentLag: 100,
Complete: 1.0,
End: &protocol.ConsumerOffset{
Offset: 22663,
},
},
{
Topic: "testtopic",
Partition: 1,
Status: protocol.StatusOK,
CurrentLag: 10,
Complete: 1.0,
End: &protocol.ConsumerOffset{
Offset: 2488,
},
},
{
Topic: "testtopic1",
Partition: 0,
Status: protocol.StatusOK,
CurrentLag: 50,
Complete: 1.0,
End: &protocol.ConsumerOffset{
Offset: 99888,
},
},
{
Topic: "incomplete",
Partition: 0,
Status: protocol.StatusOK,
CurrentLag: 0,
Complete: 0.2,
End: &protocol.ConsumerOffset{
Offset: 5335,
},
},
},
TotalPartitions: 2134,
Maxlag: &protocol.PartitionStatus{},
TotalLag: 2345,
}
request.Reply <- response
close(request.Reply)

// testgroup2 not found
request = <-coordinator.App.EvaluatorChannel
assert.Equalf(t, "testcluster", request.Cluster, "Expected request Cluster to be testcluster, not %v", request.Cluster)
assert.Equalf(t, "testgroup2", request.Group, "Expected request Group to be testgroup, not %v", request.Group)
assert.True(t, request.ShowAll, "Expected request ShowAll to be True")
response = &protocol.ConsumerGroupStatus{
Cluster: request.Cluster,
Group: request.Group,
Status: protocol.StatusNotFound,
}
request.Reply <- response
close(request.Reply)
}()

// Set up a request
req, err := http.NewRequest("GET", "/metrics", nil)
assert.NoError(t, err, "Expected request setup to return no error")

// Call the handler via httprouter
rr := httptest.NewRecorder()
coordinator.router.ServeHTTP(rr, req)

assert.Equalf(t, http.StatusOK, rr.Code, "Expected response code to be 200, not %v", rr.Code)

promExp := rr.Body.String()
assert.Contains(t, promExp, `burrow_kafka_consumer_status{cluster="testcluster",consumer_group="testgroup"} 1`)
assert.Contains(t, promExp, `burrow_kafka_consumer_lag_total{cluster="testcluster",consumer_group="testgroup"} 2345`)

assert.Contains(t, promExp, `burrow_kafka_consumer_partition_lag{cluster="testcluster",consumer_group="testgroup",partition="0",topic="testtopic"} 100`)
assert.Contains(t, promExp, `burrow_kafka_consumer_partition_lag{cluster="testcluster",consumer_group="testgroup",partition="1",topic="testtopic"} 10`)
assert.Contains(t, promExp, `burrow_kafka_consumer_partition_lag{cluster="testcluster",consumer_group="testgroup",partition="0",topic="testtopic1"} 50`)

assert.Contains(t, promExp, `burrow_kafka_consumer_current_offset{cluster="testcluster",consumer_group="testgroup",partition="0",topic="testtopic"} 22663`)
assert.Contains(t, promExp, `burrow_kafka_consumer_current_offset{cluster="testcluster",consumer_group="testgroup",partition="1",topic="testtopic"} 2488`)
assert.Contains(t, promExp, `burrow_kafka_consumer_current_offset{cluster="testcluster",consumer_group="testgroup",partition="0",topic="testtopic1"} 99888`)

assert.Contains(t, promExp, `burrow_kafka_topic_partition_offset{cluster="testcluster",partition="0",topic="testtopic"} 6556`)
assert.Contains(t, promExp, `burrow_kafka_topic_partition_offset{cluster="testcluster",partition="1",topic="testtopic"} 5566`)
assert.Contains(t, promExp, `burrow_kafka_topic_partition_offset{cluster="testcluster",partition="0",topic="testtopic1"} 54`)

assert.NotContains(t, promExp, `burrow_kafka_consumer_partition_lag{cluster="testcluster",consumer_group="testgroup",partition="0",topic="incomplete"} 0`)
assert.NotContains(t, promExp, "testgroup2")
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ require (
github.com/pelletier/go-toml v1.6.0 // indirect
github.com/pierrec/lz4 v2.4.1+incompatible // indirect
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v0.9.3
github.com/rcrowley/go-metrics v0.0.0-20190826022208-cac0b30c2563 // indirect
github.com/samuel/go-zookeeper v0.0.0-20190923202752-2cc03de413da
github.com/smartystreets/assertions v1.0.1 // indirect
Expand Down
7 changes: 7 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuy
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/beorn7/perks v1.0.0 h1:HWo1m869IqiPhD389kmkxeTalrjNbbJTC8LXupb+sl0=
github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko=
github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
Expand Down Expand Up @@ -54,6 +55,7 @@ github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfU
github.com/golang/groupcache v0.0.0-20190129154638-5b532d6fd5ef/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.1 h1:YF8+flBXS5eO826T4nzqPrxfhQThhXl0YzfuUPu4SBg=
github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
Expand Down Expand Up @@ -111,6 +113,7 @@ github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/magiconair/properties v1.8.1 h1:ZC2Vc7/ZFkGmsVC9KvOjumD+G5lXy2RtTKyzRKO2BQ4=
github.com/magiconair/properties v1.8.1/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU=
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
github.com/mitchellh/mapstructure v1.1.2 h1:fmNYVwqnSfB9mZU6OS2O6GsXM+wcskZDuKQzvN1EDeE=
github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
Expand All @@ -133,12 +136,16 @@ github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw=
github.com/prometheus/client_golang v0.9.3 h1:9iH4JKXLzFbOAdtqv/a+j8aewx2Y8lAjAydhbaScPF8=
github.com/prometheus/client_golang v0.9.3/go.mod h1:/TN21ttK/J9q6uSwhBd54HahCDft0ttaMvbicHlPoso=
github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo=
github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90 h1:S/YWwWx/RA8rT8tKFRuGUZhuA90OyIBpPCXkcbwU8DE=
github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/prometheus/common v0.0.0-20181113130724-41aa239b4cce/go.mod h1:daVV7qP5qjZbuso7PdcryaAu0sAZbrN9i7WWcTMWvro=
github.com/prometheus/common v0.4.0 h1:7etb9YClo3a6HjLzfl6rIQaU+FDfi0VSX39io3aQ+DM=
github.com/prometheus/common v0.4.0/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4=
github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk=
github.com/prometheus/procfs v0.0.0-20190507164030-5867b95ac084 h1:sofwID9zm4tzrgykg80hfFph1mryUeLRsUfoocVVmRY=
github.com/prometheus/procfs v0.0.0-20190507164030-5867b95ac084/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA=
github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU=
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a h1:9ZKAASQSHhDYGoxY8uLVpewe1GDZ2vu2Tr/vTdVAkFQ=
Expand Down

0 comments on commit 0a6968d

Please sign in to comment.