Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add RabbitMQ cluster and running nodes count and running node status #3703

Merged
merged 4 commits into from
Jan 26, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions plugins/inputs/rabbitmq/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ For additional details reference the [RabbitMQ Management HTTP Stats](https://cd
- messages_ready (int, messages)
- messages_unacked (int, messages)
- queues (int, queues)
- clustering_listeners (int, cluster nodes)
- amqp_listeners (int, amqp nodes up)

- rabbitmq_node
- disk_free (int, bytes)
Expand All @@ -74,6 +76,7 @@ For additional details reference the [RabbitMQ Management HTTP Stats](https://cd
- run_queue (int, erlang processes)
- sockets_total (int, sockets)
- sockets_used (int, sockets)
- running (int, node up)

- rabbitmq_queue
- consumer_utilisation (float, percent)
Expand Down Expand Up @@ -144,7 +147,7 @@ FROM rabbitmq_overview WHERE time > now() - 10m GROUP BY time(1m)

```
rabbitmq_queue,url=http://amqp.example.org:15672,queue=telegraf,vhost=influxdb,node=rabbit@amqp.example.org,durable=true,auto_delete=false,host=amqp.example.org messages_deliver_get=0i,messages_publish=329i,messages_publish_rate=0.2,messages_redeliver_rate=0,message_bytes_ready=0i,message_bytes_unacked=0i,messages_deliver=329i,messages_unack=0i,consumers=1i,idle_since="",messages=0i,messages_deliver_rate=0.2,messages_deliver_get_rate=0.2,messages_redeliver=0i,memory=43032i,message_bytes_ram=0i,messages_ack=329i,messages_ready=0i,messages_ack_rate=0.2,consumer_utilisation=1,message_bytes=0i,message_bytes_persist=0i 1493684035000000000
rabbitmq_overview,url=http://amqp.example.org:15672,host=amqp.example.org channels=2i,consumers=1i,exchanges=17i,messages_acked=329i,messages=0i,messages_ready=0i,messages_unacked=0i,connections=2i,queues=1i,messages_delivered=329i,messages_published=329i 1493684035000000000
rabbitmq_node,url=http://amqp.example.org:15672,node=rabbit@amqp.example.org,host=amqp.example.org fd_total=1024i,fd_used=32i,mem_limit=8363329126i,sockets_total=829i,disk_free=8175935488i,disk_free_limit=50000000i,mem_used=58771080i,proc_total=1048576i,proc_used=267i,run_queue=0i,sockets_used=2i 149368403500000000
rabbitmq_overview,url=http://amqp.example.org:15672,host=amqp.example.org channels=2i,consumers=1i,exchanges=17i,messages_acked=329i,messages=0i,messages_ready=0i,messages_unacked=0i,connections=2i,queues=1i,messages_delivered=329i,messages_published=329i,clustering_listeners=2i,amqp_listeners=1i 1493684035000000000
rabbitmq_node,url=http://amqp.example.org:15672,node=rabbit@amqp.example.org,host=amqp.example.org fd_total=1024i,fd_used=32i,mem_limit=8363329126i,sockets_total=829i,disk_free=8175935488i,disk_free_limit=50000000i,mem_used=58771080i,proc_total=1048576i,proc_used=267i,run_queue=0i,sockets_used=2i,running=1i 149368403500000000
rabbitmq_exchange,url=http://amqp.example.org:15672,exchange=telegraf,type=fanout,vhost=influxdb,internal=false,durable=true,auto_delete=false,host=amqp.example.org messages_publish_in=2i,messages_publish_out=1i 149368403500000000
```
28 changes: 26 additions & 2 deletions plugins/inputs/rabbitmq/rabbitmq.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,12 @@ type OverviewResponse struct {
MessageStats *MessageStats `json:"message_stats"`
ObjectTotals *ObjectTotals `json:"object_totals"`
QueueTotals *QueueTotals `json:"queue_totals"`
Listeners []Listeners `json:"listeners"`
}

// Listeners ...
type Listeners struct {
Protocol string `json:"protocol"`
}

// Details ...
Expand Down Expand Up @@ -134,6 +140,7 @@ type Node struct {
RunQueue int64 `json:"run_queue"`
SocketsTotal int64 `json:"sockets_total"`
SocketsUsed int64 `json:"sockets_used"`
Running bool `json:"running"`
}

type Exchange struct {
Expand Down Expand Up @@ -186,7 +193,7 @@ var sampleConfig = `
# queues = ["telegraf"]

## A list of exchanges to gather as the rabbitmq_exchange measurement. If not
## specified, metrics for all exchanges are gathered.
## specified, metrics for all exchanges are gathered.
# exchanges = ["telegraf"]
`

Expand Down Expand Up @@ -275,11 +282,20 @@ func gatherOverview(r *RabbitMQ, acc telegraf.Accumulator) {
return
}

if overview.QueueTotals == nil || overview.ObjectTotals == nil || overview.MessageStats == nil {
if overview.QueueTotals == nil || overview.ObjectTotals == nil || overview.MessageStats == nil || overview.Listeners == nil {
acc.AddError(fmt.Errorf("Wrong answer from rabbitmq. Probably auth issue"))
return
}

var clustering_listeners, amqp_listeners int64 = 0, 0
for _, listener := range overview.Listeners {
if listener.Protocol == "clustering" {
clustering_listeners++
} else if listener.Protocol == "amqp" {
amqp_listeners++
}
}

tags := map[string]string{"url": r.URL}
if r.Name != "" {
tags["name"] = r.Name
Expand All @@ -297,6 +313,8 @@ func gatherOverview(r *RabbitMQ, acc telegraf.Accumulator) {
"messages_delivered": overview.MessageStats.Deliver,
"messages_delivered_get": overview.MessageStats.DeliverGet,
"messages_published": overview.MessageStats.Publish,
"clustering_listeners": clustering_listeners,
"amqp_listeners": amqp_listeners,
}
acc.AddFields("rabbitmq_overview", fields, tags)
}
Expand All @@ -319,6 +337,11 @@ func gatherNodes(r *RabbitMQ, acc telegraf.Accumulator) {
tags := map[string]string{"url": r.URL}
tags["node"] = node.Name

var running int64 = 0
if node.Running {
running = 1
}

fields := map[string]interface{}{
"disk_free": node.DiskFree,
"disk_free_limit": node.DiskFreeLimit,
Expand All @@ -331,6 +354,7 @@ func gatherNodes(r *RabbitMQ, acc telegraf.Accumulator) {
"run_queue": node.RunQueue,
"sockets_total": node.SocketsTotal,
"sockets_used": node.SocketsUsed,
"running": running,
}
acc.AddFields("rabbitmq_node", fields, tags, now)
}
Expand Down
23 changes: 22 additions & 1 deletion plugins/inputs/rabbitmq/rabbitmq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,25 @@ const sampleOverviewResponse = `
"messages_unacknowledged_details": {
"rate": 0.0
}
}
},
"listeners": [
{
"name": "rabbit@node-a",
"protocol": "amqp"
},
{
"name": "rabbit@node-b",
"protocol": "amqp"
},
{
"name": "rabbit@node-a",
"protocol": "clustering"
},
{
"name": "rabbit@node-b",
"protocol": "clustering"
}
]
}
`

Expand Down Expand Up @@ -514,6 +532,8 @@ func TestRabbitMQGeneratesMetrics(t *testing.T) {
"consumers",
"exchanges",
"queues",
"clustering_listeners",
"amqp_listeners",
}

for _, metric := range intMetrics {
Expand All @@ -532,6 +552,7 @@ func TestRabbitMQGeneratesMetrics(t *testing.T) {
"run_queue",
"sockets_total",
"sockets_used",
"running",
}

for _, metric := range nodeIntMetrics {
Expand Down