Skip to content

Commit

Permalink
Add RabbitMQ cluster and running nodes count and running node status (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
askainet authored and danielnelson committed Jan 26, 2018
1 parent ff63421 commit 782b133
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 5 deletions.
7 changes: 5 additions & 2 deletions plugins/inputs/rabbitmq/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ For additional details reference the [RabbitMQ Management HTTP Stats](https://cd
- messages_ready (int, messages)
- messages_unacked (int, messages)
- queues (int, queues)
- clustering_listeners (int, cluster nodes)
- amqp_listeners (int, amqp nodes up)

- rabbitmq_node
- disk_free (int, bytes)
Expand All @@ -74,6 +76,7 @@ For additional details reference the [RabbitMQ Management HTTP Stats](https://cd
- run_queue (int, erlang processes)
- sockets_total (int, sockets)
- sockets_used (int, sockets)
- running (int, node up)

- rabbitmq_queue
- consumer_utilisation (float, percent)
Expand Down Expand Up @@ -144,7 +147,7 @@ FROM rabbitmq_overview WHERE time > now() - 10m GROUP BY time(1m)

```
rabbitmq_queue,url=http://amqp.example.org:15672,queue=telegraf,vhost=influxdb,node=rabbit@amqp.example.org,durable=true,auto_delete=false,host=amqp.example.org messages_deliver_get=0i,messages_publish=329i,messages_publish_rate=0.2,messages_redeliver_rate=0,message_bytes_ready=0i,message_bytes_unacked=0i,messages_deliver=329i,messages_unack=0i,consumers=1i,idle_since="",messages=0i,messages_deliver_rate=0.2,messages_deliver_get_rate=0.2,messages_redeliver=0i,memory=43032i,message_bytes_ram=0i,messages_ack=329i,messages_ready=0i,messages_ack_rate=0.2,consumer_utilisation=1,message_bytes=0i,message_bytes_persist=0i 1493684035000000000
rabbitmq_overview,url=http://amqp.example.org:15672,host=amqp.example.org channels=2i,consumers=1i,exchanges=17i,messages_acked=329i,messages=0i,messages_ready=0i,messages_unacked=0i,connections=2i,queues=1i,messages_delivered=329i,messages_published=329i 1493684035000000000
rabbitmq_node,url=http://amqp.example.org:15672,node=rabbit@amqp.example.org,host=amqp.example.org fd_total=1024i,fd_used=32i,mem_limit=8363329126i,sockets_total=829i,disk_free=8175935488i,disk_free_limit=50000000i,mem_used=58771080i,proc_total=1048576i,proc_used=267i,run_queue=0i,sockets_used=2i 149368403500000000
rabbitmq_overview,url=http://amqp.example.org:15672,host=amqp.example.org channels=2i,consumers=1i,exchanges=17i,messages_acked=329i,messages=0i,messages_ready=0i,messages_unacked=0i,connections=2i,queues=1i,messages_delivered=329i,messages_published=329i,clustering_listeners=2i,amqp_listeners=1i 1493684035000000000
rabbitmq_node,url=http://amqp.example.org:15672,node=rabbit@amqp.example.org,host=amqp.example.org fd_total=1024i,fd_used=32i,mem_limit=8363329126i,sockets_total=829i,disk_free=8175935488i,disk_free_limit=50000000i,mem_used=58771080i,proc_total=1048576i,proc_used=267i,run_queue=0i,sockets_used=2i,running=1i 149368403500000000
rabbitmq_exchange,url=http://amqp.example.org:15672,exchange=telegraf,type=fanout,vhost=influxdb,internal=false,durable=true,auto_delete=false,host=amqp.example.org messages_publish_in=2i,messages_publish_out=1i 149368403500000000
```
28 changes: 26 additions & 2 deletions plugins/inputs/rabbitmq/rabbitmq.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,12 @@ type OverviewResponse struct {
MessageStats *MessageStats `json:"message_stats"`
ObjectTotals *ObjectTotals `json:"object_totals"`
QueueTotals *QueueTotals `json:"queue_totals"`
Listeners []Listeners `json:"listeners"`
}

// Listeners ...
type Listeners struct {
Protocol string `json:"protocol"`
}

// Details ...
Expand Down Expand Up @@ -134,6 +140,7 @@ type Node struct {
RunQueue int64 `json:"run_queue"`
SocketsTotal int64 `json:"sockets_total"`
SocketsUsed int64 `json:"sockets_used"`
Running bool `json:"running"`
}

type Exchange struct {
Expand Down Expand Up @@ -186,7 +193,7 @@ var sampleConfig = `
# queues = ["telegraf"]
## A list of exchanges to gather as the rabbitmq_exchange measurement. If not
## specified, metrics for all exchanges are gathered.
## specified, metrics for all exchanges are gathered.
# exchanges = ["telegraf"]
`

Expand Down Expand Up @@ -275,11 +282,20 @@ func gatherOverview(r *RabbitMQ, acc telegraf.Accumulator) {
return
}

if overview.QueueTotals == nil || overview.ObjectTotals == nil || overview.MessageStats == nil {
if overview.QueueTotals == nil || overview.ObjectTotals == nil || overview.MessageStats == nil || overview.Listeners == nil {
acc.AddError(fmt.Errorf("Wrong answer from rabbitmq. Probably auth issue"))
return
}

var clustering_listeners, amqp_listeners int64 = 0, 0
for _, listener := range overview.Listeners {
if listener.Protocol == "clustering" {
clustering_listeners++
} else if listener.Protocol == "amqp" {
amqp_listeners++
}
}

tags := map[string]string{"url": r.URL}
if r.Name != "" {
tags["name"] = r.Name
Expand All @@ -297,6 +313,8 @@ func gatherOverview(r *RabbitMQ, acc telegraf.Accumulator) {
"messages_delivered": overview.MessageStats.Deliver,
"messages_delivered_get": overview.MessageStats.DeliverGet,
"messages_published": overview.MessageStats.Publish,
"clustering_listeners": clustering_listeners,
"amqp_listeners": amqp_listeners,
}
acc.AddFields("rabbitmq_overview", fields, tags)
}
Expand All @@ -319,6 +337,11 @@ func gatherNodes(r *RabbitMQ, acc telegraf.Accumulator) {
tags := map[string]string{"url": r.URL}
tags["node"] = node.Name

var running int64 = 0
if node.Running {
running = 1
}

fields := map[string]interface{}{
"disk_free": node.DiskFree,
"disk_free_limit": node.DiskFreeLimit,
Expand All @@ -331,6 +354,7 @@ func gatherNodes(r *RabbitMQ, acc telegraf.Accumulator) {
"run_queue": node.RunQueue,
"sockets_total": node.SocketsTotal,
"sockets_used": node.SocketsUsed,
"running": running,
}
acc.AddFields("rabbitmq_node", fields, tags, now)
}
Expand Down
23 changes: 22 additions & 1 deletion plugins/inputs/rabbitmq/rabbitmq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,25 @@ const sampleOverviewResponse = `
"messages_unacknowledged_details": {
"rate": 0.0
}
}
},
"listeners": [
{
"name": "rabbit@node-a",
"protocol": "amqp"
},
{
"name": "rabbit@node-b",
"protocol": "amqp"
},
{
"name": "rabbit@node-a",
"protocol": "clustering"
},
{
"name": "rabbit@node-b",
"protocol": "clustering"
}
]
}
`

Expand Down Expand Up @@ -514,6 +532,8 @@ func TestRabbitMQGeneratesMetrics(t *testing.T) {
"consumers",
"exchanges",
"queues",
"clustering_listeners",
"amqp_listeners",
}

for _, metric := range intMetrics {
Expand All @@ -532,6 +552,7 @@ func TestRabbitMQGeneratesMetrics(t *testing.T) {
"run_queue",
"sockets_total",
"sockets_used",
"running",
}

for _, metric := range nodeIntMetrics {
Expand Down

0 comments on commit 782b133

Please sign in to comment.