Skip to content

Commit

Permalink
[Metricbeat] Migrate PostgreSQL bgwriter Metricset to use ReporterV2 …
Browse files Browse the repository at this point in the history
…interface (#10831)
  • Loading branch information
sayden authored Mar 5, 2019
1 parent 0c7912a commit 9762b07
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 22 deletions.
22 changes: 14 additions & 8 deletions metricbeat/module/postgresql/bgwriter/_meta/data.json
Original file line number Diff line number Diff line change
@@ -1,19 +1,21 @@
{
"@timestamp": "2017-10-12T08:05:34.853Z",
"beat": {
"agent": {
"hostname": "host.example.com",
"name": "host.example.com"
},
"event": {
"dataset": "postgresql.bgwriter",
"duration": 115000,
"module": "postgresql"
},
"metricset": {
"host": "postgresql:5432",
"module": "postgresql",
"name": "bgwriter",
"rtt": 115
"name": "bgwriter"
},
"postgresql": {
"bgwriter": {
"buffers": {
"allocated": 141,
"allocated": 143,
"backend": 0,
"backend_fsync": 0,
"checkpoints": 0,
Expand All @@ -22,7 +24,7 @@
},
"checkpoints": {
"requested": 0,
"scheduled": 2,
"scheduled": 1,
"times": {
"sync": {
"ms": 0
Expand All @@ -32,7 +34,11 @@
}
}
},
"stats_reset": "2017-12-07T07:20:38.413Z"
"stats_reset": "2019-03-05T08:32:30.028Z"
}
},
"service": {
"address": "172.26.0.2:5432",
"type": "postgresql"
}
}
29 changes: 22 additions & 7 deletions metricbeat/module/postgresql/bgwriter/bgwriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,19 @@ import (
"database/sql"
"fmt"

"github.com/elastic/beats/libbeat/logp"

"github.com/pkg/errors"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/metricbeat/mb"
"github.com/elastic/beats/metricbeat/module/postgresql"

// Register postgresql database/sql driver
_ "github.com/lib/pq"
)

var logger = logp.NewLogger("postgresql.bgwriter")

// init registers the MetricSet with the central registry.
// The New method will be called after the setup of the module and before starting to fetch data
func init() {
Expand All @@ -50,22 +53,34 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
return &MetricSet{BaseMetricSet: base}, nil
}

// Fetch methods implements the data gathering and data conversion to the right format
func (m *MetricSet) Fetch() (common.MapStr, error) {
// Fetch methods implements the data gathering and data conversion to the right
// format. It publishes the event which is then forwarded to the output. In case
// of an error set the Error field of mb.Event or simply call report.Error().
func (m *MetricSet) Fetch(reporter mb.ReporterV2) {
db, err := sql.Open("postgres", m.HostData().URI)
if err != nil {
return nil, err
logger.Error(err)
reporter.Error(err)
return
}
defer db.Close()

results, err := postgresql.QueryStats(db, "SELECT * FROM pg_stat_bgwriter")
if err != nil {
return nil, errors.Wrap(err, "QueryStats")
err = errors.Wrap(err, "QueryStats")
logger.Error(err)
reporter.Error(err)
return
}
if len(results) == 0 {
return nil, fmt.Errorf("No results from the pg_stat_bgwriter query")
err = fmt.Errorf("No results from the pg_stat_bgwriter query")
logger.Error(err)
reporter.Error(err)
return
}

data, _ := schema.Apply(results[0])
return data, nil
reporter.Event(mb.Event{
MetricSetFields: data,
})
}
20 changes: 13 additions & 7 deletions metricbeat/module/postgresql/bgwriter/bgwriter_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,13 @@ import (
func TestFetch(t *testing.T) {
compose.EnsureUp(t, "postgresql")

f := mbtest.NewEventFetcher(t, getConfig())
event, err := f.Fetch()
if !assert.NoError(t, err) {
t.FailNow()
f := mbtest.NewReportingMetricSetV2(t, getConfig())
events, errs := mbtest.ReportingFetchV2(f)
if len(errs) > 0 {
t.Fatalf("Expected 0 error, had %d. %v\n", len(errs), errs)
}
assert.NotEmpty(t, events)
event := events[0].MetricSetFields

t.Logf("%s/%s event: %+v", f.Module().Name(), f.Name(), event)

Expand All @@ -62,10 +64,14 @@ func TestFetch(t *testing.T) {
func TestData(t *testing.T) {
compose.EnsureUp(t, "postgresql")

f := mbtest.NewEventFetcher(t, getConfig())
f := mbtest.NewReportingMetricSetV2(t, getConfig())
events, errs := mbtest.ReportingFetchV2(f)
if len(errs) > 0 {
t.Fatalf("Expected 0 error, had %d. %v\n", len(errs), errs)
}
assert.NotEmpty(t, events)

err := mbtest.WriteEvent(f, t)
if err != nil {
if err := mbtest.WriteEventsReporterV2(f, t, ""); err != nil {
t.Fatal("write", err)
}
}
Expand Down

0 comments on commit 9762b07

Please sign in to comment.