diff --git a/metricbeat/module/postgresql/database/database.go b/metricbeat/module/postgresql/database/database.go index 818cace90675..447b8614d462 100644 --- a/metricbeat/module/postgresql/database/database.go +++ b/metricbeat/module/postgresql/database/database.go @@ -20,9 +20,10 @@ package database import ( "database/sql" + "github.com/elastic/beats/libbeat/logp" + "github.com/pkg/errors" - "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/metricbeat/mb" "github.com/elastic/beats/metricbeat/module/postgresql" @@ -30,6 +31,8 @@ import ( _ "github.com/lib/pq" ) +var logger = logp.NewLogger("postgresql.database") + // init registers the MetricSet with the central registry. // The New method will be called after the setup of the module and before starting to fetch data func init() { @@ -49,24 +52,30 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { return &MetricSet{BaseMetricSet: base}, nil } -// Fetch methods implements the data gathering and data conversion to the right format -func (m *MetricSet) Fetch() ([]common.MapStr, error) { +// Fetch methods implements the data gathering and data conversion to the right +// format. It publishes the event which is then forwarded to the output. In case +// of an error set the Error field of mb.Event or simply call report.Error(). +func (m *MetricSet) Fetch(reporter mb.ReporterV2) { db, err := sql.Open("postgres", m.HostData().URI) if err != nil { - return nil, err + logger.Error(err) + reporter.Error(err) + return } defer db.Close() results, err := postgresql.QueryStats(db, "SELECT * FROM pg_stat_database") if err != nil { - return nil, errors.Wrap(err, "QueryStats") + err = errors.Wrap(err, "QueryStats") + logger.Error(err) + reporter.Error(err) + return } - events := []common.MapStr{} for _, result := range results { data, _ := schema.Apply(result) - events = append(events, data) + reporter.Event(mb.Event{ + MetricSetFields: data, + }) } - - return events, nil } diff --git a/metricbeat/module/postgresql/database/database_integration_test.go b/metricbeat/module/postgresql/database/database_integration_test.go index 50287dce897d..907b357062d4 100644 --- a/metricbeat/module/postgresql/database/database_integration_test.go +++ b/metricbeat/module/postgresql/database/database_integration_test.go @@ -33,14 +33,13 @@ import ( func TestFetch(t *testing.T) { compose.EnsureUp(t, "postgresql") - f := mbtest.NewEventsFetcher(t, getConfig()) - events, err := f.Fetch() - if !assert.NoError(t, err) { - t.FailNow() + f := mbtest.NewReportingMetricSetV2(t, getConfig()) + events, errs := mbtest.ReportingFetchV2(f) + if len(errs) > 0 { + t.Fatalf("Expected 0 error, had %d. %v\n", len(errs), errs) } - - assert.True(t, len(events) > 0) - event := events[0] + assert.NotEmpty(t, events) + event := events[0].MetricSetFields t.Logf("%s/%s event: %+v", f.Module().Name(), f.Name(), event) @@ -62,10 +61,14 @@ func TestFetch(t *testing.T) { func TestData(t *testing.T) { compose.EnsureUp(t, "postgresql") - f := mbtest.NewEventsFetcher(t, getConfig()) + f := mbtest.NewReportingMetricSetV2(t, getConfig()) + events, errs := mbtest.ReportingFetchV2(f) + if len(errs) > 0 { + t.Fatalf("Expected 0 error, had %d. %v\n", len(errs), errs) + } + assert.NotEmpty(t, events) - err := mbtest.WriteEvents(f, t) - if err != nil { + if err := mbtest.WriteEventsReporterV2(f, t, ""); err != nil { t.Fatal("write", err) } }