diff --git a/plugins/inputs/rabbitmq/README.md b/plugins/inputs/rabbitmq/README.md index a1dfc879a605d..796dfc7bfe994 100644 --- a/plugins/inputs/rabbitmq/README.md +++ b/plugins/inputs/rabbitmq/README.md @@ -61,6 +61,8 @@ For additional details reference the [RabbitMQ Management HTTP Stats](https://cd - messages_ready (int, messages) - messages_unacked (int, messages) - queues (int, queues) + - clustering_listeners (int, cluster nodes) + - amqp_listeners (int, amqp nodes up) - rabbitmq_node - disk_free (int, bytes) @@ -74,6 +76,7 @@ For additional details reference the [RabbitMQ Management HTTP Stats](https://cd - run_queue (int, erlang processes) - sockets_total (int, sockets) - sockets_used (int, sockets) + - running (int, node up) - rabbitmq_queue - consumer_utilisation (float, percent) @@ -144,7 +147,7 @@ FROM rabbitmq_overview WHERE time > now() - 10m GROUP BY time(1m) ``` rabbitmq_queue,url=http://amqp.example.org:15672,queue=telegraf,vhost=influxdb,node=rabbit@amqp.example.org,durable=true,auto_delete=false,host=amqp.example.org messages_deliver_get=0i,messages_publish=329i,messages_publish_rate=0.2,messages_redeliver_rate=0,message_bytes_ready=0i,message_bytes_unacked=0i,messages_deliver=329i,messages_unack=0i,consumers=1i,idle_since="",messages=0i,messages_deliver_rate=0.2,messages_deliver_get_rate=0.2,messages_redeliver=0i,memory=43032i,message_bytes_ram=0i,messages_ack=329i,messages_ready=0i,messages_ack_rate=0.2,consumer_utilisation=1,message_bytes=0i,message_bytes_persist=0i 1493684035000000000 -rabbitmq_overview,url=http://amqp.example.org:15672,host=amqp.example.org channels=2i,consumers=1i,exchanges=17i,messages_acked=329i,messages=0i,messages_ready=0i,messages_unacked=0i,connections=2i,queues=1i,messages_delivered=329i,messages_published=329i 1493684035000000000 -rabbitmq_node,url=http://amqp.example.org:15672,node=rabbit@amqp.example.org,host=amqp.example.org fd_total=1024i,fd_used=32i,mem_limit=8363329126i,sockets_total=829i,disk_free=8175935488i,disk_free_limit=50000000i,mem_used=58771080i,proc_total=1048576i,proc_used=267i,run_queue=0i,sockets_used=2i 149368403500000000 +rabbitmq_overview,url=http://amqp.example.org:15672,host=amqp.example.org channels=2i,consumers=1i,exchanges=17i,messages_acked=329i,messages=0i,messages_ready=0i,messages_unacked=0i,connections=2i,queues=1i,messages_delivered=329i,messages_published=329i,clustering_listeners=2i,amqp_listeners=1i 1493684035000000000 +rabbitmq_node,url=http://amqp.example.org:15672,node=rabbit@amqp.example.org,host=amqp.example.org fd_total=1024i,fd_used=32i,mem_limit=8363329126i,sockets_total=829i,disk_free=8175935488i,disk_free_limit=50000000i,mem_used=58771080i,proc_total=1048576i,proc_used=267i,run_queue=0i,sockets_used=2i,running=1i 149368403500000000 rabbitmq_exchange,url=http://amqp.example.org:15672,exchange=telegraf,type=fanout,vhost=influxdb,internal=false,durable=true,auto_delete=false,host=amqp.example.org messages_publish_in=2i,messages_publish_out=1i 149368403500000000 ``` diff --git a/plugins/inputs/rabbitmq/rabbitmq.go b/plugins/inputs/rabbitmq/rabbitmq.go index 775ea75dfa460..64aa6408de78c 100644 --- a/plugins/inputs/rabbitmq/rabbitmq.go +++ b/plugins/inputs/rabbitmq/rabbitmq.go @@ -60,6 +60,12 @@ type OverviewResponse struct { MessageStats *MessageStats `json:"message_stats"` ObjectTotals *ObjectTotals `json:"object_totals"` QueueTotals *QueueTotals `json:"queue_totals"` + Listeners []Listeners `json:"listeners"` +} + +// Listeners ... +type Listeners struct { + Protocol string `json:"protocol"` } // Details ... @@ -134,6 +140,7 @@ type Node struct { RunQueue int64 `json:"run_queue"` SocketsTotal int64 `json:"sockets_total"` SocketsUsed int64 `json:"sockets_used"` + Running bool `json:"running"` } type Exchange struct { @@ -186,7 +193,7 @@ var sampleConfig = ` # queues = ["telegraf"] ## A list of exchanges to gather as the rabbitmq_exchange measurement. If not - ## specified, metrics for all exchanges are gathered. + ## specified, metrics for all exchanges are gathered. # exchanges = ["telegraf"] ` @@ -275,11 +282,20 @@ func gatherOverview(r *RabbitMQ, acc telegraf.Accumulator) { return } - if overview.QueueTotals == nil || overview.ObjectTotals == nil || overview.MessageStats == nil { + if overview.QueueTotals == nil || overview.ObjectTotals == nil || overview.MessageStats == nil || overview.Listeners == nil { acc.AddError(fmt.Errorf("Wrong answer from rabbitmq. Probably auth issue")) return } + var clustering_listeners, amqp_listeners int64 = 0, 0 + for _, listener := range overview.Listeners { + if listener.Protocol == "clustering" { + clustering_listeners++ + } else if listener.Protocol == "amqp" { + amqp_listeners++ + } + } + tags := map[string]string{"url": r.URL} if r.Name != "" { tags["name"] = r.Name @@ -297,6 +313,8 @@ func gatherOverview(r *RabbitMQ, acc telegraf.Accumulator) { "messages_delivered": overview.MessageStats.Deliver, "messages_delivered_get": overview.MessageStats.DeliverGet, "messages_published": overview.MessageStats.Publish, + "clustering_listeners": clustering_listeners, + "amqp_listeners": amqp_listeners, } acc.AddFields("rabbitmq_overview", fields, tags) } @@ -319,6 +337,11 @@ func gatherNodes(r *RabbitMQ, acc telegraf.Accumulator) { tags := map[string]string{"url": r.URL} tags["node"] = node.Name + var running int64 = 0 + if node.Running { + running = 1 + } + fields := map[string]interface{}{ "disk_free": node.DiskFree, "disk_free_limit": node.DiskFreeLimit, @@ -331,6 +354,7 @@ func gatherNodes(r *RabbitMQ, acc telegraf.Accumulator) { "run_queue": node.RunQueue, "sockets_total": node.SocketsTotal, "sockets_used": node.SocketsUsed, + "running": running, } acc.AddFields("rabbitmq_node", fields, tags, now) } diff --git a/plugins/inputs/rabbitmq/rabbitmq_test.go b/plugins/inputs/rabbitmq/rabbitmq_test.go index 759c71c4175b4..5e9829cc00d28 100644 --- a/plugins/inputs/rabbitmq/rabbitmq_test.go +++ b/plugins/inputs/rabbitmq/rabbitmq_test.go @@ -51,7 +51,25 @@ const sampleOverviewResponse = ` "messages_unacknowledged_details": { "rate": 0.0 } - } + }, + "listeners": [ + { + "name": "rabbit@node-a", + "protocol": "amqp" + }, + { + "name": "rabbit@node-b", + "protocol": "amqp" + }, + { + "name": "rabbit@node-a", + "protocol": "clustering" + }, + { + "name": "rabbit@node-b", + "protocol": "clustering" + } + ] } ` @@ -514,6 +532,8 @@ func TestRabbitMQGeneratesMetrics(t *testing.T) { "consumers", "exchanges", "queues", + "clustering_listeners", + "amqp_listeners", } for _, metric := range intMetrics { @@ -532,6 +552,7 @@ func TestRabbitMQGeneratesMetrics(t *testing.T) { "run_queue", "sockets_total", "sockets_used", + "running", } for _, metric := range nodeIntMetrics {