-
Notifications
You must be signed in to change notification settings - Fork 5.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
AMQP Consumer plugin #1678
Merged
Merged
AMQP Consumer plugin #1678
Changes from all commits
Commits
Show all changes
8 commits
Select commit
Hold shift + click to select a range
3f6a98f
Add rabbitmq consumer input plugin
jackzampolin 640b984
Add README.md to RabbitMQConsumer
jackzampolin c4e609c
Change from RabbitMQ -> AMQP
jackzampolin 863eea7
make batch size configurable
jackzampolin a321953
Fix varibale naming and add amqp_consumer to all/all.go
jackzampolin b7f217c
Add support for topic exchange to match amqp output
danielnelson d4693ee
Add queue name and reconnect support to amqp_consumer
danielnelson e30439b
Update changelog
danielnelson File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,47 @@ | ||
# AMQP Consumer Input Plugin | ||
|
||
This plugin provides a consumer for use with AMQP 0-9-1, a promenent implementation of this protocol being [RabbitMQ](https://www.rabbitmq.com/). | ||
|
||
Metrics are read from a topic exchange using the configured queue and binding_key. | ||
|
||
Message payload should be formatted in one of the [Telegraf Data Formats](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md). | ||
|
||
For an introduction to AMQP see: | ||
- https://www.rabbitmq.com/tutorials/amqp-concepts.html | ||
- https://www.rabbitmq.com/getstarted.html | ||
|
||
The following defaults are known to work with RabbitMQ: | ||
|
||
```toml | ||
# AMQP consumer plugin | ||
[[inputs.amqp_consumer]] | ||
## AMQP url | ||
url = "amqp://localhost:5672/influxdb" | ||
## AMQP exchange | ||
exchange = "telegraf" | ||
## AMQP queue name | ||
queue = "telegraf" | ||
## Binding Key | ||
binding_key = "#" | ||
|
||
## Controls how many messages the server will try to keep on the network | ||
## for consumers before receiving delivery acks. | ||
#prefetch_count = 50 | ||
|
||
## Auth method. PLAIN and EXTERNAL are supported. | ||
## Using EXTERNAL requires enabling the rabbitmq_auth_mechanism_ssl plugin as | ||
## described here: https://www.rabbitmq.com/plugins.html | ||
# auth_method = "PLAIN" | ||
## Optional SSL Config | ||
# ssl_ca = "/etc/telegraf/ca.pem" | ||
# ssl_cert = "/etc/telegraf/cert.pem" | ||
# ssl_key = "/etc/telegraf/key.pem" | ||
## Use SSL but skip chain & host verification | ||
# insecure_skip_verify = false | ||
|
||
## Data format to output. | ||
## Each data format has it's own unique set of configuration options, read | ||
## more about them here: | ||
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md | ||
data_format = "influx" | ||
``` |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,280 @@ | ||
package amqp_consumer | ||
|
||
import ( | ||
"fmt" | ||
"log" | ||
"strings" | ||
"sync" | ||
"time" | ||
|
||
"github.com/streadway/amqp" | ||
|
||
"github.com/influxdata/telegraf" | ||
"github.com/influxdata/telegraf/internal" | ||
"github.com/influxdata/telegraf/plugins/inputs" | ||
"github.com/influxdata/telegraf/plugins/parsers" | ||
) | ||
|
||
// AMQPConsumer is the top level struct for this plugin | ||
type AMQPConsumer struct { | ||
URL string | ||
// AMQP exchange | ||
Exchange string | ||
// Queue Name | ||
Queue string | ||
// Binding Key | ||
BindingKey string `toml:"binding_key"` | ||
|
||
// Controls how many messages the server will try to keep on the network | ||
// for consumers before receiving delivery acks. | ||
PrefetchCount int | ||
|
||
// AMQP Auth method | ||
AuthMethod string | ||
// Path to CA file | ||
SSLCA string `toml:"ssl_ca"` | ||
// Path to host cert file | ||
SSLCert string `toml:"ssl_cert"` | ||
// Path to cert key file | ||
SSLKey string `toml:"ssl_key"` | ||
// Use SSL but skip chain & host verification | ||
InsecureSkipVerify bool | ||
|
||
parser parsers.Parser | ||
conn *amqp.Connection | ||
wg *sync.WaitGroup | ||
} | ||
|
||
type externalAuth struct{} | ||
|
||
func (a *externalAuth) Mechanism() string { | ||
return "EXTERNAL" | ||
} | ||
func (a *externalAuth) Response() string { | ||
return fmt.Sprintf("\000") | ||
} | ||
|
||
const ( | ||
DefaultAuthMethod = "PLAIN" | ||
DefaultPrefetchCount = 50 | ||
) | ||
|
||
func (a *AMQPConsumer) SampleConfig() string { | ||
return ` | ||
## AMQP url | ||
url = "amqp://localhost:5672/influxdb" | ||
## AMQP exchange | ||
exchange = "telegraf" | ||
## AMQP queue name | ||
queue = "telegraf" | ||
## Binding Key | ||
binding_key = "#" | ||
|
||
## Maximum number of messages server should give to the worker. | ||
prefetch_count = 50 | ||
|
||
## Auth method. PLAIN and EXTERNAL are supported | ||
## Using EXTERNAL requires enabling the rabbitmq_auth_mechanism_ssl plugin as | ||
## described here: https://www.rabbitmq.com/plugins.html | ||
# auth_method = "PLAIN" | ||
|
||
## Optional SSL Config | ||
# ssl_ca = "/etc/telegraf/ca.pem" | ||
# ssl_cert = "/etc/telegraf/cert.pem" | ||
# ssl_key = "/etc/telegraf/key.pem" | ||
## Use SSL but skip chain & host verification | ||
# insecure_skip_verify = false | ||
|
||
## Data format to output. | ||
## Each data format has it's own unique set of configuration options, read | ||
## more about them here: | ||
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md | ||
data_format = "influx" | ||
` | ||
} | ||
|
||
func (a *AMQPConsumer) Description() string { | ||
return "AMQP consumer plugin" | ||
} | ||
|
||
func (a *AMQPConsumer) SetParser(parser parsers.Parser) { | ||
a.parser = parser | ||
} | ||
|
||
// All gathering is done in the Start function | ||
func (a *AMQPConsumer) Gather(_ telegraf.Accumulator) error { | ||
return nil | ||
} | ||
|
||
func (a *AMQPConsumer) createConfig() (*amqp.Config, error) { | ||
// make new tls config | ||
tls, err := internal.GetTLSConfig( | ||
a.SSLCert, a.SSLKey, a.SSLCA, a.InsecureSkipVerify) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
// parse auth method | ||
var sasl []amqp.Authentication // nil by default | ||
|
||
if strings.ToUpper(a.AuthMethod) == "EXTERNAL" { | ||
sasl = []amqp.Authentication{&externalAuth{}} | ||
} | ||
|
||
config := amqp.Config{ | ||
TLSClientConfig: tls, | ||
SASL: sasl, // if nil, it will be PLAIN | ||
} | ||
return &config, nil | ||
} | ||
|
||
// Start satisfies the telegraf.ServiceInput interface | ||
func (a *AMQPConsumer) Start(acc telegraf.Accumulator) error { | ||
amqpConf, err := a.createConfig() | ||
if err != nil { | ||
return err | ||
} | ||
|
||
msgs, err := a.connect(amqpConf) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
a.wg = &sync.WaitGroup{} | ||
a.wg.Add(1) | ||
go a.process(msgs, acc) | ||
|
||
go func() { | ||
err := <-a.conn.NotifyClose(make(chan *amqp.Error)) | ||
if err == nil { | ||
return | ||
} | ||
|
||
log.Printf("I! AMQP consumer connection closed: %s; trying to reconnect", err) | ||
for { | ||
msgs, err := a.connect(amqpConf) | ||
if err != nil { | ||
log.Printf("E! AMQP connection failed: %s", err) | ||
time.Sleep(10 * time.Second) | ||
continue | ||
} | ||
|
||
a.wg.Add(1) | ||
go a.process(msgs, acc) | ||
break | ||
} | ||
}() | ||
|
||
return nil | ||
} | ||
|
||
func (a *AMQPConsumer) connect(amqpConf *amqp.Config) (<-chan amqp.Delivery, error) { | ||
conn, err := amqp.DialConfig(a.URL, *amqpConf) | ||
if err != nil { | ||
return nil, err | ||
} | ||
a.conn = conn | ||
|
||
ch, err := conn.Channel() | ||
if err != nil { | ||
return nil, fmt.Errorf("Failed to open a channel: %s", err) | ||
} | ||
|
||
err = ch.ExchangeDeclare( | ||
a.Exchange, // name | ||
"topic", // type | ||
true, // durable | ||
false, // auto-deleted | ||
false, // internal | ||
false, // no-wait | ||
nil, // arguments | ||
) | ||
if err != nil { | ||
return nil, fmt.Errorf("Failed to declare an exchange: %s", err) | ||
} | ||
|
||
q, err := ch.QueueDeclare( | ||
a.Queue, // queue | ||
true, // durable | ||
false, // delete when unused | ||
false, // exclusive | ||
false, // no-wait | ||
nil, // arguments | ||
) | ||
if err != nil { | ||
return nil, fmt.Errorf("Failed to declare a queue: %s", err) | ||
} | ||
|
||
err = ch.QueueBind( | ||
q.Name, // queue | ||
a.BindingKey, // binding-key | ||
a.Exchange, // exchange | ||
false, | ||
nil, | ||
) | ||
if err != nil { | ||
return nil, fmt.Errorf("Failed to bind a queue: %s", err) | ||
} | ||
|
||
err = ch.Qos( | ||
a.PrefetchCount, | ||
0, // prefetch-size | ||
false, // global | ||
) | ||
if err != nil { | ||
return nil, fmt.Errorf("Failed to set QoS: %s", err) | ||
} | ||
|
||
msgs, err := ch.Consume( | ||
q.Name, // queue | ||
"", // consumer | ||
false, // auto-ack | ||
false, // exclusive | ||
false, // no-local | ||
false, // no-wait | ||
nil, // arguments | ||
) | ||
if err != nil { | ||
return nil, fmt.Errorf("Failed establishing connection to queue: %s", err) | ||
} | ||
|
||
log.Println("I! Started AMQP consumer") | ||
return msgs, err | ||
} | ||
|
||
// Read messages from queue and add them to the Accumulator | ||
func (a *AMQPConsumer) process(msgs <-chan amqp.Delivery, acc telegraf.Accumulator) { | ||
defer a.wg.Done() | ||
for d := range msgs { | ||
metrics, err := a.parser.Parse(d.Body) | ||
if err != nil { | ||
log.Printf("E! %v: error parsing metric - %v", err, string(d.Body)) | ||
} else { | ||
for _, m := range metrics { | ||
acc.AddFields(m.Name(), m.Fields(), m.Tags(), m.Time()) | ||
} | ||
} | ||
|
||
d.Ack(false) | ||
} | ||
log.Printf("I! AMQP consumer queue closed") | ||
} | ||
|
||
func (a *AMQPConsumer) Stop() { | ||
err := a.conn.Close() | ||
if err != nil && err != amqp.ErrClosed { | ||
log.Printf("E! Error closing AMQP connection: %s", err) | ||
return | ||
} | ||
a.wg.Wait() | ||
log.Println("I! Stopped AMQP service") | ||
} | ||
|
||
func init() { | ||
inputs.Add("amqp_consumer", func() telegraf.Input { | ||
return &AMQPConsumer{ | ||
AuthMethod: DefaultAuthMethod, | ||
PrefetchCount: DefaultPrefetchCount, | ||
} | ||
}) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what happens here when the channel is closed? does the loop just cleanly exit? how does that happen?
I would have thought it'd start sending
nil
s?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"When the channel or connection closes, all delivery chans will also close. " - https://godoc.org/github.com/streadway/amqp#Channel.Consume