-
Notifications
You must be signed in to change notification settings - Fork 5.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
AMQP Consumer plugin #1678
AMQP Consumer plugin #1678
Conversation
411e325
to
79f071b
Compare
Really looking forward to this PR so I can get the data back out of Rabbit |
@sparrc Any way we could get this in |
Sorry bur I think 1.2 is too soon as I haven't reviewed this yet and there is a large backlog. We can try for 1.3. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
See comments in review.
One other thing: I think we might want to change the AMQP output plugin to default to an exchange of ""
. We should also verify that both plugins can talk to each other with their default configurations.
func (rmq *AMQPConsumer) SampleConfig() string { | ||
return ` | ||
# The following options form a connection string to amqp: | ||
# amqp://{username}:{password}@{amqp_host}:{amqp_port} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
change this so that users just need to specify they're server in this format
amqp_port = "5672" | ||
# name of the queue to consume from | ||
queue = "task_queue" | ||
prefetch = 1000 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
spacing and what does this option do?
} | ||
|
||
// listen(s) for new messages coming in from AMQP and pawns them off to handleMessage | ||
func (rmq *AMQPConsumer) listen(msgs <-chan amqp.Delivery, acc telegraf.Accumulator) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we should probably wait for this function to exit when calling Stop()
func (rmq *AMQPConsumer) registerConsumer() <-chan amqp.Delivery { | ||
messages, err := rmq.ch.Consume(rmq.Queue, "", false, false, false, false, nil) | ||
if err != nil { | ||
panic(fmt.Errorf("%v: failed establishing connection to queue", err)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
don't panic
// listen(s) for new messages coming in from AMQP and pawns them off to handleMessage | ||
func (rmq *AMQPConsumer) listen(msgs <-chan amqp.Delivery, acc telegraf.Accumulator) { | ||
for d := range msgs { | ||
go handleMessage(d, acc, rmq.parser) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
a go-routine per message is not a good idea, this will be too much in high-traffic scenarios. Would be better to just handle messages serially.
func handleMessage(d amqp.Delivery, acc telegraf.Accumulator, parser parsers.Parser) { | ||
metric, err := parser.Parse(d.Body) | ||
if err != nil { | ||
log.Fatalf("%v: error parsing metric - %v", err, string(d.Body)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
dont use log.Fatal, this should be a log.Print and the message should start with E! ...
@@ -0,0 +1,142 @@ | |||
package rabbitmqConsumer |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why are there two of the same plugin? couldn't we only do an amqp consumer?
queue = "task_queue" | ||
prefetch = 1000 | ||
|
||
data_format = "influx" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you should put the standard comment in here that is in other plugins
last thing is that we should probably mention in the documentation, default config, and README (including the main repo README) that AMQP == RabbitMQ. |
cde1a7c
to
b7f217c
Compare
I added support for a topic exchange, since that is what the output plugin uses, as well as the authentication options that the output plugin has (custom TLS and SASL) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Minor changes in formatting and such.
plugins/outputs/amqp/amqp.go
Outdated
@@ -205,6 +209,7 @@ func (q *AMQP) Write(metrics []telegraf.Metric) error { | |||
Headers: q.headers, | |||
ContentType: "text/plain", | |||
Body: buf, | |||
// DeliveryMode: amqp.Persistent, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this comment be removed?
nil, // arguments | ||
) | ||
if err != nil { | ||
return fmt.Errorf("Failed to declare an exchange: %s", err) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This error message is inconsistent with the rest. Should be:
return fmt.Errorf("%v: Failed to declare an exchange", err)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I usually use %s
for errors, is there a benefit to using %v
? if err != nil then the string format will always work properly, I believe it will just print whatever err.Error() returns
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll stick to the current convention of "Failed to blah blah: %s"
a.Prefetch, | ||
0, // prefetch-size | ||
false, // global | ||
) | ||
if err != nil { | ||
return fmt.Errorf("%v: failed to set Qos", err) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should be QoS
in error message.
plugins/outputs/amqp/amqp.go
Outdated
@@ -160,7 +163,8 @@ func (q *AMQP) Connect() error { | |||
} | |||
|
|||
func (q *AMQP) Close() error { | |||
return q.channel.Close() | |||
q.channel.Close() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Check for close error and return
plugins/outputs/amqp/amqp.go
Outdated
@@ -205,6 +209,7 @@ func (q *AMQP) Write(metrics []telegraf.Metric) error { | |||
Headers: q.headers, | |||
ContentType: "text/plain", | |||
Body: buf, | |||
// DeliveryMode: amqp.Persistent, | |||
}) | |||
if err != nil { | |||
return fmt.Errorf("FAILED to send amqp message: %s", err) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Error formatting inconsistent here are compared to others
@@ -129,6 +130,8 @@ func (q *AMQP) Connect() error { | |||
if err != nil { | |||
return err | |||
} | |||
q.conn = connection | |||
|
|||
channel, err := connection.Channel() | |||
if err != nil { | |||
return fmt.Errorf("Failed to open a channel: %s", err) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Error message inconsistent here as compared to others.
## Binding Key | ||
binding_key = "#" | ||
|
||
## Maximum number of messages server should give to the worker. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would expand on this a bit, using language directly from either the streadway AMQP library or from the RabbitMQ documentation.
You can also link to external documentation here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(especially mention that this is a channel QoS setting)
@@ -0,0 +1,35 @@ | |||
# AMQP Consumer Input Plugin | |||
|
|||
This plugin reads data from an AMQP Queue ([RabbitMQ](https://www.rabbitmq.com/) being an example) formatted in one of |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here I think we should mention that this plugin is for the AMQP 0.9.1 protocol, and link to the RabbitMQ doc about it: https://www.rabbitmq.com/amqp-0-9-1-reference.html & https://www.rabbitmq.com/tutorials/amqp-concepts.html
|
||
// Declare a queue and assign it to AMQPConsumer | ||
q, err := ch.QueueDeclare( | ||
"telegraf", // queue |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should the queue name be configurable? what if a user wanted to consume from two different telegraf instances?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I added support for this.
url = "amqp://localhost:5672/influxdb" | ||
## AMQP exchange | ||
exchange = "telegraf" | ||
## Auth method. PLAIN and EXTERNAL are supported |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does EXTERNAL require the AMQP instance to have a specific plugin enabled? could you link to a doc about this? (I could be wrong, but https://www.rabbitmq.com/authentication.html seems to suggest it requires a plugin)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes it is required, I added a link in the latest push.
|
||
The following defaults are set to work with RabbitMQ: | ||
|
||
``` |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
put toml
after the ```, like:
```toml
...
this will tell github to colorize the readme
func (a *AMQPConsumer) process(msgs <-chan amqp.Delivery, acc telegraf.Accumulator) { | ||
defer a.wg.Done() | ||
for d := range msgs { | ||
metric, err := a.parser.Parse(d.Body) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: metric
-> metrics
|
||
func (a *AMQPConsumer) Stop() { | ||
a.Lock() | ||
defer a.Unlock() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should we call a.ch.Cancel()
? https://godoc.org/github.com/streadway/amqp#Channel.Cancel
Could be no, TBH I don't really understand the difference between "Channel.Cancel" and "Channel.Close"
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
FWIW, there's probably nothing to change here, this example code from the library itself simply closes the connection, which "takes the channel with it": https://github.com/streadway/amqp/blob/master/_examples/pubsub/pubsub.go#L37
(note that the session
struct in that example has an s.Channel
object defined, but doesn't get used in the Close() func)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
and with that being said, another example code calls Cancel and then Close: https://github.com/streadway/amqp/blob/master/_examples/simple-consumer/consumer.go#L141
but it calls Cancel with noWait=true, which is completely pointless AFAICT
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it only matters if you will use multiple consumers on a single channel. We should be able to just close the connection as well.
// Read messages from queue and add them to the Accumulator | ||
func (a *AMQPConsumer) process(msgs <-chan amqp.Delivery, acc telegraf.Accumulator) { | ||
defer a.wg.Done() | ||
for d := range msgs { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what happens here when the channel is closed? does the loop just cleanly exit? how does that happen?
I would have thought it'd start sending nil
s?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"When the channel or connection closes, all delivery chans will also close. " - https://godoc.org/github.com/streadway/amqp#Channel.Consume
I believe all the concerns above are addressed, and additionally I added support for reconnecting. So there are quite a few changes, can you re-review? |
205d328
to
d2c903d
Compare
d2c903d
to
d4693ee
Compare
LGTM, nice work! |
Required for all PRs: