diff --git a/README.md b/README.md index 75ac9de1eab86..54b06ac4619da 100644 --- a/README.md +++ b/README.md @@ -205,6 +205,7 @@ configuration options. * [openldap](./plugins/inputs/openldap) * [opensmtpd](./plugins/inputs/opensmtpd) * [pf](./plugins/inputs/pf) +* [pgbouncer](./plugins/inputs/pgbouncer) * [phpfpm](./plugins/inputs/phpfpm) * [phusion passenger](./plugins/inputs/passenger) * [ping](./plugins/inputs/ping) diff --git a/docker-compose.yml b/docker-compose.yml index 822d7fff1a76b..5ac47089db975 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -40,6 +40,13 @@ services: image: memcached ports: - "11211:11211" + pgbouncer: + image: mbed/pgbouncer + environment: + PG_ENV_POSTGRESQL_USER: pgbouncer + PG_ENV_POSTGRESQL_PASS: pgbouncer + ports: + - "6432:6432" postgres: image: postgres:alpine ports: diff --git a/plugins/inputs/all/all.go b/plugins/inputs/all/all.go index 8594db0a91361..f50ceb6a103a7 100644 --- a/plugins/inputs/all/all.go +++ b/plugins/inputs/all/all.go @@ -83,6 +83,7 @@ import ( _ "github.com/influxdata/telegraf/plugins/inputs/opensmtpd" _ "github.com/influxdata/telegraf/plugins/inputs/passenger" _ "github.com/influxdata/telegraf/plugins/inputs/pf" + _ "github.com/influxdata/telegraf/plugins/inputs/pgbouncer" _ "github.com/influxdata/telegraf/plugins/inputs/phpfpm" _ "github.com/influxdata/telegraf/plugins/inputs/ping" _ "github.com/influxdata/telegraf/plugins/inputs/postfix" diff --git a/plugins/inputs/pgbouncer/README.md b/plugins/inputs/pgbouncer/README.md new file mode 100644 index 0000000000000..2a841c45aada0 --- /dev/null +++ b/plugins/inputs/pgbouncer/README.md @@ -0,0 +1,21 @@ +# PgBouncer plugin + +This PgBouncer plugin provides metrics for your PgBouncer load balancer. + +More information about the meaning of these metrics can be found in the [PgBouncer Documentation](https://pgbouncer.github.io/usage.html) + +## Configuration +Specify address via a url matching: + + `postgres://[pqgotest[:password]]@localhost[/dbname]?sslmode=[disable|verify-ca|verify-full]` + +All connection parameters are optional. + +Without the dbname parameter, the driver will default to a database with the same name as the user. +This dbname is just for instantiating a connection with the server and doesn't restrict the databases we are trying to grab metrics for. + +### Configuration example +``` +[[inputs.pgbouncer]] + address = "postgres://telegraf@localhost/pgbouncer" +``` diff --git a/plugins/inputs/pgbouncer/pgbouncer.go b/plugins/inputs/pgbouncer/pgbouncer.go new file mode 100644 index 0000000000000..722648c48edc1 --- /dev/null +++ b/plugins/inputs/pgbouncer/pgbouncer.go @@ -0,0 +1,179 @@ +package pgbouncer + +import ( + "bytes" + "github.com/influxdata/telegraf/plugins/inputs/postgresql" + + // register in driver. + _ "github.com/jackc/pgx/stdlib" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal" + "github.com/influxdata/telegraf/plugins/inputs" +) + +type PgBouncer struct { + postgresql.Service +} + +var ignoredColumns = map[string]bool{"user": true, "database": true, "pool_mode": true, + "avg_req": true, "avg_recv": true, "avg_sent": true, "avg_query": true, +} + +var sampleConfig = ` + ## specify address via a url matching: + ## postgres://[pqgotest[:password]]@localhost[/dbname]\ + ## ?sslmode=[disable|verify-ca|verify-full] + ## or a simple string: + ## host=localhost user=pqotest password=... sslmode=... dbname=app_production + ## + ## All connection parameters are optional. + ## + address = "host=localhost user=pgbouncer sslmode=disable" +` + +func (p *PgBouncer) SampleConfig() string { + return sampleConfig +} + +func (p *PgBouncer) Description() string { + return "Read metrics from one or many pgbouncer servers" +} + +func (p *PgBouncer) Gather(acc telegraf.Accumulator) error { + var ( + err error + query string + columns []string + ) + + query = `SHOW STATS` + + rows, err := p.DB.Query(query) + if err != nil { + return err + } + + defer rows.Close() + + // grab the column information from the result + if columns, err = rows.Columns(); err != nil { + return err + } + + for rows.Next() { + tags, columnMap, err := p.accRow(rows, acc, columns) + + if err != nil { + return err + } + + fields := make(map[string]interface{}) + for col, val := range columnMap { + _, ignore := ignoredColumns[col] + if !ignore { + fields[col] = *val + } + } + acc.AddFields("pgbouncer", fields, tags) + } + + query = `SHOW POOLS` + + poolRows, err := p.DB.Query(query) + if err != nil { + return err + } + + defer poolRows.Close() + + // grab the column information from the result + if columns, err = poolRows.Columns(); err != nil { + return err + } + + for poolRows.Next() { + tags, columnMap, err := p.accRow(poolRows, acc, columns) + if err != nil { + return err + } + + if s, ok := (*columnMap["user"]).(string); ok && s != "" { + tags["user"] = s + } + + if s, ok := (*columnMap["pool_mode"]).(string); ok && s != "" { + tags["pool_mode"] = s + } + + fields := make(map[string]interface{}) + for col, val := range columnMap { + _, ignore := ignoredColumns[col] + if !ignore { + fields[col] = *val + } + } + acc.AddFields("pgbouncer_pools", fields, tags) + } + + return poolRows.Err() +} + +type scanner interface { + Scan(dest ...interface{}) error +} + +func (p *PgBouncer) accRow(row scanner, acc telegraf.Accumulator, columns []string) (map[string]string, + map[string]*interface{}, error) { + var columnVars []interface{} + var dbname bytes.Buffer + + // this is where we'll store the column name with its *interface{} + columnMap := make(map[string]*interface{}) + + for _, column := range columns { + columnMap[column] = new(interface{}) + } + + // populate the array of interface{} with the pointers in the right order + for i := 0; i < len(columnMap); i++ { + columnVars = append(columnVars, columnMap[columns[i]]) + } + + // deconstruct array of variables and send to Scan + err := row.Scan(columnVars...) + + if err != nil { + return nil, nil, err + } + if columnMap["database"] != nil { + // extract the database name from the column map + dbname.WriteString((*columnMap["database"]).(string)) + } else { + dbname.WriteString("postgres") + } + + var tagAddress string + tagAddress, err = p.SanitizedAddress() + if err != nil { + return nil, nil, err + } + + // Return basic tags and the mapped columns + return map[string]string{"server": tagAddress, "db": dbname.String()}, columnMap, nil +} + +func init() { + inputs.Add("pgbouncer", func() telegraf.Input { + return &PgBouncer{ + Service: postgresql.Service{ + MaxIdle: 1, + MaxOpen: 1, + MaxLifetime: internal.Duration{ + Duration: 0, + }, + IsPgBouncer: true, + }, + } + }) +} diff --git a/plugins/inputs/pgbouncer/pgbouncer_test.go b/plugins/inputs/pgbouncer/pgbouncer_test.go new file mode 100644 index 0000000000000..44e28c7f3335e --- /dev/null +++ b/plugins/inputs/pgbouncer/pgbouncer_test.go @@ -0,0 +1,66 @@ +package pgbouncer + +import ( + "fmt" + "github.com/influxdata/telegraf/plugins/inputs/postgresql" + "github.com/influxdata/telegraf/testutil" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "testing" +) + +func TestPgBouncerGeneratesMetrics(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + p := &PgBouncer{ + Service: postgresql.Service{ + Address: fmt.Sprintf( + "host=%s user=pgbouncer password=pgbouncer dbname=pgbouncer port=6432 sslmode=disable", + testutil.GetLocalHost(), + ), + IsPgBouncer: true, + }, + } + + var acc testutil.Accumulator + require.NoError(t, p.Start(&acc)) + require.NoError(t, p.Gather(&acc)) + + intMetrics := []string{ + "total_requests", + "total_received", + "total_sent", + "total_query_time", + "avg_req", + "avg_recv", + "avg_sent", + "avg_query", + "cl_active", + "cl_waiting", + "sv_active", + "sv_idle", + "sv_used", + "sv_tested", + "sv_login", + "maxwait", + } + + int32Metrics := []string{} + + metricsCounted := 0 + + for _, metric := range intMetrics { + assert.True(t, acc.HasInt64Field("pgbouncer", metric)) + metricsCounted++ + } + + for _, metric := range int32Metrics { + assert.True(t, acc.HasInt32Field("pgbouncer", metric)) + metricsCounted++ + } + + assert.True(t, metricsCounted > 0) + assert.Equal(t, len(intMetrics)+len(int32Metrics), metricsCounted) +} diff --git a/plugins/inputs/postgresql/postgresql.go b/plugins/inputs/postgresql/postgresql.go index 19c9db9ce0f3c..e136098f4f304 100644 --- a/plugins/inputs/postgresql/postgresql.go +++ b/plugins/inputs/postgresql/postgresql.go @@ -189,6 +189,7 @@ func init() { MaxLifetime: internal.Duration{ Duration: 0, }, + IsPgBouncer: false, }, } }) diff --git a/plugins/inputs/postgresql/postgresql_test.go b/plugins/inputs/postgresql/postgresql_test.go index 306dca3b6b6ef..b23321019f5f8 100644 --- a/plugins/inputs/postgresql/postgresql_test.go +++ b/plugins/inputs/postgresql/postgresql_test.go @@ -20,6 +20,7 @@ func TestPostgresqlGeneratesMetrics(t *testing.T) { "host=%s user=postgres sslmode=disable", testutil.GetLocalHost(), ), + IsPgBouncer: false, }, Databases: []string{"postgres"}, } diff --git a/plugins/inputs/postgresql/service.go b/plugins/inputs/postgresql/service.go index 4f7b21e549cf5..9d3ab396317a1 100644 --- a/plugins/inputs/postgresql/service.go +++ b/plugins/inputs/postgresql/service.go @@ -3,6 +3,9 @@ package postgresql import ( "database/sql" "fmt" + "github.com/jackc/pgx" + "github.com/jackc/pgx/pgtype" + "github.com/jackc/pgx/stdlib" "net" "net/url" "regexp" @@ -90,6 +93,7 @@ type Service struct { MaxOpen int MaxLifetime internal.Duration DB *sql.DB + IsPgBouncer bool } // Start starts the ServiceInput's service, whatever that may be @@ -100,7 +104,34 @@ func (p *Service) Start(telegraf.Accumulator) (err error) { p.Address = localhost } - if p.DB, err = sql.Open("pgx", p.Address); err != nil { + connectionString := p.Address + + // Specific support to make it work with PgBouncer too + // See https://github.com/influxdata/telegraf/issues/3253#issuecomment-357505343 + if p.IsPgBouncer { + d := &stdlib.DriverConfig{ + ConnConfig: pgx.ConnConfig{ + PreferSimpleProtocol: true, + RuntimeParams: map[string]string{ + "client_encoding": "UTF8", + }, + CustomConnInfo: func(c *pgx.Conn) (*pgtype.ConnInfo, error) { + info := c.ConnInfo.DeepCopy() + info.RegisterDataType(pgtype.DataType{ + Value: &pgtype.OIDValue{}, + Name: "int8OID", + OID: pgtype.Int8OID, + }) + + return info, nil + }, + }, + } + stdlib.RegisterDriverConfig(d) + connectionString = d.ConnectionString(p.Address) + } + + if p.DB, err = sql.Open("pgx", connectionString); err != nil { return err } diff --git a/plugins/inputs/postgresql_extensible/postgresql_extensible.go b/plugins/inputs/postgresql_extensible/postgresql_extensible.go index 056f4afc841df..a04382888b41b 100644 --- a/plugins/inputs/postgresql_extensible/postgresql_extensible.go +++ b/plugins/inputs/postgresql_extensible/postgresql_extensible.go @@ -283,6 +283,7 @@ func init() { MaxLifetime: internal.Duration{ Duration: 0, }, + IsPgBouncer: false, }, } }) diff --git a/plugins/inputs/postgresql_extensible/postgresql_extensible_test.go b/plugins/inputs/postgresql_extensible/postgresql_extensible_test.go index 77db5feb542fc..0f9358da63ac1 100644 --- a/plugins/inputs/postgresql_extensible/postgresql_extensible_test.go +++ b/plugins/inputs/postgresql_extensible/postgresql_extensible_test.go @@ -17,6 +17,7 @@ func queryRunner(t *testing.T, q query) *testutil.Accumulator { "host=%s user=postgres sslmode=disable", testutil.GetLocalHost(), ), + IsPgBouncer: false, }, Databases: []string{"postgres"}, Query: q,