Skip to content

Commit

Permalink
Rabbitmq exchanges metricset (#6955)
Browse files Browse the repository at this point in the history
Continues with #6607
  • Loading branch information
jsoriano authored and ruflin committed May 8, 2018
1 parent a57a0cb commit c522fd1
Show file tree
Hide file tree
Showing 18 changed files with 516 additions and 2 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,7 @@ https://github.com/elastic/beats/compare/v6.2.3...master[Check the HEAD diff]
- Add experimental Elasticsearch index metricset. {pull}6881[6881]
- Add dashboards and visualizations for haproxy metrics. {pull}6934[6934]
- Add message rates to the RabbitMQ queue metricset {issue}6442[6442] {pull}6606[6606]
- Add exchanges metricset to the RabbitMQ module {issue}6442[6442] {pull}6607[6607]

*Packetbeat*

Expand Down
107 changes: 107 additions & 0 deletions metricbeat/docs/fields.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -10912,6 +10912,113 @@ type: long
Number of octets received on the connection.
--
[float]
== exchange fields
exchange
*`rabbitmq.exchange.name`*::
+
--
type: keyword
The name of the queue with non-ASCII characters escaped as in C.
--
*`rabbitmq.exchange.vhost`*::
+
--
type: keyword
Virtual host name with non-ASCII characters escaped as in C.
--
*`rabbitmq.exchange.durable`*::
+
--
type: boolean
Whether or not the queue survives server restarts.
--
*`rabbitmq.exchange.auto_delete`*::
+
--
type: boolean
Whether the queue will be deleted automatically when no longer used.
--
*`rabbitmq.exchange.internal`*::
+
--
type: boolean
Whether the exchange is internal, i.e. cannot be directly published to by a client.
--
*`rabbitmq.exchange.user`*::
+
--
type: keyword
User who created the exchange.
--
*`rabbitmq.exchange.messages.publish_in.count`*::
+
--
type: long
Count of messages published "in" to an exchange, i.e. not taking account of routing.
--
*`rabbitmq.exchange.messages.publish_in.details.rate`*::
+
--
type: float
How much the exchange publish-in count has changed per second in the most recent sampling interval.
--
*`rabbitmq.exchange.messages.publish_out.count`*::
+
--
type: long
Count of messages published "out" of an exchange, i.e. taking account of routing.
--
*`rabbitmq.exchange.messages.publish_out.details.rate`*::
+
--
type: float
How much the exchange publish-out count has changed per second in the most recent sampling interval.
--
[float]
Expand Down
4 changes: 4 additions & 0 deletions metricbeat/docs/modules/rabbitmq.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,16 @@ The following metricsets are available:

* <<metricbeat-metricset-rabbitmq-connection,connection>>

* <<metricbeat-metricset-rabbitmq-exchange,exchange>>

* <<metricbeat-metricset-rabbitmq-node,node>>

* <<metricbeat-metricset-rabbitmq-queue,queue>>

include::rabbitmq/connection.asciidoc[]

include::rabbitmq/exchange.asciidoc[]

include::rabbitmq/node.asciidoc[]

include::rabbitmq/queue.asciidoc[]
Expand Down
23 changes: 23 additions & 0 deletions metricbeat/docs/modules/rabbitmq/exchange.asciidoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
////
This file is generated! See scripts/docs_collector.py
////

[[metricbeat-metricset-rabbitmq-exchange]]
=== RabbitMQ exchange metricset

beta[]

include::../../../module/rabbitmq/exchange/_meta/docs.asciidoc[]


==== Fields

For a description of each field in the metricset, see the
<<exported-fields-rabbitmq,exported fields>> section.

Here is an example document generated by this metricset:

[source,json]
----
include::../../../module/rabbitmq/exchange/_meta/data.json[]
----
3 changes: 2 additions & 1 deletion metricbeat/docs/modules_list.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,8 @@ This file is generated! See scripts/docs_collector.py
.2+| .2+| |<<metricbeat-metricset-prometheus-collector,collector>> beta[]
|<<metricbeat-metricset-prometheus-stats,stats>> beta[]
|<<metricbeat-module-rabbitmq,RabbitMQ>> beta[] |image:./images/icon-yes.png[Prebuilt dashboards are available] |
.3+| .3+| |<<metricbeat-metricset-rabbitmq-connection,connection>> beta[]
.4+| .4+| |<<metricbeat-metricset-rabbitmq-connection,connection>> beta[]
|<<metricbeat-metricset-rabbitmq-exchange,exchange>> beta[]
|<<metricbeat-metricset-rabbitmq-node,node>> beta[]
|<<metricbeat-metricset-rabbitmq-queue,queue>> beta[]
|<<metricbeat-module-redis,Redis>> |image:./images/icon-yes.png[Prebuilt dashboards are available] |
Expand Down
1 change: 1 addition & 0 deletions metricbeat/include/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ import (
_ "github.com/elastic/beats/metricbeat/module/prometheus/stats"
_ "github.com/elastic/beats/metricbeat/module/rabbitmq"
_ "github.com/elastic/beats/metricbeat/module/rabbitmq/connection"
_ "github.com/elastic/beats/metricbeat/module/rabbitmq/exchange"
_ "github.com/elastic/beats/metricbeat/module/rabbitmq/node"
_ "github.com/elastic/beats/metricbeat/module/rabbitmq/queue"
_ "github.com/elastic/beats/metricbeat/module/redis"
Expand Down
2 changes: 1 addition & 1 deletion metricbeat/module/rabbitmq/_meta/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM rabbitmq:3-management
FROM rabbitmq:3.7.4-management

RUN apt-get update && apt-get install -y netcat && apt-get clean
HEALTHCHECK --interval=1s --retries=90 CMD nc -w 1 -v 127.0.0.1 15672 </dev/null
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
[
{
"message_stats": {
"publish_in": 100,
"publish_in_details": {
"rate": 0.5
},
"publish_out": 99,
"publish_out_details": {
"rate": 0.9
}
},
"user_who_performed_action": "guest",
"name": "exchange.name",
"vhost": "/",
"type": "fanout",
"durable": true,
"auto_delete": false,
"internal": false,
"arguments": {}
}
]
1 change: 1 addition & 0 deletions metricbeat/module/rabbitmq/connection/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ func eventsMapping(content []byte) ([]common.MapStr, error) {
err := json.Unmarshal(content, &connections)
if err != nil {
logp.Err("Error: ", err)
return nil, err
}

events := []common.MapStr{}
Expand Down
45 changes: 45 additions & 0 deletions metricbeat/module/rabbitmq/exchange/_meta/data.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
{
"@timestamp": "2017-10-12T08:05:34.853Z",
"beat": {
"hostname": "host.example.com",
"name": "host.example.com"
},
"metricset": {
"host": "localhost:15672",
"module": "rabbitmq",
"name": "exchange",
"rtt": 115
},
"rabbitmq": {
"exchange": {
"arguments": {},
"auto_delete": false,
"durable": true,
"internal": false,
"messages": {
"ack": {},
"confirm": {},
"deliver_get": {},
"publish": {},
"publish_in": {
"count": 607,
"details": {
"rate": 4
}
},
"publish_out": {
"count": 547,
"details": {
"rate": 4
}
},
"redeliver": {},
"return_unroutable": {}
},
"name": "",
"type": "direct",
"user": "rmq-internal",
"vhost": "/"
}
}
}
3 changes: 3 additions & 0 deletions metricbeat/module/rabbitmq/exchange/_meta/docs.asciidoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
=== rabbitmq exchange MetricSet

This is the exchange metricset of the module rabbitmq.
47 changes: 47 additions & 0 deletions metricbeat/module/rabbitmq/exchange/_meta/fields.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
- name: exchange
type: group
description: >
exchange
release: beta
fields:
- name: name
type: keyword
description: >
The name of the queue with non-ASCII characters escaped as in C.
- name: vhost
type: keyword
description: >
Virtual host name with non-ASCII characters escaped as in C.
- name: durable
type: boolean
description: >
Whether or not the queue survives server restarts.
- name: auto_delete
type: boolean
description: >
Whether the queue will be deleted automatically when no longer used.
- name: internal
type: boolean
description: >
Whether the exchange is internal, i.e. cannot be directly published to by a client.
- name: user
type: keyword
description: >
User who created the exchange.
- name: messages.publish_in.count
type: long
description: >
Count of messages published "in" to an exchange, i.e. not taking account of routing.
- name: messages.publish_in.details.rate
type: float
description: >
How much the exchange publish-in count has changed per second in the most recent sampling interval.
- name: messages.publish_out.count
type: long
description: >
Count of messages published "out" of an exchange, i.e. taking account of routing.
- name: messages.publish_out.details.rate
type: float
description: >
How much the exchange publish-out count has changed per second in the most recent sampling interval.
61 changes: 61 additions & 0 deletions metricbeat/module/rabbitmq/exchange/data.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package exchange

import (
"encoding/json"

"github.com/elastic/beats/libbeat/common"
s "github.com/elastic/beats/libbeat/common/schema"
c "github.com/elastic/beats/libbeat/common/schema/mapstriface"
"github.com/elastic/beats/libbeat/logp"
)

var (
schema = s.Schema{
"name": c.Str("name"),
"vhost": c.Str("vhost"),
"type": c.Str("type"),
"durable": c.Bool("durable"),
"auto_delete": c.Bool("auto_delete"),
"internal": c.Bool("internal"),
"arguments": c.Dict("arguments", s.Schema{}),
"user": c.Str("user_who_performed_action", s.Optional),
"messages": c.Dict("message_stats", s.Schema{
"publish_in": s.Object{
"count": c.Int("publish_in", s.Optional),
"details": c.Dict("publish_in_details", s.Schema{
"rate": c.Float("rate"),
}, c.DictOptional),
},
"publish_out": s.Object{
"count": c.Int("publish_out", s.Optional),
"details": c.Dict("publish_out_details", s.Schema{
"rate": c.Float("rate"),
}, c.DictOptional),
},
}, c.DictOptional),
}
)

func eventsMapping(content []byte) ([]common.MapStr, error) {
var exchanges []map[string]interface{}
err := json.Unmarshal(content, &exchanges)
if err != nil {
logp.Err("Error: ", err)
return nil, err
}

events := []common.MapStr{}
errors := s.NewErrors()

for _, exchange := range exchanges {
event, errs := eventMapping(exchange)
events = append(events, event)
errors.AddErrors(errs)
}

return events, errors
}

func eventMapping(exchange map[string]interface{}) (common.MapStr, *s.Errors) {
return schema.Apply(exchange)
}
Loading

0 comments on commit c522fd1

Please sign in to comment.