diff --git a/plugins/all/all.go b/plugins/all/all.go index 71381aa8706ad..466a7166d0869 100644 --- a/plugins/all/all.go +++ b/plugins/all/all.go @@ -1,6 +1,7 @@ package all import ( + _ "github.com/influxdb/telegraf/plugins/kafka_consumer" _ "github.com/influxdb/telegraf/plugins/memcached" _ "github.com/influxdb/telegraf/plugins/mysql" _ "github.com/influxdb/telegraf/plugins/postgresql" diff --git a/plugins/kafka_consumer/kafka_consumer.go b/plugins/kafka_consumer/kafka_consumer.go new file mode 100644 index 0000000000000..8353ea5fc136b --- /dev/null +++ b/plugins/kafka_consumer/kafka_consumer.go @@ -0,0 +1,153 @@ +package kafka_consumer + +import ( + "os" + "os/signal" + "time" + + "github.com/influxdb/influxdb/tsdb" + "github.com/influxdb/telegraf/plugins" + "github.com/wvanbergen/kafka/consumergroup" + "gopkg.in/Shopify/sarama.v1" +) + +type Kafka struct { + ConsumerGroupName string + Topic string + ZookeeperPeers []string + Consumer *consumergroup.ConsumerGroup + BatchSize int +} + +var sampleConfig = ` +# topic to consume +topic = "topic_with_metrics" + +# the name of the consumer group +consumerGroupName = "telegraf_metrics_consumers" + +# an array of Zookeeper connection strings +zookeeperPeers = ["localhost:2181"] + +# Batch size of points sent to InfluxDB +batchSize = 10` + +func (k *Kafka) SampleConfig() string { + return sampleConfig +} + +func (k *Kafka) Description() string { + return "read metrics from a Kafka topic" +} + +type Metric struct { + Measurement string `json:"measurement"` + Values map[string]interface{} `json:"values"` + Tags map[string]string `json:"tags"` + Time time.Time `json:"time"` +} + +func (k *Kafka) Gather(acc plugins.Accumulator) error { + var consumerErr error + metricQueue := make(chan []byte, 200) + + if k.Consumer == nil { + k.Consumer, consumerErr = consumergroup.JoinConsumerGroup( + k.ConsumerGroupName, + []string{k.Topic}, + k.ZookeeperPeers, + nil, + ) + + if consumerErr != nil { + return consumerErr + } + + c := make(chan os.Signal, 1) + halt := make(chan bool, 1) + signal.Notify(c, os.Interrupt) + go func() { + <-c + halt <- true + emitMetrics(k, acc, metricQueue) + k.Consumer.Close() + }() + + go readFromKafka(k.Consumer.Messages(), metricQueue, k.BatchSize, k.Consumer.CommitUpto, halt) + } + + return emitMetrics(k, acc, metricQueue) +} + +func emitMetrics(k *Kafka, acc plugins.Accumulator, metricConsumer <-chan []byte) error { + timeout := time.After(1 * time.Second) + + for { + select { + case batch := <-metricConsumer: + var points []tsdb.Point + var err error + if points, err = tsdb.ParsePoints(batch); err != nil { + return err + } + + for _, point := range points { + acc.AddValuesWithTime(point.Name(), point.Fields(), point.Tags(), point.Time()) + } + case <-timeout: + return nil + } + } +} + +const millisecond = 1000000 * time.Nanosecond + +type ack func(*sarama.ConsumerMessage) error + +func readFromKafka(kafkaMsgs <-chan *sarama.ConsumerMessage, metricProducer chan<- []byte, maxBatchSize int, ackMsg ack, halt <-chan bool) { + batch := make([]byte, 0) + currentBatchSize := 0 + timeout := time.After(500 * millisecond) + var msg *sarama.ConsumerMessage + + for { + select { + case msg = <-kafkaMsgs: + if currentBatchSize != 0 { + batch = append(batch, '\n') + } + + batch = append(batch, msg.Value...) + currentBatchSize++ + + if currentBatchSize == maxBatchSize { + metricProducer <- batch + currentBatchSize = 0 + batch = make([]byte, 0) + ackMsg(msg) + } + case <-timeout: + if currentBatchSize != 0 { + metricProducer <- batch + currentBatchSize = 0 + batch = make([]byte, 0) + ackMsg(msg) + } + + timeout = time.After(500 * millisecond) + case <-halt: + if currentBatchSize != 0 { + metricProducer <- batch + ackMsg(msg) + } + + return + } + } +} + +func init() { + plugins.Add("kafka", func() plugins.Plugin { + return &Kafka{} + }) +} diff --git a/plugins/kafka_consumer/kafka_consumer_integration_test.go b/plugins/kafka_consumer/kafka_consumer_integration_test.go new file mode 100644 index 0000000000000..1541cb1275018 --- /dev/null +++ b/plugins/kafka_consumer/kafka_consumer_integration_test.go @@ -0,0 +1,62 @@ +package kafka_consumer + +import ( + "fmt" + "os" + "strings" + "testing" + "time" + + "github.com/Shopify/sarama" + "github.com/influxdb/telegraf/testutil" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestReadsMetricsFromKafka(t *testing.T) { + var zkPeers, brokerPeers []string + + if len(os.Getenv("ZOOKEEPER_PEERS")) == 0 { + zkPeers = []string{"localhost:2181"} + } else { + zkPeers = strings.Split(os.Getenv("ZOOKEEPER_PEERS"), ",") + } + + if len(os.Getenv("KAFKA_PEERS")) == 0 { + brokerPeers = []string{"localhost:9092"} + } else { + brokerPeers = strings.Split(os.Getenv("KAFKA_PEERS"), ",") + } + + k := &Kafka{ + ConsumerGroupName: "telegraf_test_consumers", + Topic: fmt.Sprintf("telegraf_test_topic_%d", time.Now().Unix()), + ZookeeperPeers: zkPeers, + } + + msg := "cpu_load_short,direction=in,host=server01,region=us-west value=23422.0 1422568543702900257" + producer, err := sarama.NewSyncProducer(brokerPeers, nil) + require.NoError(t, err) + _, _, err = producer.SendMessage(&sarama.ProducerMessage{Topic: k.Topic, Value: sarama.StringEncoder(msg)}) + producer.Close() + + var acc testutil.Accumulator + + // Sanity check + assert.Equal(t, 0, len(acc.Points), "there should not be any points") + + err = k.Gather(&acc) + require.NoError(t, err) + + assert.Equal(t, 1, len(acc.Points), "there should be a single point") + + point := acc.Points[0] + assert.Equal(t, "cpu_load_short", point.Measurement) + assert.Equal(t, map[string]interface{}{"value": 23422.0}, point.Values) + assert.Equal(t, map[string]string{ + "host": "server01", + "direction": "in", + "region": "us-west", + }, point.Tags) + assert.Equal(t, time.Unix(0, 1422568543702900257), point.Time) +} diff --git a/plugins/kafka_consumer/kafka_consumer_test.go b/plugins/kafka_consumer/kafka_consumer_test.go new file mode 100644 index 0000000000000..fa6ad4a973f87 --- /dev/null +++ b/plugins/kafka_consumer/kafka_consumer_test.go @@ -0,0 +1,95 @@ +package kafka_consumer + +import ( + "strings" + "testing" + "time" + + "github.com/influxdb/telegraf/testutil" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "gopkg.in/Shopify/sarama.v1" +) + +const testMsg = "cpu_load_short,direction=in,host=server01,region=us-west value=23422.0 1422568543702900257" + +func TestReadFromKafkaBatchesMsgsOnBatchSize(t *testing.T) { + halt := make(chan bool, 1) + metricChan := make(chan []byte, 1) + kafkaChan := make(chan *sarama.ConsumerMessage, 10) + for i := 0; i < 10; i++ { + kafkaChan <- saramaMsg(testMsg) + } + + expectedBatch := strings.Repeat(testMsg+"\n", 9) + testMsg + readFromKafka(kafkaChan, metricChan, 10, func(msg *sarama.ConsumerMessage) error { + batch := <-metricChan + assert.Equal(t, expectedBatch, string(batch)) + + halt <- true + + return nil + }, halt) +} + +func TestReadFromKafkaBatchesMsgsOnTimeout(t *testing.T) { + halt := make(chan bool, 1) + metricChan := make(chan []byte, 1) + kafkaChan := make(chan *sarama.ConsumerMessage, 10) + for i := 0; i < 3; i++ { + kafkaChan <- saramaMsg(testMsg) + } + + expectedBatch := strings.Repeat(testMsg+"\n", 2) + testMsg + readFromKafka(kafkaChan, metricChan, 10, func(msg *sarama.ConsumerMessage) error { + batch := <-metricChan + assert.Equal(t, expectedBatch, string(batch)) + + halt <- true + + return nil + }, halt) +} + +func TestEmitMetricsSendMetricsToAcc(t *testing.T) { + k := &Kafka{} + var acc testutil.Accumulator + testChan := make(chan []byte, 1) + testChan <- []byte(testMsg) + + err := emitMetrics(k, &acc, testChan) + require.NoError(t, err) + + assert.Equal(t, 1, len(acc.Points), "there should be a single point") + + point := acc.Points[0] + assert.Equal(t, "cpu_load_short", point.Measurement) + assert.Equal(t, map[string]interface{}{"value": 23422.0}, point.Values) + assert.Equal(t, map[string]string{ + "host": "server01", + "direction": "in", + "region": "us-west", + }, point.Tags) + + assert.Equal(t, time.Unix(0, 1422568543702900257), point.Time) +} + +func TestEmitMetricsTimesOut(t *testing.T) { + k := &Kafka{} + var acc testutil.Accumulator + testChan := make(chan []byte) + + err := emitMetrics(k, &acc, testChan) + require.NoError(t, err) + + assert.Equal(t, 0, len(acc.Points), "there should not be a any points") +} + +func saramaMsg(val string) *sarama.ConsumerMessage { + return &sarama.ConsumerMessage{ + Key: nil, + Value: []byte(val), + Offset: 0, + Partition: 0, + } +}