Update metric exporter to report lag as unit of time #2

Merged
merged 3 commits on Dec 11, 2020
Binary file modified .DS_Store
43 changes: 40 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
@@ -110,6 +110,8 @@ This image is configurable using different flags
| web.telemetry-path | /metrics | Path under which to expose metrics |
| log.level | info | Only log messages with the given severity or above. Valid levels: [debug, info, warn, error, fatal] |
| log.enable-sarama | false | Turn on Sarama logging |
| max.offsets | 1000 | Maximum number of offsets to store in the interpolation table for a partition |
| prune.interval | 30 | How often the interpolation table is pruned, in seconds |

### Notes

@@ -131,7 +133,7 @@ For details on the underlying metrics please see [Apache Kafka](https://kafka.ap

**Metrics details**

| Name | Exposed informations |
| Name | Exposed information |
| --------------- | -------------------------------------- |
| `kafka_brokers` | Number of Brokers in the Kafka Cluster |

@@ -147,7 +149,7 @@ kafka_brokers 3

**Metrics details**

| Name | Exposed informations |
| Name | Exposed information |
| -------------------------------------------------- | --------------------------------------------------- |
| `kafka_topic_partitions` | Number of partitions for this Topic |
| `kafka_topic_partition_current_offset` | Current Offset of a Broker at Topic/Partition |
@@ -198,7 +200,7 @@ kafka_topic_partition_under_replicated_partition{partition="0",topic="__consumer

**Metrics details**

| Name | Exposed informations |
| Name | Exposed information |
| ------------------------------------ | ------------------------------------------------------------- |
| `kafka_consumergroup_current_offset` | Current Offset of a ConsumerGroup at Topic/Partition |
| `kafka_consumergroup_lag` | Current Approximate Lag of a ConsumerGroup at Topic/Partition |
@@ -215,13 +217,48 @@ kafka_consumergroup_current_offset{consumergroup="KMOffsetCache-kafka-manager-38
kafka_consumergroup_lag{consumergroup="KMOffsetCache-kafka-manager-3806276532-ml44w",partition="0",topic="__consumer_offsets"} 1
```

### Consumer Lag

**Metrics details**

| Name | Exposed information |
| ------------------------------------ | ------------------------------------------------------------- |
| `kafka_consumer_lag_millis` | Current approximation of consumer lag for a ConsumerGroup at Topic/Partition |
| `kafka_consumer_lag_extrapolation` | Indicates that a consumer group lag estimation used extrapolation |
| `kafka_consumer_lag_interpolation` | Indicates that a consumer group lag estimation used interpolation |

**Metrics output example**
```
# HELP kafka_consumer_lag_extrapolation Indicates that a consumer group lag estimation used extrapolation
# TYPE kafka_consumer_lag_extrapolation counter
kafka_consumer_lag_extrapolation{consumergroup="perf-consumer-74084",partition="0",topic="test"} 1

# HELP kafka_consumer_lag_interpolation Indicates that a consumer group lag estimation used interpolation
# TYPE kafka_consumer_lag_interpolation counter
kafka_consumer_lag_interpolation{consumergroup="perf-consumer-74084",partition="0",topic="test"} 1

# HELP kafka_consumer_lag_millis Current approximation of consumer lag for a ConsumerGroup at Topic/Partition
# TYPE kafka_consumer_lag_millis gauge
kafka_consumer_lag_millis{consumergroup="perf-consumer-74084",partition="0",topic="test"} 3.4457231197552e+10
```

Grafana Dashboard
-------

Grafana Dashboard ID: 7589, name: Kafka Exporter Overview.

For details of the dashboard please see [Kafka Exporter Overview](https://grafana.com/dashboards/7589).

Lag Estimation
-------
The technique to estimate lag for a consumer group, topic, and partition is taken from the [Lightbend Kafka Lag Exporter](https://github.com/lightbend/kafka-lag-exporter).

Once the exporter starts up, it begins sampling the next offset to be produced. The interpolation table is built from these samples, and the current offset of each monitored consumer group is compared against the values in the table. If the table contains both an upper and a lower bound for a consumer group's current offset, interpolation is used; if it contains only an upper bound, extrapolation is used.

At a configurable interval, `prune.interval` (default 30 seconds), the interpolation table is pruned. Any consumer group or topic that is no longer listed by the broker is removed, and the offsets stored for each partition are trimmed down to `max.offsets` (default 1000), oldest first.

Pruning of the interpolation table happens on a separate thread, and thread safety is ensured by a lock around the table.

Contribute
----------

10 changes: 10 additions & 0 deletions examples/lag_demo/README.md
@@ -0,0 +1,10 @@
# Slow Producer and Consumer

The `lag_demo` package creates a [Kafka](https://kafka.apache.org/) producer and consumer for the purposes of demonstrating lag in a consumer group.


## Contributing
Pull requests are welcome. For major changes, please open an issue first to discuss what you would like to change.

## License
[MIT](https://choosealicense.com/licenses/mit/)
12 changes: 12 additions & 0 deletions examples/lag_demo/lag_demo.go
@@ -0,0 +1,12 @@
package main

import "sync"

func main() {
	wg := sync.WaitGroup{}
	wg.Add(1)
	// Pass the WaitGroup by pointer: sync.WaitGroup must not be copied,
	// or Done and Wait operate on different counters.
	go slowConsumer(&wg)
	wg.Add(1)
	go slowProducer(&wg)
	wg.Wait()
}
95 changes: 95 additions & 0 deletions examples/lag_demo/slow_consumer.go
@@ -0,0 +1,95 @@
package main

import (
	"context"
	"os"
	"os/signal"
	"sync"
	"syscall"
	"time"

	"github.com/Shopify/sarama"
	plog "github.com/prometheus/common/log"
)

type Consumer struct {
	ready chan bool
}

func slowConsumer(wg *sync.WaitGroup) {
	defer wg.Done()
	consumer := Consumer{ready: make(chan bool)}
	// A cancelable context lets the consume loop below actually exit
	// once a shutdown signal arrives.
	ctx, cancel := context.WithCancel(context.Background())

	client, err := sarama.NewConsumerGroup([]string{"localhost:9092"}, "groupie-group", nil)
	if err != nil {
		plog.Fatalf("Error creating consumer group client: %v", err)
	}

	topics := []string{"test"}

	// A local WaitGroup tracks the consume loop; the outer one only
	// tracks this function as a whole.
	consumeWg := &sync.WaitGroup{}
	consumeWg.Add(1)
	go func() {
		defer consumeWg.Done()
		for {
			if err := client.Consume(ctx, topics, &consumer); err != nil {
				plog.Fatalf("Error from consumer: %v", err)
			}
			// check if context was cancelled, signaling that the consumer should stop
			if ctx.Err() != nil {
				return
			}
			consumer.ready = make(chan bool)
		}
	}()

	<-consumer.ready // Wait until the consumer has been set up
	plog.Infof("Sarama consumer up and running!...")

	sigterm := make(chan os.Signal, 1)
	signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
	select {
	case <-ctx.Done():
		plog.Infof("terminating: context cancelled")
	case <-sigterm:
		plog.Infof("terminating: via signal")
	}
	cancel()
	consumeWg.Wait()
	if err = client.Close(); err != nil {
		plog.Fatalf("Error closing client: %v", err)
	}
}

// Setup is run at the beginning of a new session, before ConsumeClaim
func (consumer *Consumer) Setup(sarama.ConsumerGroupSession) error {
	// Mark the consumer as ready
	close(consumer.ready)
	return nil
}

// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
func (consumer *Consumer) Cleanup(sarama.ConsumerGroupSession) error {
	return nil
}

// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	plog.Infof("Consuming from test topic")
	// NOTE:
	// Do not move the code below to a goroutine.
	// The `ConsumeClaim` itself is called within a goroutine, see:
	// https://github.com/Shopify/sarama/blob/master/consumer_group.go#L27-L29
	counter := 0
	for message := range claim.Messages() {
		counter++
		if counter >= 5000 {
			plog.Infof("Pausing consumer for 50 seconds")
			time.Sleep(50 * time.Second)
			counter = 0
		}
		plog.Debugf("Message claimed: value = %s, timestamp = %v, topic = %s", string(message.Value), message.Timestamp, message.Topic)
		session.MarkMessage(message, "")
	}

	return nil
}
51 changes: 51 additions & 0 deletions examples/lag_demo/slow_producer.go
@@ -0,0 +1,51 @@
package main

import (
	"os"
	"os/signal"
	"sync"
	"time"

	"github.com/Shopify/sarama"
	plog "github.com/prometheus/common/log"
)

func slowProducer(wg *sync.WaitGroup) {
	defer wg.Done()
	producer, err := sarama.NewAsyncProducer([]string{"localhost:9092"}, nil)
	if err != nil {
		panic(err)
	}

	defer func() {
		if err := producer.Close(); err != nil {
			plog.Fatalf("Error closing producer: %s", err.Error())
		}
	}()

	// Trap SIGINT to trigger a shutdown.
	signals := make(chan os.Signal, 1)
	signal.Notify(signals, os.Interrupt)

	counter := 0
	var enqueued, producerErrors int
ProducerLoop:
	for {
		select {
		case producer.Input() <- &sarama.ProducerMessage{Topic: "test", Key: nil, Value: sarama.StringEncoder("testing 123")}:
			enqueued++
			counter++
			if counter >= 50 {
				counter = 0
				plog.Debugf("Pausing producer for one second to throttle message production")
				time.Sleep(1 * time.Second)
			}
		case err := <-producer.Errors():
			plog.Infof("Failed to produce message: %v", err)
			producerErrors++
		case <-signals:
			break ProducerLoop
		}
	}

	plog.Infof("Enqueued: %d; errors: %d", enqueued, producerErrors)
}
6 changes: 1 addition & 5 deletions go.mod
@@ -3,11 +3,9 @@ module github.com/danielqsj/kafka_exporter
go 1.14

require (
github.com/Shopify/sarama v1.26.1
github.com/Shopify/sarama v1.27.2
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d // indirect
github.com/frankban/quicktest v1.9.0 // indirect
github.com/golang/protobuf v1.3.5 // indirect
github.com/klauspost/compress v1.10.3 // indirect
github.com/konsorten/go-windows-terminal-sequences v1.0.2 // indirect
github.com/krallistic/kazoo-go v0.0.0-20170526135507-a15279744f4e
github.com/prometheus/client_golang v1.5.1
@@ -17,8 +15,6 @@
github.com/samuel/go-zookeeper v0.0.0-20190923202752-2cc03de413da // indirect
github.com/sirupsen/logrus v1.5.0 // indirect
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c
golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59 // indirect
golang.org/x/net v0.0.0-20200324143707-d3edc9973b7e // indirect
golang.org/x/sys v0.0.0-20200331124033-c3d80250170d // indirect
gopkg.in/alecthomas/kingpin.v2 v2.2.6
)