-
Notifications
You must be signed in to change notification settings - Fork 5.6k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Fix kafka plugin and rename to kafka_consumer
fixes #371
- Loading branch information
Showing
10 changed files
with
455 additions
and
431 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,164 +1,166 @@ | ||
package kafka_consumer | ||
|
||
import ( | ||
"os" | ||
"os/signal" | ||
"time" | ||
"log" | ||
"strings" | ||
"sync" | ||
|
||
"github.com/Shopify/sarama" | ||
"github.com/influxdb/influxdb/models" | ||
"github.com/influxdb/telegraf/plugins" | ||
|
||
"github.com/Shopify/sarama" | ||
"github.com/wvanbergen/kafka/consumergroup" | ||
) | ||
|
||
type Kafka struct { | ||
ConsumerGroupName string | ||
Topic string | ||
ZookeeperPeers []string | ||
Consumer *consumergroup.ConsumerGroup | ||
BatchSize int | ||
ConsumerGroup string | ||
Topics []string | ||
ZookeeperPeers []string | ||
Consumer *consumergroup.ConsumerGroup | ||
PointBuffer int | ||
Offset string | ||
|
||
sync.Mutex | ||
|
||
// channel for all incoming kafka messages | ||
in <-chan *sarama.ConsumerMessage | ||
// channel for all kafka consumer errors | ||
errs <-chan *sarama.ConsumerError | ||
// channel for all incoming parsed kafka points | ||
pointChan chan models.Point | ||
done chan struct{} | ||
|
||
// doNotCommitMsgs tells the parser not to call CommitUpto on the consumer. | ||
// This is mostly for test purposes, but there may be a use case for it later. | ||
doNotCommitMsgs bool | ||
} | ||
|
||
var sampleConfig = ` | ||
# topic to consume | ||
topic = "topic_with_metrics" | ||
# the name of the consumer group | ||
consumerGroupName = "telegraf_metrics_consumers" | ||
# topic(s) to consume | ||
topics = ["telegraf"] | ||
# an array of Zookeeper connection strings | ||
zookeeperPeers = ["localhost:2181"] | ||
# Batch size of points sent to InfluxDB | ||
batchSize = 1000 | ||
zookeeper_peers = ["localhost:2181"] | ||
# the name of the consumer group | ||
consumer_group = "telegraf_metrics_consumers" | ||
# Maximum number of points to buffer between collection intervals | ||
point_buffer = 100000 | ||
# Offset (must be either "oldest" or "newest") | ||
offset = "oldest" | ||
` | ||
|
||
// SampleConfig returns the package-level sampleConfig string. | ||
func (k *Kafka) SampleConfig() string { | ||
return sampleConfig | ||
} | ||
|
||
func (k *Kafka) Description() string { | ||
return "read metrics from a Kafka topic" | ||
} | ||
|
||
type Metric struct { | ||
Measurement string `json:"measurement"` | ||
Values map[string]interface{} `json:"values"` | ||
Tags map[string]string `json:"tags"` | ||
Time time.Time `json:"time"` | ||
return "Read line-protocol metrics from Kafka topic(s)" | ||
} | ||
|
||
func (k *Kafka) Gather(acc plugins.Accumulator) error { | ||
func (k *Kafka) Start() error { | ||
k.Lock() | ||
defer k.Unlock() | ||
var consumerErr error | ||
metricQueue := make(chan []byte, 200) | ||
|
||
if k.Consumer == nil { | ||
config := consumergroup.NewConfig() | ||
switch strings.ToLower(k.Offset) { | ||
case "oldest", "": | ||
config.Offsets.Initial = sarama.OffsetOldest | ||
case "newest": | ||
config.Offsets.Initial = sarama.OffsetNewest | ||
default: | ||
log.Printf("WARNING: Kafka consumer invalid offset '%s', using 'oldest'\n", | ||
k.Offset) | ||
config.Offsets.Initial = sarama.OffsetOldest | ||
} | ||
|
||
if k.Consumer == nil || k.Consumer.Closed() { | ||
k.Consumer, consumerErr = consumergroup.JoinConsumerGroup( | ||
k.ConsumerGroupName, | ||
[]string{k.Topic}, | ||
k.ConsumerGroup, | ||
k.Topics, | ||
k.ZookeeperPeers, | ||
nil, | ||
config, | ||
) | ||
|
||
if consumerErr != nil { | ||
return consumerErr | ||
} | ||
|
||
c := make(chan os.Signal, 1) | ||
halt := make(chan bool, 1) | ||
signal.Notify(c, os.Interrupt) | ||
go func() { | ||
<-c | ||
halt <- true | ||
emitMetrics(k, acc, metricQueue) | ||
k.Consumer.Close() | ||
}() | ||
|
||
go readFromKafka(k.Consumer.Messages(), | ||
metricQueue, | ||
k.BatchSize, | ||
k.Consumer.CommitUpto, | ||
halt) | ||
// Setup message and error channels | ||
k.in = k.Consumer.Messages() | ||
k.errs = k.Consumer.Errors() | ||
} | ||
|
||
return emitMetrics(k, acc, metricQueue) | ||
} | ||
k.done = make(chan struct{}) | ||
if k.PointBuffer == 0 { | ||
k.PointBuffer = 100000 | ||
} | ||
k.pointChan = make(chan models.Point, k.PointBuffer) | ||
|
||
func emitMetrics(k *Kafka, acc plugins.Accumulator, metricConsumer <-chan []byte) error { | ||
timeout := time.After(1 * time.Second) | ||
// Start the kafka message reader | ||
go k.parser() | ||
log.Printf("Started the kafka consumer service, peers: %v, topics: %v\n", | ||
k.ZookeeperPeers, k.Topics) | ||
return nil | ||
} | ||
|
||
// parser reads all incoming messages from the consumer and parses them into | ||
// InfluxDB metric points. | ||
func (k *Kafka) parser() { | ||
for { | ||
select { | ||
case batch := <-metricConsumer: | ||
var points []models.Point | ||
var err error | ||
if points, err = models.ParsePoints(batch); err != nil { | ||
return err | ||
case <-k.done: | ||
return | ||
case err := <-k.errs: | ||
log.Printf("Kafka Consumer Error: %s\n", err.Error()) | ||
case msg := <-k.in: | ||
points, err := models.ParsePoints(msg.Value) | ||
if err != nil { | ||
log.Printf("Could not parse kafka message: %s, error: %s", | ||
string(msg.Value), err.Error()) | ||
} | ||
|
||
for _, point := range points { | ||
acc.AddFields(point.Name(), point.Fields(), point.Tags(), point.Time()) | ||
select { | ||
case k.pointChan <- point: | ||
continue | ||
default: | ||
log.Printf("Kafka Consumer buffer is full, dropping a point." + | ||
" You may want to increase the point_buffer setting") | ||
} | ||
} | ||
|
||
if !k.doNotCommitMsgs { | ||
// TODO(cam) this locking can be removed if this PR gets merged: | ||
// https://github.com/wvanbergen/kafka/pull/84 | ||
k.Lock() | ||
k.Consumer.CommitUpto(msg) | ||
k.Unlock() | ||
} | ||
case <-timeout: | ||
return nil | ||
} | ||
} | ||
} | ||
|
||
const millisecond = 1000000 * time.Nanosecond | ||
|
||
type ack func(*sarama.ConsumerMessage) error | ||
|
||
func readFromKafka( | ||
kafkaMsgs <-chan *sarama.ConsumerMessage, | ||
metricProducer chan<- []byte, | ||
maxBatchSize int, | ||
ackMsg ack, | ||
halt <-chan bool, | ||
) { | ||
batch := make([]byte, 0) | ||
currentBatchSize := 0 | ||
timeout := time.After(500 * millisecond) | ||
var msg *sarama.ConsumerMessage | ||
|
||
for { | ||
select { | ||
case msg = <-kafkaMsgs: | ||
if currentBatchSize != 0 { | ||
batch = append(batch, '\n') | ||
} | ||
|
||
batch = append(batch, msg.Value...) | ||
currentBatchSize++ | ||
|
||
if currentBatchSize == maxBatchSize { | ||
metricProducer <- batch | ||
currentBatchSize = 0 | ||
batch = make([]byte, 0) | ||
ackMsg(msg) | ||
} | ||
case <-timeout: | ||
if currentBatchSize != 0 { | ||
metricProducer <- batch | ||
currentBatchSize = 0 | ||
batch = make([]byte, 0) | ||
ackMsg(msg) | ||
} | ||
|
||
timeout = time.After(500 * millisecond) | ||
case <-halt: | ||
if currentBatchSize != 0 { | ||
metricProducer <- batch | ||
ackMsg(msg) | ||
} | ||
func (k *Kafka) Stop() { | ||
k.Lock() | ||
defer k.Unlock() | ||
close(k.done) | ||
if err := k.Consumer.Close(); err != nil { | ||
log.Printf("Error closing kafka consumer: %s\n", err.Error()) | ||
} | ||
} | ||
|
||
return | ||
} | ||
func (k *Kafka) Gather(acc plugins.Accumulator) error { | ||
k.Lock() | ||
defer k.Unlock() | ||
npoints := len(k.pointChan) | ||
for i := 0; i < npoints; i++ { | ||
point := <-k.pointChan | ||
acc.AddFields(point.Name(), point.Fields(), point.Tags(), point.Time()) | ||
} | ||
return nil | ||
} | ||
|
||
func init() { | ||
plugins.Add("kafka", func() plugins.Plugin { | ||
plugins.Add("kafka_consumer", func() plugins.Plugin { | ||
return &Kafka{} | ||
}) | ||
} |
Oops, something went wrong.