Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Params: PGSCV_COLLECT_TOP_TABLE, PGSCV_COLLECT_TOP_INDEX, PGSCV_COLLECT_TOP_QUERY #31

Merged
merged 6 commits into from
Aug 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion internal/collector/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,10 @@ type Config struct {
// DatabasesRE defines regexp with databases from which builtin metrics should be collected.
DatabasesRE *regexp.Regexp
// Settings defines collectors settings propagated from main YAML configuration.
Settings model.CollectorsSettings
Settings model.CollectorsSettings
CollectTopTable int
CollectTopIndex int
CollectTopQuery int
}

// postgresServiceConfig defines Postgres-specific stuff required during collecting Postgres metrics.
Expand Down
31 changes: 27 additions & 4 deletions internal/collector/postgres_indexes.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,30 @@ import (
)

const (
userIndexesQuery = "SELECT current_database() AS database, schemaname AS schema, relname AS table, indexrelname AS index, (i.indisprimary OR i.indisunique) AS key," +
"i.indisvalid AS isvalid, idx_scan, idx_tup_read, idx_tup_fetch, idx_blks_read, idx_blks_hit,pg_relation_size(s1.indexrelid) AS size_bytes " +
userIndexesQuery = "SELECT current_database() AS database, schemaname as schema, relname as table, indexrelname as index, " +
"(i.indisprimary OR i.indisunique) AS key, i.indisvalid AS isvalid, idx_scan, idx_tup_read, idx_tup_fetch, " +
"idx_blks_read, idx_blks_hit, pg_relation_size(s1.indexrelid) as size_bytes " +
"FROM pg_stat_user_indexes s1 " +
"JOIN pg_statio_user_indexes s2 USING (schemaname, relname, indexrelname) " +
"JOIN pg_index i ON (s1.indexrelid = i.indexrelid) " +
"WHERE NOT EXISTS (SELECT 1 FROM pg_locks WHERE relation = s1.indexrelid AND mode = 'AccessExclusiveLock' AND granted)"

userIndexesQueryTopK = "WITH stat AS (SELECT schemaname AS schema, relname AS table, indexrelname AS index, (i.indisprimary OR i.indisunique) AS key, " +
"i.indisvalid AS isvalid, idx_scan, idx_tup_read, idx_tup_fetch, idx_blks_read, idx_blks_hit, pg_relation_size(s1.indexrelid) AS size_bytes, " +
"NOT i.indisvalid OR /* unused and size > 50mb */ (idx_scan = 0 AND pg_relation_size(s1.indexrelid) > 50*1024*1024) OR " +
"(row_number() OVER (ORDER BY idx_scan DESC NULLS LAST) < $1) OR (row_number() OVER (ORDER BY idx_tup_read DESC NULLS LAST) < $1) OR " +
"(row_number() OVER (ORDER BY idx_tup_fetch DESC NULLS LAST) < $1) OR (row_number() OVER (ORDER BY idx_blks_read DESC NULLS LAST) < $1) OR " +
"(row_number() OVER (ORDER BY idx_blks_hit DESC NULLS LAST) < $1) OR (row_number() OVER (ORDER BY pg_relation_size(s1.indexrelid) DESC NULLS LAST) < $1) AS visible " +
"FROM pg_stat_user_indexes s1 " +
"JOIN pg_statio_user_indexes s2 USING (schemaname, relname, indexrelname) " +
"JOIN pg_index i ON (s1.indexrelid = i.indexrelid) " +
"WHERE NOT EXISTS ( SELECT 1 FROM pg_locks WHERE relation = s1.indexrelid AND mode = 'AccessExclusiveLock' AND granted)) " +
"SELECT current_database() AS database, \"schema\", \"table\", \"index\", \"key\", isvalid, idx_scan, idx_tup_read, idx_tup_fetch, " +
"idx_blks_read, idx_blks_hit, size_bytes FROM stat WHERE visible " +
"UNION ALL SELECT current_database() AS database, 'all_shemas', 'all_other_tables', 'all_other_indexes', true, null, " +
"NULLIF(SUM(coalesce(idx_scan,0)),0), NULLIF(SUM(coalesce(idx_tup_fetch,0)),0), NULLIF(SUM(coalesce(idx_tup_read,0)),0), " +
"NULLIF(SUM(coalesce(idx_blks_read,0)),0), NULLIF(SUM(coalesce(idx_blks_hit,0)),0), " +
"NULLIF(SUM(coalesce(size_bytes,0)),0) FROM stat WHERE NOT visible HAVING EXISTS (SELECT 1 FROM stat WHERE NOT visible)"
)

// postgresIndexesCollector defines metric descriptors and stats store.
Expand Down Expand Up @@ -63,6 +81,7 @@ func NewPostgresIndexesCollector(constLabels labels, settings model.CollectorSet

// Update method collects statistics, parse it and produces metrics that are sent to Prometheus.
func (c *postgresIndexesCollector) Update(config Config, ch chan<- prometheus.Metric) error {
var err error
conn, err := store.New(config.ConnString)
if err != nil {
return err
Expand Down Expand Up @@ -91,8 +110,12 @@ func (c *postgresIndexesCollector) Update(config Config, ch chan<- prometheus.Me
if err != nil {
return err
}

res, err := conn.Query(userIndexesQuery)
var res *model.PGResult
if config.CollectTopIndex > 0 {
res, err = conn.Query(userIndexesQueryTopK, config.CollectTopIndex)
} else {
res, err = conn.Query(userIndexesQuery)
}
conn.Close()
if err != nil {
log.Warnf("get indexes stat of database %s failed: %s", d, err)
Expand Down
46 changes: 42 additions & 4 deletions internal/collector/postgres_statements.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
)

const (
// The "and $1=$1" predicate in the PG12 query is a no-op stub: it lets the
// same call site always pass the topK parameter ($1) even though the pre-13
// query does not use it. TODO: refactor so the parameter is only bound when needed.
// postgresStatementsQuery12 defines query for querying statements metrics for PG12 and older.
postgresStatementsQuery12 = "SELECT d.datname AS database, pg_get_userbyid(p.userid) AS user, p.queryid, " +
"coalesce(%s, '') AS query, p.calls, p.rows, p.total_time, p.blk_read_time, p.blk_write_time, " +
Expand All @@ -21,7 +22,7 @@ const (
"nullif(p.local_blks_hit, 0) AS local_blks_hit, nullif(p.local_blks_read, 0) AS local_blks_read, " +
"nullif(p.local_blks_dirtied, 0) AS local_blks_dirtied, nullif(p.local_blks_written, 0) AS local_blks_written, " +
"nullif(p.temp_blks_read, 0) AS temp_blks_read, nullif(p.temp_blks_written, 0) AS temp_blks_written " +
"FROM %s.pg_stat_statements p JOIN pg_database d ON d.oid=p.dbid"
"FROM %s.pg_stat_statements p JOIN pg_database d ON d.oid=p.dbid and $1=$1"

// postgresStatementsQueryLatest defines query for querying statements metrics.
// 1. use nullif(value, 0) to nullify zero values, NULL are skipped by stats method and metrics wil not be generated.
Expand All @@ -34,6 +35,35 @@ const (
"nullif(p.temp_blks_read, 0) AS temp_blks_read, nullif(p.temp_blks_written, 0) AS temp_blks_written, " +
"nullif(p.wal_records, 0) AS wal_records, nullif(p.wal_fpi, 0) AS wal_fpi, nullif(p.wal_bytes, 0) AS wal_bytes " +
"FROM %s.pg_stat_statements p JOIN pg_database d ON d.oid=p.dbid"

// postgresStatementsQueryLatestTopK is the top-K variant of
// postgresStatementsQueryLatest for PG13+ ($1 = K; %s placeholders are the
// query-text column and the pg_stat_statements schema, filled by
// selectStatementsQuery). A statement is reported individually ("visible")
// when it ranks in the top K by any tracked counter; everything else is
// folded per database into a single 'all_users'/'all_queries' row whose
// counters are summed (the row is emitted only when something was folded;
// NULLIF keeps zero sums out of the metrics).
// NOTE(review): ROW_NUMBER() < $1 selects the top K-1 rows, not K —
// confirm whether <= was intended.
postgresStatementsQueryLatestTopK = "WITH stat AS (SELECT d.datname AS DATABASE, pg_get_userbyid(p.userid) AS USER, p.queryid, " +
"COALESCE(%s, '') AS query, p.calls, p.rows, p.total_exec_time, p.total_plan_time, p.blk_read_time, p.blk_write_time, " +
"NULLIF(p.shared_blks_hit, 0) AS shared_blks_hit, NULLIF(p.shared_blks_read, 0) AS shared_blks_read, " +
"NULLIF(p.shared_blks_dirtied, 0) AS shared_blks_dirtied, NULLIF(p.shared_blks_written, 0) AS shared_blks_written, " +
"NULLIF(p.local_blks_hit, 0) AS local_blks_hit, NULLIF(p.local_blks_read, 0) AS local_blks_read, " +
"NULLIF(p.local_blks_dirtied, 0) AS local_blks_dirtied, NULLIF(p.local_blks_written, 0) AS local_blks_written, " +
"NULLIF(p.temp_blks_read, 0) AS temp_blks_read, NULLIF(p.temp_blks_written, 0) AS temp_blks_written, " +
"NULLIF(p.wal_records, 0) AS wal_records, NULLIF(p.wal_fpi, 0) AS wal_fpi, NULLIF(p.wal_bytes, 0) AS wal_bytes, " +
"(ROW_NUMBER() OVER ( ORDER BY p.calls DESC NULLS LAST) < $1) OR (ROW_NUMBER() OVER ( ORDER BY p.rows DESC NULLS LAST) < $1) OR " +
"(ROW_NUMBER() OVER ( ORDER BY p.total_exec_time DESC NULLS LAST) < $1) OR (ROW_NUMBER() OVER ( ORDER BY p.total_plan_time DESC NULLS LAST) < $1) OR " +
"(ROW_NUMBER() OVER ( ORDER BY p.blk_read_time DESC NULLS LAST) < $1) OR (ROW_NUMBER() OVER ( ORDER BY p.blk_write_time DESC NULLS LAST) < $1) OR " +
"(ROW_NUMBER() OVER ( ORDER BY p.shared_blks_hit DESC NULLS LAST) < $1) OR (ROW_NUMBER() OVER ( ORDER BY p.shared_blks_read DESC NULLS LAST) < $1) OR " +
"(ROW_NUMBER() OVER ( ORDER BY p.shared_blks_dirtied DESC NULLS LAST) < $1) OR (ROW_NUMBER() OVER ( ORDER BY p.shared_blks_written DESC NULLS LAST) < $1) OR " +
"(ROW_NUMBER() OVER ( ORDER BY p.local_blks_hit DESC NULLS LAST) < $1) OR (ROW_NUMBER() OVER ( ORDER BY p.local_blks_read DESC NULLS LAST) < $1) OR " +
"(ROW_NUMBER() OVER ( ORDER BY p.local_blks_dirtied DESC NULLS LAST) < $1) OR (ROW_NUMBER() OVER ( ORDER BY p.local_blks_written DESC NULLS LAST) < $1) OR " +
"(ROW_NUMBER() OVER ( ORDER BY p.temp_blks_read DESC NULLS LAST) < $1) OR (ROW_NUMBER() OVER ( ORDER BY p.temp_blks_written DESC NULLS LAST) < $1) OR " +
"(ROW_NUMBER() OVER ( ORDER BY p.wal_records DESC NULLS LAST) < $1) OR (ROW_NUMBER() OVER ( ORDER BY p.wal_fpi DESC NULLS LAST) < $1) OR " +
"(ROW_NUMBER() OVER ( ORDER BY p.wal_bytes DESC NULLS LAST) < $1) AS visible FROM %s.pg_stat_statements p JOIN pg_database d ON d.oid = p.dbid) " +
"SELECT DATABASE, USER, queryid, query, calls, rows, total_exec_time, total_plan_time, blk_read_time, blk_write_time, shared_blks_hit, " +
"shared_blks_read, shared_blks_dirtied, shared_blks_written, local_blks_hit, local_blks_read, local_blks_dirtied, local_blks_written, " +
"temp_blks_read, temp_blks_written, wal_records, wal_fpi, wal_bytes FROM stat WHERE visible UNION ALL SELECT DATABASE, 'all_users', NULL, " +
"'all_queries', NULLIF(sum(COALESCE(calls, 0)), 0), NULLIF(sum(COALESCE(ROWS, 0)), 0), NULLIF(sum(COALESCE(total_exec_time, 0)), 0), " +
"NULLIF(sum(COALESCE(total_plan_time, 0)), 0), NULLIF(sum(COALESCE(blk_read_time, 0)), 0), NULLIF(sum(COALESCE(blk_write_time, 0)), 0), " +
"NULLIF(sum(COALESCE(shared_blks_hit, 0)), 0), NULLIF(sum(COALESCE(shared_blks_read, 0)), 0), NULLIF(sum(COALESCE(shared_blks_dirtied, 0)), 0), " +
"NULLIF(sum(COALESCE(shared_blks_written, 0)), 0), NULLIF(sum(COALESCE(local_blks_hit, 0)), 0), NULLIF(sum(COALESCE(local_blks_read, 0)), 0), " +
"NULLIF(sum(COALESCE(local_blks_dirtied, 0)), 0), NULLIF(sum(COALESCE(local_blks_written, 0)), 0), NULLIF(sum(COALESCE(temp_blks_read, 0)), 0), " +
"NULLIF(sum(COALESCE(temp_blks_written, 0)), 0), NULLIF(sum(COALESCE(wal_records, 0)), 0), NULLIF(sum(COALESCE(wal_fpi, 0)), 0), " +
"NULLIF(sum(COALESCE(wal_bytes, 0)), 0) FROM stat WHERE NOT visible GROUP BY DATABASE HAVING EXISTS (SELECT 1 FROM stat WHERE NOT visible)"
)

// postgresStatementsCollector ...
Expand Down Expand Up @@ -175,6 +205,7 @@ func NewPostgresStatementsCollector(constLabels labels, settings model.Collector

// Update method collects statistics, parse it and produces metrics that are sent to Prometheus.
func (c *postgresStatementsCollector) Update(config Config, ch chan<- prometheus.Metric) error {
var err error
// nothing to do, pg_stat_statements not found in shared_preload_libraries
if !config.pgStatStatements {
return nil
Expand All @@ -198,9 +229,13 @@ func (c *postgresStatementsCollector) Update(config Config, ch chan<- prometheus
}

defer conn.Close()

var res *model.PGResult
// get pg_stat_statements stats
res, err := conn.Query(selectStatementsQuery(config.serverVersionNum, config.pgStatStatementsSchema, config.NoTrackMode))
if config.CollectTopQuery > 0 {
res, err = conn.Query(selectStatementsQuery(config.serverVersionNum, config.pgStatStatementsSchema, config.NoTrackMode, config.CollectTopQuery), config.CollectTopQuery)
} else {
res, err = conn.Query(selectStatementsQuery(config.serverVersionNum, config.pgStatStatementsSchema, config.NoTrackMode, config.CollectTopQuery))
}
if err != nil {
return err
}
Expand Down Expand Up @@ -419,7 +454,7 @@ func parsePostgresStatementsStats(r *model.PGResult, labelNames []string) map[st
}

// selectStatementsQuery returns suitable statements query depending on passed version.
func selectStatementsQuery(version int, schema string, notrackmode bool) string {
func selectStatementsQuery(version int, schema string, notrackmode bool, topK int) string {
var query_columm string
if notrackmode {
query_columm = "null"
Expand All @@ -430,6 +465,9 @@ func selectStatementsQuery(version int, schema string, notrackmode bool) string
case version < PostgresV13:
return fmt.Sprintf(postgresStatementsQuery12, query_columm, schema)
default:
if topK > 0 {
return fmt.Sprintf(postgresStatementsQueryLatestTopK, query_columm, schema)
}
return fmt.Sprintf(postgresStatementsQueryLatest, query_columm, schema)
}
}
8 changes: 5 additions & 3 deletions internal/collector/postgres_statements_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,12 +166,14 @@ func Test_selectStatementsQuery(t *testing.T) {
testcases := []struct {
version int
want string
topK int
}{
{version: PostgresV12, want: fmt.Sprintf(postgresStatementsQuery12, "p.query", "example")},
{version: PostgresV13, want: fmt.Sprintf(postgresStatementsQueryLatest, "p.query", "example")},
{version: PostgresV12, want: fmt.Sprintf(postgresStatementsQuery12, "p.query", "example"), topK: 0},
{version: PostgresV13, want: fmt.Sprintf(postgresStatementsQueryLatest, "p.query", "example"), topK: 0},
{version: PostgresV13, want: fmt.Sprintf(postgresStatementsQueryLatestTopK, "p.query", "example"), topK: 100},
}

for _, tc := range testcases {
assert.Equal(t, tc.want, selectStatementsQuery(tc.version, "example", false))
assert.Equal(t, tc.want, selectStatementsQuery(tc.version, "example", false, tc.topK))
}
}
Loading