Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add RabbitMQ cluster and running nodes count and running node status #3703

Merged
merged 4 commits into from
Jan 26, 2018
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions plugins/inputs/rabbitmq/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ For additional details reference the [RabbitMQ Management HTTP Stats](https://cd
- messages_ready (int, messages)
- messages_unacked (int, messages)
- queues (int, queues)
- cluster_nodes (int, member nodes)
- running_nodes (int, nodes up)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All the other metrics are noun_adjective, these new ones are adjective_noun

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, then I propose

cluster_nodes => nodes_member
running_nodes => nodes_running

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about clustering_listeners and amqp_listeners?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That would be 100% accurate. Agreed, let the user decide the practical meaning of those metrics then.


- rabbitmq_node
- disk_free (int, bytes)
Expand All @@ -74,6 +76,7 @@ For additional details reference the [RabbitMQ Management HTTP Stats](https://cd
- run_queue (int, erlang processes)
- sockets_total (int, sockets)
- sockets_used (int, sockets)
- running (int, node up)

- rabbitmq_queue
- consumer_utilisation (float, percent)
Expand Down Expand Up @@ -144,7 +147,7 @@ FROM rabbitmq_overview WHERE time > now() - 10m GROUP BY time(1m)

```
rabbitmq_queue,url=http://amqp.example.org:15672,queue=telegraf,vhost=influxdb,node=rabbit@amqp.example.org,durable=true,auto_delete=false,host=amqp.example.org messages_deliver_get=0i,messages_publish=329i,messages_publish_rate=0.2,messages_redeliver_rate=0,message_bytes_ready=0i,message_bytes_unacked=0i,messages_deliver=329i,messages_unack=0i,consumers=1i,idle_since="",messages=0i,messages_deliver_rate=0.2,messages_deliver_get_rate=0.2,messages_redeliver=0i,memory=43032i,message_bytes_ram=0i,messages_ack=329i,messages_ready=0i,messages_ack_rate=0.2,consumer_utilisation=1,message_bytes=0i,message_bytes_persist=0i 1493684035000000000
rabbitmq_overview,url=http://amqp.example.org:15672,host=amqp.example.org channels=2i,consumers=1i,exchanges=17i,messages_acked=329i,messages=0i,messages_ready=0i,messages_unacked=0i,connections=2i,queues=1i,messages_delivered=329i,messages_published=329i 1493684035000000000
rabbitmq_node,url=http://amqp.example.org:15672,node=rabbit@amqp.example.org,host=amqp.example.org fd_total=1024i,fd_used=32i,mem_limit=8363329126i,sockets_total=829i,disk_free=8175935488i,disk_free_limit=50000000i,mem_used=58771080i,proc_total=1048576i,proc_used=267i,run_queue=0i,sockets_used=2i 149368403500000000
rabbitmq_overview,url=http://amqp.example.org:15672,host=amqp.example.org channels=2i,consumers=1i,exchanges=17i,messages_acked=329i,messages=0i,messages_ready=0i,messages_unacked=0i,connections=2i,queues=1i,messages_delivered=329i,messages_published=329i,cluster_nodes=2i,running_nodes=1i 1493684035000000000
rabbitmq_node,url=http://amqp.example.org:15672,node=rabbit@amqp.example.org,host=amqp.example.org fd_total=1024i,fd_used=32i,mem_limit=8363329126i,sockets_total=829i,disk_free=8175935488i,disk_free_limit=50000000i,mem_used=58771080i,proc_total=1048576i,proc_used=267i,run_queue=0i,sockets_used=2i,running=1i 149368403500000000
rabbitmq_exchange,url=http://amqp.example.org:15672,exchange=telegraf,type=fanout,vhost=influxdb,internal=false,durable=true,auto_delete=false,host=amqp.example.org messages_publish_in=2i,messages_publish_out=1i 149368403500000000
```
28 changes: 26 additions & 2 deletions plugins/inputs/rabbitmq/rabbitmq.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,12 @@ type OverviewResponse struct {
MessageStats *MessageStats `json:"message_stats"`
ObjectTotals *ObjectTotals `json:"object_totals"`
QueueTotals *QueueTotals `json:"queue_totals"`
Listeners []Listeners `json:"listeners"`
}

// Listeners ...
type Listeners struct {
Protocol string `json:"protocol"`
}

// Details ...
Expand Down Expand Up @@ -134,6 +140,7 @@ type Node struct {
RunQueue int64 `json:"run_queue"`
SocketsTotal int64 `json:"sockets_total"`
SocketsUsed int64 `json:"sockets_used"`
Running bool `json:"running"`
}

type Exchange struct {
Expand Down Expand Up @@ -186,7 +193,7 @@ var sampleConfig = `
# queues = ["telegraf"]

## A list of exchanges to gather as the rabbitmq_exchange measurement. If not
## specified, metrics for all exchanges are gathered.
## specified, metrics for all exchanges are gathered.
# exchanges = ["telegraf"]
`

Expand Down Expand Up @@ -275,11 +282,20 @@ func gatherOverview(r *RabbitMQ, acc telegraf.Accumulator) {
return
}

if overview.QueueTotals == nil || overview.ObjectTotals == nil || overview.MessageStats == nil {
if overview.QueueTotals == nil || overview.ObjectTotals == nil || overview.MessageStats == nil || overview.Listeners == nil {
acc.AddError(fmt.Errorf("Wrong answer from rabbitmq. Probably auth issue"))
return
}

var cluster_nodes, running_nodes int64 = 0, 0
for _, listener := range overview.Listeners {
if listener.Protocol == "clustering" {
cluster_nodes++
} else if listener.Protocol == "amqp" {
running_nodes++
}
}

tags := map[string]string{"url": r.URL}
if r.Name != "" {
tags["name"] = r.Name
Expand All @@ -297,6 +313,8 @@ func gatherOverview(r *RabbitMQ, acc telegraf.Accumulator) {
"messages_delivered": overview.MessageStats.Deliver,
"messages_delivered_get": overview.MessageStats.DeliverGet,
"messages_published": overview.MessageStats.Publish,
"cluster_nodes": cluster_nodes,
"running_nodes": running_nodes,
}
acc.AddFields("rabbitmq_overview", fields, tags)
}
Expand All @@ -319,6 +337,11 @@ func gatherNodes(r *RabbitMQ, acc telegraf.Accumulator) {
tags := map[string]string{"url": r.URL}
tags["node"] = node.Name

var running int64 = 0
if node.Running {
running = 1
}

fields := map[string]interface{}{
"disk_free": node.DiskFree,
"disk_free_limit": node.DiskFreeLimit,
Expand All @@ -331,6 +354,7 @@ func gatherNodes(r *RabbitMQ, acc telegraf.Accumulator) {
"run_queue": node.RunQueue,
"sockets_total": node.SocketsTotal,
"sockets_used": node.SocketsUsed,
"running": running,
}
acc.AddFields("rabbitmq_node", fields, tags, now)
}
Expand Down
23 changes: 22 additions & 1 deletion plugins/inputs/rabbitmq/rabbitmq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,25 @@ const sampleOverviewResponse = `
"messages_unacknowledged_details": {
"rate": 0.0
}
}
},
"listeners": [
{
"name": "rabbit@node-a",
"protocol": "amqp"
},
{
"name": "rabbit@node-b",
"protocol": "amqp"
},
{
"name": "rabbit@node-a",
"protocol": "clustering"
},
{
"name": "rabbit@node-b",
"protocol": "clustering"
}
]
}
`

Expand Down Expand Up @@ -514,6 +532,8 @@ func TestRabbitMQGeneratesMetrics(t *testing.T) {
"consumers",
"exchanges",
"queues",
"cluster_nodes",
"running_nodes",
}

for _, metric := range intMetrics {
Expand All @@ -532,6 +552,7 @@ func TestRabbitMQGeneratesMetrics(t *testing.T) {
"run_queue",
"sockets_total",
"sockets_used",
"running",
}

for _, metric := range nodeIntMetrics {
Expand Down