Skip to content

Commit

Permalink
Add Kafka Consumer Plugin
Browse files Browse the repository at this point in the history
The Kafka consumer plugin polls a specified Kafka topic and adds messages to
InfluxDB. The plugin assumes messages follow the line protocol. Consumer Group
is used to talk to the Kafka cluster so multiple instances of telegraf can read
from the same topic in parallel.
  • Loading branch information
es committed Jul 2, 2015
1 parent 86a6f33 commit 0692b4b
Show file tree
Hide file tree
Showing 4 changed files with 311 additions and 0 deletions.
1 change: 1 addition & 0 deletions plugins/all/all.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package all

import (
_ "github.com/influxdb/telegraf/plugins/kafka_consumer"
_ "github.com/influxdb/telegraf/plugins/memcached"
_ "github.com/influxdb/telegraf/plugins/mysql"
_ "github.com/influxdb/telegraf/plugins/postgresql"
Expand Down
153 changes: 153 additions & 0 deletions plugins/kafka_consumer/kafka_consumer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
package kafka_consumer

import (
"os"
"os/signal"
"time"

"github.com/influxdb/influxdb/tsdb"
"github.com/influxdb/telegraf/plugins"
"github.com/wvanbergen/kafka/consumergroup"
"gopkg.in/Shopify/sarama.v1"
)

type Kafka struct {
ConsumerGroupName string
Topic string
ZookeeperPeers []string
Consumer *consumergroup.ConsumerGroup
BatchSize int
}

var sampleConfig = `
# topic to consume
topic = "topic_with_metrics"
# the name of the consumer group
consumerGroupName = "telegraf_metrics_consumers"
# an array of Zookeeper connection strings
zookeeperPeers = ["localhost:2181"]
# Batch size of points sent to InfluxDB
batchSize = 1000`

func (k *Kafka) SampleConfig() string {
return sampleConfig
}

func (k *Kafka) Description() string {
return "read metrics from a Kafka topic"
}

type Metric struct {
Measurement string `json:"measurement"`
Values map[string]interface{} `json:"values"`
Tags map[string]string `json:"tags"`
Time time.Time `json:"time"`
}

func (k *Kafka) Gather(acc plugins.Accumulator) error {
var consumerErr error
metricQueue := make(chan []byte, 200)

if k.Consumer == nil {
k.Consumer, consumerErr = consumergroup.JoinConsumerGroup(
k.ConsumerGroupName,
[]string{k.Topic},
k.ZookeeperPeers,
nil,
)

if consumerErr != nil {
return consumerErr
}

c := make(chan os.Signal, 1)
halt := make(chan bool, 1)
signal.Notify(c, os.Interrupt)
go func() {
<-c
halt <- true
emitMetrics(k, acc, metricQueue)
k.Consumer.Close()
}()

go readFromKafka(k.Consumer.Messages(), metricQueue, k.BatchSize, k.Consumer.CommitUpto, halt)
}

return emitMetrics(k, acc, metricQueue)
}

func emitMetrics(k *Kafka, acc plugins.Accumulator, metricConsumer <-chan []byte) error {
timeout := time.After(1 * time.Second)

for {
select {
case batch := <-metricConsumer:
var points []tsdb.Point
var err error
if points, err = tsdb.ParsePoints(batch); err != nil {
return err
}

for _, point := range points {
acc.AddValuesWithTime(point.Name(), point.Fields(), point.Tags(), point.Time())
}
case <-timeout:
return nil
}
}
}

const millisecond = 1000000 * time.Nanosecond

type ack func(*sarama.ConsumerMessage) error

func readFromKafka(kafkaMsgs <-chan *sarama.ConsumerMessage, metricProducer chan<- []byte, maxBatchSize int, ackMsg ack, halt <-chan bool) {
batch := make([]byte, 0)
currentBatchSize := 0
timeout := time.After(500 * millisecond)
var msg *sarama.ConsumerMessage

for {
select {
case msg = <-kafkaMsgs:
if currentBatchSize != 0 {
batch = append(batch, '\n')
}

batch = append(batch, msg.Value...)
currentBatchSize++

if currentBatchSize == maxBatchSize {
metricProducer <- batch
currentBatchSize = 0
batch = make([]byte, 0)
ackMsg(msg)
}
case <-timeout:
if currentBatchSize != 0 {
metricProducer <- batch
currentBatchSize = 0
batch = make([]byte, 0)
ackMsg(msg)
}

timeout = time.After(500 * millisecond)
case <-halt:
if currentBatchSize != 0 {
metricProducer <- batch
ackMsg(msg)
}

return
}
}
}

func init() {
plugins.Add("kafka", func() plugins.Plugin {
return &Kafka{}
})
}
62 changes: 62 additions & 0 deletions plugins/kafka_consumer/kafka_consumer_integration_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package kafka_consumer

import (
"fmt"
"os"
"strings"
"testing"
"time"

"github.com/Shopify/sarama"
"github.com/influxdb/telegraf/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestReadsMetricsFromKafka(t *testing.T) {
var zkPeers, brokerPeers []string

if len(os.Getenv("ZOOKEEPER_PEERS")) == 0 {
zkPeers = []string{"localhost:2181"}
} else {
zkPeers = strings.Split(os.Getenv("ZOOKEEPER_PEERS"), ",")
}

if len(os.Getenv("KAFKA_PEERS")) == 0 {
brokerPeers = []string{"localhost:9092"}
} else {
brokerPeers = strings.Split(os.Getenv("KAFKA_PEERS"), ",")
}

k := &Kafka{
ConsumerGroupName: "telegraf_test_consumers",
Topic: fmt.Sprintf("telegraf_test_topic_%d", time.Now().Unix()),
ZookeeperPeers: zkPeers,
}

msg := "cpu_load_short,direction=in,host=server01,region=us-west value=23422.0 1422568543702900257"
producer, err := sarama.NewSyncProducer(brokerPeers, nil)
require.NoError(t, err)
_, _, err = producer.SendMessage(&sarama.ProducerMessage{Topic: k.Topic, Value: sarama.StringEncoder(msg)})
producer.Close()

var acc testutil.Accumulator

// Sanity check
assert.Equal(t, 0, len(acc.Points), "there should not be any points")

err = k.Gather(&acc)
require.NoError(t, err)

assert.Equal(t, 1, len(acc.Points), "there should be a single point")

point := acc.Points[0]
assert.Equal(t, "cpu_load_short", point.Measurement)
assert.Equal(t, map[string]interface{}{"value": 23422.0}, point.Values)
assert.Equal(t, map[string]string{
"host": "server01",
"direction": "in",
"region": "us-west",
}, point.Tags)
assert.Equal(t, time.Unix(0, 1422568543702900257), point.Time)
}
95 changes: 95 additions & 0 deletions plugins/kafka_consumer/kafka_consumer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package kafka_consumer

import (
"strings"
"testing"
"time"

"github.com/influxdb/telegraf/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"gopkg.in/Shopify/sarama.v1"
)

const testMsg = "cpu_load_short,direction=in,host=server01,region=us-west value=23422.0 1422568543702900257"

func TestReadFromKafkaBatchesMsgsOnBatchSize(t *testing.T) {
halt := make(chan bool, 1)
metricChan := make(chan []byte, 1)
kafkaChan := make(chan *sarama.ConsumerMessage, 10)
for i := 0; i < 10; i++ {
kafkaChan <- saramaMsg(testMsg)
}

expectedBatch := strings.Repeat(testMsg+"\n", 9) + testMsg
readFromKafka(kafkaChan, metricChan, 10, func(msg *sarama.ConsumerMessage) error {
batch := <-metricChan
assert.Equal(t, expectedBatch, string(batch))

halt <- true

return nil
}, halt)
}

func TestReadFromKafkaBatchesMsgsOnTimeout(t *testing.T) {
halt := make(chan bool, 1)
metricChan := make(chan []byte, 1)
kafkaChan := make(chan *sarama.ConsumerMessage, 10)
for i := 0; i < 3; i++ {
kafkaChan <- saramaMsg(testMsg)
}

expectedBatch := strings.Repeat(testMsg+"\n", 2) + testMsg
readFromKafka(kafkaChan, metricChan, 10, func(msg *sarama.ConsumerMessage) error {
batch := <-metricChan
assert.Equal(t, expectedBatch, string(batch))

halt <- true

return nil
}, halt)
}

func TestEmitMetricsSendMetricsToAcc(t *testing.T) {
k := &Kafka{}
var acc testutil.Accumulator
testChan := make(chan []byte, 1)
testChan <- []byte(testMsg)

err := emitMetrics(k, &acc, testChan)
require.NoError(t, err)

assert.Equal(t, 1, len(acc.Points), "there should be a single point")

point := acc.Points[0]
assert.Equal(t, "cpu_load_short", point.Measurement)
assert.Equal(t, map[string]interface{}{"value": 23422.0}, point.Values)
assert.Equal(t, map[string]string{
"host": "server01",
"direction": "in",
"region": "us-west",
}, point.Tags)

assert.Equal(t, time.Unix(0, 1422568543702900257), point.Time)
}

func TestEmitMetricsTimesOut(t *testing.T) {
k := &Kafka{}
var acc testutil.Accumulator
testChan := make(chan []byte)

err := emitMetrics(k, &acc, testChan)
require.NoError(t, err)

assert.Equal(t, 0, len(acc.Points), "there should not be a any points")
}

func saramaMsg(val string) *sarama.ConsumerMessage {
return &sarama.ConsumerMessage{
Key: nil,
Value: []byte(val),
Offset: 0,
Partition: 0,
}
}

0 comments on commit 0692b4b

Please sign in to comment.