Skip to content

Commit

Permalink
create statements metricset for the metricbeat postgresql module
Browse files Browse the repository at this point in the history
  • Loading branch information
zinefer committed May 15, 2018
1 parent f85dbf8 commit b812cba
Show file tree
Hide file tree
Showing 6 changed files with 344 additions and 0 deletions.
54 changes: 54 additions & 0 deletions metricbeat/module/postgresql/statements/_meta/data.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
{
"@timestamp": "2017-10-12T08:05:34.853Z",
"beat": {
"hostname": "host.example.com",
"name": "host.example.com"
},
"metricset": {
"host": "postgresql:5432",
"module": "postgresql",
"name": "statements",
"rtt": 115
},
"postgresql": {
"statements": {
"user": {
"id": 10
},
"database": {
"oid": "12407"
},
"query": {
"id": "3240664890",
"text": "SELECT pg_sleep(?);",
"calls": "2",
"rows": "2",
"time": {
"total": 120066.497,
"min": 60029.533,
"max": 60036.964,
"mean": 60033.2485,
"stddev": 3.71549999999843
},
"memory": {
"shared": {
"hit": 0,
"read": 0,
"dirtied": 0,
"written": 0,
},
"local": {
"hit": 0,
"read": 0,
"dirtied": 0,
"written": 0,
},
"temp": {
"read": 0,
"written": 0,
}
}
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
This is the `statements` metricset of the PostgreSQL module.
91 changes: 91 additions & 0 deletions metricbeat/module/postgresql/statements/_meta/fields.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
- name: statements
type: group
description: >
One document per query per user per database, showing information related
invocation of that query, such as cpu usage and total time. Collected by
querying pg_stat_statements.
release: ga
fields:
- name: user.id
type: long
description: >
OID of the user logged into the backend that ran the query.
- name: database.oid
type: long
description: >
OID of the database the query was run on.
- name: query.id
type: long
description: >
ID of the statement.
- name: query.text
description: >
Query text
- name: query.calls
type: long
description: >
Number of times the query has been run.
- name: query.rows
type: long
description: >
Total number of rows returned by query.
- name: time.total
type: long
description: >
Total number of milliseconds spent running query.
- name: time.min
type: long
description: >
Minimum number of milliseconds spent running query.
- name: time.max
type: long
description: >
Maximum number of milliseconds spent running query.
- name: time.mean
type: long
description: >
Mean number of milliseconds spent running query.
- name: time.stddev
type: long
description: >
Population standard deviation of time spent running query, in milliseconds.
- name: memory.shared.hit
type: long
description: >
Total number of shared block cache hits by the query.
- name: memory.shared.read
type: long
description: >
Total number of shared block cache read by the query.
- name: memory.shared.dirtied
type: long
description: >
Total number of shared block cache dirtied by the query.
- name: memory.shared.written
type: long
description: >
Total number of shared block cache written by the query.
- name: memory.local.hit
type: long
description: >
Total number of local block cache hits by the query.
- name: memory.local.read
type: long
description: >
Total number of local block cache read by the query.
- name: memory.local.dirtied
type: long
description: >
Total number of local block cache dirtied by the query.
- name: memory.local.written
type: long
description: >
Total number of local block cache written by the query.
- name: memory.temp.read
type: long
description: >
Total number of temp block cache read by the query.
- name: memory.temp.written
type: long
description: >
Total number of temp block cache written by the query.
47 changes: 47 additions & 0 deletions metricbeat/module/postgresql/statements/data.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package statements

import (
s "github.com/elastic/beats/libbeat/common/schema"
c "github.com/elastic/beats/libbeat/common/schema/mapstrstr"
)

// Based on: https://www.postgresql.org/docs/9.6/static/pgstatstatements.html
var schema = s.Schema{
"user": s.Object{
"id": c.Int("userid"),
},
"database": s.Object{
"oid": c.Int("dbid"),
},
"query": s.Object{
"id": c.Str("queryid"),
"text": c.Str("query"),
"calls": c.Int("datid"),
"rows": c.Int("rows"),
"time": s.Object{
"total": s.Object{"ms": c.Float("total_time")},
"min": s.Object{"ms": c.Float("min_time")},
"max": s.Object{"ms": c.Float("max_time")},
"mean": s.Object{"ms": c.Float("mean_time")},
"stddev": s.Object{"ms": c.Float("stddev_time")},
},
"memory": s.Object{
"shared": s.Object{
"hit": c.Int("shared_blks_hit"),
"read": c.Int("shared_blks_read"),
"dirtied": c.Int("shared_blks_dirtied"),
"written": c.Int("shared_blks_written"),
},
"local": s.Object{
"hit": c.Int("local_blks_hit"),
"read": c.Int("local_blks_read"),
"dirtied": c.Int("local_blks_dirtied"),
"written": c.Int("local_blks_written"),
},
"temp": s.Object{
"read": c.Int("temp_blks_read"),
"written": c.Int("temp_blks_written"),
},
},
},
}
57 changes: 57 additions & 0 deletions metricbeat/module/postgresql/statements/statements.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package statements

import (
"database/sql"

"github.com/pkg/errors"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/metricbeat/mb"
"github.com/elastic/beats/metricbeat/module/postgresql"

// Register postgresql database/sql driver
_ "github.com/lib/pq"
)

// init registers the MetricSet with the central registry.
// The New method will be called after the setup of the module and before starting to fetch data
func init() {
mb.Registry.MustAddMetricSet("postgresql", "statements", New,
mb.WithHostParser(postgresql.ParseURL),
mb.DefaultMetricSet(),
)
}

// MetricSet type defines all fields of the Postgresql MetricSet
type MetricSet struct {
mb.BaseMetricSet
}

// New create a new instance of the MetricSet
// Part of new is also setting up the configuration by processing additional
// configuration entries if needed.
func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
return &MetricSet{BaseMetricSet: base}, nil
}

// Fetch implements the data gathering and data conversion to the right format.
func (m *MetricSet) Fetch() ([]common.MapStr, error) {
db, err := sql.Open("postgres", m.HostData().URI)
if err != nil {
return nil, err
}
defer db.Close()

results, err := postgresql.QueryStats(db, "SELECT * FROM pg_stat_statements")
if err != nil {
return nil, errors.Wrap(err, "QueryStats")
}

events := []common.MapStr{}
for _, result := range results {
data, _ := schema.Apply(result)
events = append(events, data)
}

return events, nil
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
// +build integration

package statements

import (
"testing"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/tests/compose"
mbtest "github.com/elastic/beats/metricbeat/mb/testing"
"github.com/elastic/beats/metricbeat/module/postgresql"

"github.com/stretchr/testify/assert"
)

func TestFetch(t *testing.T) {
compose.EnsureUp(t, "postgresql")

f := mbtest.NewEventsFetcher(t, getConfig())
events, err := f.Fetch()
if !assert.NoError(t, err) {
t.FailNow()
}

assert.True(t, len(events) > 0)
event := events[0]

t.Logf("%s/%s event: %+v", f.Module().Name(), f.Name(), event)

// Check event fields
assert.Contains(t, event, "user")
assert.Contains(t, event["user"].(common.MapStr), "id")

assert.Contains(t, event, "database")
db_oid := event["database"].(common.MapStr)["oid"].(int64)
assert.True(t, db_oid > 0)

assert.Contains(t, event, "query")
query := event["query"].(common.MapStr)
assert.Contains(t, query, "id")
assert.Contains(t, query, "text")
assert.Contains(t, query, "calls")
assert.Contains(t, query, "rows")

assert.Contains(t, query, "time")
time := query["time"].(common.MapStr)
assert.Contains(t, time, "total")
assert.Contains(t, time, "min")
assert.Contains(t, time, "max")
assert.Contains(t, time, "mean")
assert.Contains(t, time, "stddev")

assert.Contains(t, query["memory"], "shared")
memory := query["memory"].(common.MapStr)

assert.Contains(t, memory, "shared")
shared := memory["shared"].(common.MapStr)
assert.Contains(t, shared, "hit")
assert.Contains(t, shared, "read")
assert.Contains(t, shared, "dirtied")
assert.Contains(t, shared, "written")

assert.Contains(t, memory, "local")
local := memory["local"].(common.MapStr)
assert.Contains(t, local, "hit")
assert.Contains(t, local, "read")
assert.Contains(t, local, "dirtied")
assert.Contains(t, local, "written")

assert.Contains(t, memory, "temp")
temp := memory["temp"].(common.MapStr)
assert.Contains(t, temp, "read")
assert.Contains(t, temp, "written")
}

func TestData(t *testing.T) {
compose.EnsureUp(t, "postgresql")
f := mbtest.NewEventsFetcher(t, getConfig())

err := mbtest.WriteEvents(f, t)
if err != nil {
t.Fatal("write", err)
}
}

func getConfig() map[string]interface{} {
return map[string]interface{}{
"module": "postgresql",
"metricsets": []string{"statements"},
"hosts": []string{postgresql.GetEnvDSN()},
"username": postgresql.GetEnvUsername(),
"password": postgresql.GetEnvPassword(),
}
}

0 comments on commit b812cba

Please sign in to comment.