Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Metricbeat] migrate postgres to ReporterV2 with new error handling #11636

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 5 additions & 11 deletions metricbeat/module/postgresql/activity/activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,13 @@ import (

"github.com/pkg/errors"

"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/metricbeat/mb"
"github.com/elastic/beats/metricbeat/module/postgresql"

// Register postgresql database/sql driver
_ "github.com/lib/pq"
)

var logger = logp.NewLogger("postgresql.activity")

// init registers the MetricSet with the central registry.
// The New method will be called after the setup of the module and before starting to fetch data
func init() {
Expand All @@ -56,21 +53,16 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
// Fetch methods implements the data gathering and data conversion to the right
// format. It publishes the event which is then forwarded to the output. In case
// of an error set the Error field of mb.Event or simply call report.Error().
func (m *MetricSet) Fetch(reporter mb.ReporterV2) {
func (m *MetricSet) Fetch(reporter mb.ReporterV2) error {
db, err := sql.Open("postgres", m.HostData().URI)
if err != nil {
logger.Error(err)
reporter.Error(err)
return
return errors.Wrap(err, "error in Open")
}
defer db.Close()

results, err := postgresql.QueryStats(db, "SELECT * FROM pg_stat_activity")
if err != nil {
err = errors.Wrap(err, "QueryStats")
logger.Error(err)
reporter.Error(err)
return
return errors.Wrap(err, "error in QueryStats")
}

for _, result := range results {
Expand All @@ -79,4 +71,6 @@ func (m *MetricSet) Fetch(reporter mb.ReporterV2) {
MetricSetFields: data,
})
}

return nil
}
14 changes: 4 additions & 10 deletions metricbeat/module/postgresql/activity/activity_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ import (
func TestFetch(t *testing.T) {
compose.EnsureUp(t, "postgresql")

f := mbtest.NewReportingMetricSetV2(t, getConfig())
events, errs := mbtest.ReportingFetchV2(f)
f := mbtest.NewReportingMetricSetV2Error(t, getConfig())
events, errs := mbtest.ReportingFetchV2Error(f)
if len(errs) > 0 {
t.Fatalf("Expected 0 error, had %d. %v\n", len(errs), errs)
}
Expand All @@ -59,14 +59,8 @@ func TestFetch(t *testing.T) {
func TestData(t *testing.T) {
compose.EnsureUp(t, "postgresql")

f := mbtest.NewReportingMetricSetV2(t, getConfig())
events, errs := mbtest.ReportingFetchV2(f)
if len(errs) > 0 {
t.Fatalf("Expected 0 error, had %d. %v\n", len(errs), errs)
}
assert.NotEmpty(t, events)

if err := mbtest.WriteEventsReporterV2(f, t, ""); err != nil {
f := mbtest.NewReportingMetricSetV2Error(t, getConfig())
if err := mbtest.WriteEventsReporterV2Error(f, t, ""); err != nil {
t.Fatal("write", err)
}
}
Expand Down
22 changes: 6 additions & 16 deletions metricbeat/module/postgresql/bgwriter/bgwriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@ import (
"database/sql"
"fmt"

"github.com/elastic/beats/libbeat/logp"

"github.com/pkg/errors"

"github.com/elastic/beats/metricbeat/mb"
Expand All @@ -32,8 +30,6 @@ import (
_ "github.com/lib/pq"
)

var logger = logp.NewLogger("postgresql.bgwriter")

// init registers the MetricSet with the central registry.
// The New method will be called after the setup of the module and before starting to fetch data
func init() {
Expand All @@ -56,31 +52,25 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
// Fetch methods implements the data gathering and data conversion to the right
// format. It publishes the event which is then forwarded to the output. In case
// of an error set the Error field of mb.Event or simply call report.Error().
func (m *MetricSet) Fetch(reporter mb.ReporterV2) {
func (m *MetricSet) Fetch(reporter mb.ReporterV2) error {
db, err := sql.Open("postgres", m.HostData().URI)
if err != nil {
logger.Error(err)
reporter.Error(err)
return
return errors.Wrap(err, "error in Open")
}
defer db.Close()

results, err := postgresql.QueryStats(db, "SELECT * FROM pg_stat_bgwriter")
if err != nil {
err = errors.Wrap(err, "QueryStats")
logger.Error(err)
reporter.Error(err)
return
return errors.Wrap(err, "error in QueryStats")
}
if len(results) == 0 {
err = fmt.Errorf("No results from the pg_stat_bgwriter query")
logger.Error(err)
reporter.Error(err)
return
return fmt.Errorf("No results from the pg_stat_bgwriter query")
}

data, _ := schema.Apply(results[0])
reporter.Event(mb.Event{
MetricSetFields: data,
})

return nil
}
14 changes: 4 additions & 10 deletions metricbeat/module/postgresql/bgwriter/bgwriter_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ import (
func TestFetch(t *testing.T) {
compose.EnsureUp(t, "postgresql")

f := mbtest.NewReportingMetricSetV2(t, getConfig())
events, errs := mbtest.ReportingFetchV2(f)
f := mbtest.NewReportingMetricSetV2Error(t, getConfig())
events, errs := mbtest.ReportingFetchV2Error(f)
if len(errs) > 0 {
t.Fatalf("Expected 0 error, had %d. %v\n", len(errs), errs)
}
Expand Down Expand Up @@ -64,14 +64,8 @@ func TestFetch(t *testing.T) {
func TestData(t *testing.T) {
compose.EnsureUp(t, "postgresql")

f := mbtest.NewReportingMetricSetV2(t, getConfig())
events, errs := mbtest.ReportingFetchV2(f)
if len(errs) > 0 {
t.Fatalf("Expected 0 error, had %d. %v\n", len(errs), errs)
}
assert.NotEmpty(t, events)

if err := mbtest.WriteEventsReporterV2(f, t, ""); err != nil {
f := mbtest.NewReportingMetricSetV2Error(t, getConfig())
if err := mbtest.WriteEventsReporterV2Error(f, t, ""); err != nil {
t.Fatal("write", err)
}
}
Expand Down
17 changes: 5 additions & 12 deletions metricbeat/module/postgresql/database/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@ package database
import (
"database/sql"

"github.com/elastic/beats/libbeat/logp"

"github.com/pkg/errors"

"github.com/elastic/beats/metricbeat/mb"
Expand All @@ -31,8 +29,6 @@ import (
_ "github.com/lib/pq"
)

var logger = logp.NewLogger("postgresql.database")

// init registers the MetricSet with the central registry.
// The New method will be called after the setup of the module and before starting to fetch data
func init() {
Expand All @@ -55,21 +51,16 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
// Fetch methods implements the data gathering and data conversion to the right
// format. It publishes the event which is then forwarded to the output. In case
// of an error set the Error field of mb.Event or simply call report.Error().
func (m *MetricSet) Fetch(reporter mb.ReporterV2) {
func (m *MetricSet) Fetch(reporter mb.ReporterV2) error {
db, err := sql.Open("postgres", m.HostData().URI)
if err != nil {
logger.Error(err)
reporter.Error(err)
return
return errors.Wrap(err, "error in Open")
}
defer db.Close()

results, err := postgresql.QueryStats(db, "SELECT * FROM pg_stat_database")
if err != nil {
err = errors.Wrap(err, "QueryStats")
logger.Error(err)
reporter.Error(err)
return
return errors.Wrap(err, "error in QueryStats")
}

for _, result := range results {
Expand All @@ -78,4 +69,6 @@ func (m *MetricSet) Fetch(reporter mb.ReporterV2) {
MetricSetFields: data,
})
}

return nil
}
14 changes: 4 additions & 10 deletions metricbeat/module/postgresql/database/database_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ import (
func TestFetch(t *testing.T) {
compose.EnsureUp(t, "postgresql")

f := mbtest.NewReportingMetricSetV2(t, getConfig())
events, errs := mbtest.ReportingFetchV2(f)
f := mbtest.NewReportingMetricSetV2Error(t, getConfig())
events, errs := mbtest.ReportingFetchV2Error(f)
if len(errs) > 0 {
t.Fatalf("Expected 0 error, had %d. %v\n", len(errs), errs)
}
Expand All @@ -61,14 +61,8 @@ func TestFetch(t *testing.T) {
func TestData(t *testing.T) {
compose.EnsureUp(t, "postgresql")

f := mbtest.NewReportingMetricSetV2(t, getConfig())
events, errs := mbtest.ReportingFetchV2(f)
if len(errs) > 0 {
t.Fatalf("Expected 0 error, had %d. %v\n", len(errs), errs)
}
assert.NotEmpty(t, events)

if err := mbtest.WriteEventsReporterV2(f, t, ""); err != nil {
f := mbtest.NewReportingMetricSetV2Error(t, getConfig())
if err := mbtest.WriteEventsReporterV2Error(f, t, ""); err != nil {
t.Fatal("write", err)
}
}
Expand Down
3 changes: 3 additions & 0 deletions metricbeat/module/postgresql/postgresql.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ func init() {
}
}

//NewModule returns a new instance of the module
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: we normally add a space after the //. Applies to few other places.

Thanks for adding the missing doc blocks.

func NewModule(base mb.BaseModule) (mb.Module, error) {
// Validate that at least one host has been specified.
config := struct {
Expand All @@ -54,6 +55,7 @@ func NewModule(base mb.BaseModule) (mb.Module, error) {
return &base, nil
}

//ParseURL is the postgres host parser
func ParseURL(mod mb.Module, rawURL string) (mb.HostData, error) {
c := struct {
Username string `config:"username"`
Expand Down Expand Up @@ -102,6 +104,7 @@ func ParseURL(mod mb.Module, rawURL string) (mb.HostData, error) {
return h, nil
}

//QueryStats makes the database call for a given metric
func QueryStats(db *sql.DB, query string) ([]map[string]interface{}, error) {
rows, err := db.Query(query)
if err != nil {
Expand Down
17 changes: 5 additions & 12 deletions metricbeat/module/postgresql/statement/statement.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@ package statement
import (
"database/sql"

"github.com/elastic/beats/libbeat/logp"

"github.com/pkg/errors"

"github.com/elastic/beats/metricbeat/mb"
Expand All @@ -31,8 +29,6 @@ import (
_ "github.com/lib/pq"
)

var logger = logp.NewLogger("postgresql.statement")

// init registers the MetricSet with the central registry as soon as the program
// starts. The New function will be called later to instantiate an instance of
// the MetricSet for each host defined in the module's configuration. After the
Expand Down Expand Up @@ -67,21 +63,16 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
// Fetch methods implements the data gathering and data conversion to the right
// format. It publishes the event which is then forwarded to the output. In case
// of an error set the Error field of mb.Event or simply call report.Error().
func (m *MetricSet) Fetch(reporter mb.ReporterV2) {
func (m *MetricSet) Fetch(reporter mb.ReporterV2) error {
db, err := sql.Open("postgres", m.HostData().URI)
if err != nil {
logger.Error(err)
reporter.Error(err)
return
return errors.Wrap(err, "error in Open")
}
defer db.Close()

results, err := postgresql.QueryStats(db, "SELECT * FROM pg_stat_statements")
if err != nil {
err = errors.Wrap(err, "QueryStats")
logger.Error(err)
reporter.Error(err)
return
return errors.Wrap(err, "QueryStats")
}

for _, result := range results {
Expand All @@ -90,4 +81,6 @@ func (m *MetricSet) Fetch(reporter mb.ReporterV2) {
MetricSetFields: data,
})
}

return nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ import (
func TestFetch(t *testing.T) {
compose.EnsureUp(t, "postgresql")

f := mbtest.NewReportingMetricSetV2(t, getConfig())
events, errs := mbtest.ReportingFetchV2(f)
f := mbtest.NewReportingMetricSetV2Error(t, getConfig())
events, errs := mbtest.ReportingFetchV2Error(f)
if len(errs) > 0 {
t.Fatalf("Expected 0 error, had %d. %v\n", len(errs), errs)
}
Expand Down Expand Up @@ -92,14 +92,8 @@ func TestFetch(t *testing.T) {
func TestData(t *testing.T) {
compose.EnsureUp(t, "postgresql")

f := mbtest.NewReportingMetricSetV2(t, getConfig())
events, errs := mbtest.ReportingFetchV2(f)
if len(errs) > 0 {
t.Fatalf("Expected 0 error, had %d. %v\n", len(errs), errs)
}
assert.NotEmpty(t, events)

if err := mbtest.WriteEventsReporterV2(f, t, ""); err != nil {
f := mbtest.NewReportingMetricSetV2Error(t, getConfig())
if err := mbtest.WriteEventsReporterV2Error(f, t, ""); err != nil {
t.Fatal("write", err)
}
}
Expand Down
3 changes: 3 additions & 0 deletions metricbeat/module/postgresql/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,17 @@ package postgresql

import "os"

//GetEnvDSN returns the Data Source Name
func GetEnvDSN() string {
return os.Getenv("POSTGRESQL_DSN")
}

//GetEnvUsername returns the username
func GetEnvUsername() string {
return os.Getenv("POSTGRESQL_USERNAME")
}

//GetEnvPassword returns the password
func GetEnvPassword() string {
return os.Getenv("POSTGRESQL_PASSWORD")
}