Skip to content

Commit

Permalink
Profile: implement accounting for parallel workers using leader_pid. …
Browse files Browse the repository at this point in the history
…1) get stats using NewPGresultQuery, 2) add stats parser, 3) refactor accounting, 4) add new and improve existing tests, 5) code blocks re-arrangement, 6) rewrite comments.
  • Loading branch information
lesovsky committed Mar 7, 2021
1 parent d281ffc commit 3d54c8d
Show file tree
Hide file tree
Showing 3 changed files with 304 additions and 195 deletions.
274 changes: 178 additions & 96 deletions profile/profile.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,38 +4,32 @@ package profile

import (
"fmt"
"github.com/jackc/pgx/v4"
"github.com/lesovsky/pgcenter/internal/postgres"
"github.com/lesovsky/pgcenter/internal/stat"
"io"
"os"
"os/signal"
"sort"
"strconv"
"syscall"
"time"
)

// waitEvent pairs the name of a wait event with the numeric value it is ranked by.
type waitEvent struct {
	waitEventName  string  // label of the wait event
	waitEventValue float64 // value used as the sort key
}

// waitEvents is a sortable list of waitEvent entries.
type waitEvents []waitEvent

// Len reports how many entries the list holds.
func (p waitEvents) Len() int {
	return len(p)
}

// Swap exchanges the entries at positions i and j.
func (p waitEvents) Swap(i, j int) {
	tmp := p[i]
	p[i] = p[j]
	p[j] = tmp
}

// Less orders entries by descending waitEventValue, so the largest value sorts first.
func (p waitEvents) Less(i, j int) bool {
	return p[j].waitEventValue < p[i].waitEventValue
}
// Queries used to sample pg_stat_activity. Each one is a fmt format string; its %d
// verbs are filled with the profiled process ID by selectQuery.
const (
	// inclusiveQuery selects the target process together with every process whose
	// leader_pid refers to it.
	inclusiveQuery = "SELECT pid, " +
		"extract(epoch from clock_timestamp() - query_start) AS query_duration, " +
		"date_trunc('milliseconds', state_change) AS state_change_time, " +
		"state AS state, " +
		"coalesce(wait_event_type ||'.'|| wait_event, '') AS wait_entry, query " +
		"FROM pg_stat_activity WHERE pid = %d OR leader_pid = %d /* pgcenter profile */"

	// exclusiveQuery selects the target process only.
	exclusiveQuery = "SELECT pid, " +
		"extract(epoch from clock_timestamp() - query_start) AS query_duration, " +
		"date_trunc('milliseconds', state_change) AS state_change_time, " +
		"state AS state, " +
		"coalesce(wait_event_type ||'.'|| wait_event, '') AS wait_entry, query " +
		"FROM pg_stat_activity WHERE pid = %d /* pgcenter profile */"
)

// Config defines program's configuration options.
type Config struct {
Expand All @@ -62,8 +56,10 @@ func RunMain(dbConfig postgres.Config, config Config) error {

// stats defines local statistics storage for profiled query.
type stats struct {
	real        float64            // query duration of the target process as of the latest snapshot, in seconds
	accumulated float64            // duration summed over all processes seen in the snapshots, in seconds
	durations   map[string]float64 // accumulated duration per wait event; "Running" is used when there is no wait event
	ratios      map[string]float64 // percent share of every durations entry relative to accumulated
}

// newStatsStore creates new stats store.
Expand All @@ -74,9 +70,29 @@ func newStatsStore() stats {
}
}

// profileLoop profiles and prints profiling results.
// resetStatsStore returns the store to its initial state: the real and accumulated
// totals are zeroed and all entries are deleted from the durations and ratios maps.
// The maps are emptied in place, so the returned store shares them with s.
func resetStatsStore(s stats) stats {
	// The totals must be cleared together with the maps: accumulated is only ever
	// incremented while counting, so a leftover value would leak the previous query's
	// time into the totals and ratios of the next one.
	s.real = 0
	s.accumulated = 0

	for k := range s.durations {
		delete(s.durations, k)
	}
	for k := range s.ratios {
		delete(s.ratios, k)
	}

	return s
}

// profileStat describes a snapshot of activity statistics about a single profiled process,
// built from one row of pg_stat_activity.
type profileStat struct {
queryDurationSec float64 // seconds elapsed since query_start at the moment of the snapshot.
changeStateTime string // pg_stat_activity.state_change truncated to milliseconds; a different value between two snapshots means a new query has been started.
state string // backend state
waitEntry string // wait_event_type and wait_event joined with '.', empty string when there is none.
queryText string // query executed by backend
}

// profileLoop profiles target Process ID in a loop and prints profiling results.
func profileLoop(w io.Writer, conn *postgres.DB, cfg Config, doQuit chan os.Signal) error {
var prev profileStat
prev := make(map[int]profileStat)
s := newStatsStore()

_, err := fmt.Fprintf(w, "LOG: Profiling process %d with %s sampling\n", cfg.Pid, cfg.Frequency)
Expand All @@ -86,55 +102,62 @@ func profileLoop(w io.Writer, conn *postgres.DB, cfg Config, doQuit chan os.Sign

t := time.NewTicker(cfg.Frequency)

pid := cfg.Pid

// TODO: inclusive queries impossible on Postgres version 12 and older.
query := selectQuery(pid, true)

for {
curr, profileErr := getProfileSnapshot(conn, cfg.Pid)
if profileErr != nil && profileErr == pgx.ErrNoRows {
// print collected stats before exit
err := printStat(w, s)
if err != nil {
return err
}
// Get activity snapshot
res, err := stat.NewPGresultQuery(conn, query)
if err != nil {
return err
}

_, err = fmt.Fprintf(w, "LOG: Stop profiling, process with pid %d doesn't exist (%s)\n", cfg.Pid, profileErr.Error())
// No rows returned means profiled process quits. No reason to continue, print stats and return.
if res.Nrows == 0 {
_, err = fmt.Fprintf(w, "LOG: Stop profiling, no process with pid %d\n", pid)
if err != nil {
return err
}

return nil
} else if profileErr != nil {
return profileErr
}

// Extract per-process stats from activity snapshot.
curr := parseActivitySnapshot(res)

// Compare previous and current activity snapshots and analyze target process state transition.
switch {
case prev.state != "active" && curr.state == "active":
case prev[pid].state != "active" && curr[pid].state == "active":
// !active -> active - a query has been started - begin to count stats.
err := printHeader(w, curr, cfg.Strsize)
err := printHeader(w, curr[pid], cfg.Strsize)
if err != nil {
return err
}
s = countWaitings(s, curr, profileStat{})
s = countWaitEvents(s, pid, curr, map[int]profileStat{})
prev = curr
case prev.state == "active" && curr.state == "active" && prev.changeStateTime == curr.changeStateTime:
case prev[pid].state == "active" && curr[pid].state == "active" && prev[pid].changeStateTime == curr[pid].changeStateTime:
// active -> active - query continues executing - continue to count stats.
s = countWaitings(s, curr, prev)
s = countWaitEvents(s, pid, curr, prev)
prev = curr
case prev.state == "active" && curr.state == "active" && prev.changeStateTime != curr.changeStateTime:
case prev[pid].state == "active" && curr[pid].state == "active" && prev[pid].changeStateTime != curr[pid].changeStateTime:
// active -> active (new) - a new query has been started - print stat for previous query, count new stats.
err := printStat(w, s)
if err != nil {
return err
}
s = resetCounters(s)
s = countWaitings(s, curr, profileStat{})
prev = profileStat{}
case prev.state == "active" && curr.state != "active":
s = resetStatsStore(s)
s = countWaitEvents(s, pid, curr, map[int]profileStat{})
prev = map[int]profileStat{}
case prev[pid].state == "active" && curr[pid].state != "active":
// active -> idle - query has been finished, but no new query started - print stat, waiting for new query.
err := printStat(w, s)
if err != nil {
return err
}
s = resetCounters(s)
prev = profileStat{}
s = resetStatsStore(s)
prev = map[int]profileStat{}
}

// Wait ticker ticks.
Expand All @@ -152,57 +175,87 @@ func profileLoop(w io.Writer, conn *postgres.DB, cfg Config, doQuit chan os.Sign
}
}

// getProfileSnapshot get necessary activity snapshot from Postgres.
func getProfileSnapshot(conn *postgres.DB, pid int) (profileStat, error) {
query := "SELECT " +
"extract(epoch from clock_timestamp() - query_start) AS query_duration, " +
"date_trunc('milliseconds', state_change) AS state_change_time, " +
"state AS state, " +
"coalesce(wait_event_type ||'.'|| wait_event, '') AS wait_entry, query " +
"FROM pg_stat_activity WHERE pid = $1 /* pgcenter profile */"

var s profileStat
// parseActivitySnapshot parses PGresult and returns per-process profile statistics.
func parseActivitySnapshot(res stat.PGresult) map[int]profileStat {
stat := make(map[int]profileStat)

row := conn.QueryRow(query, pid)
err := row.Scan(&s.queryDurationSec,
&s.changeStateTime,
&s.state,
&s.waitEntry,
&s.queryText,
)
for _, row := range res.Values {
var pid int
var s profileStat

return s, err
}
for i, colname := range res.Cols {
// Skip empty (NULL) values.
if !row[i].Valid {
continue
}

// countWaitings counts wait events durations and its percent rations accordingly to total query time.
func countWaitings(s stats, curr profileStat, prev profileStat) stats {
// calculate durations
if curr.waitEntry == "" {
s.durations["Running"] = s.durations["Running"] + (curr.queryDurationSec - prev.queryDurationSec)
} else {
s.durations[curr.waitEntry] = s.durations[curr.waitEntry] + (curr.queryDurationSec - prev.queryDurationSec)
}
switch colname {
case "pid":
v, err := strconv.Atoi(row[i].String)
if err != nil {
continue
}
pid = v
case "query_duration":
v, err := strconv.ParseFloat(row[i].String, 64)
if err != nil {
continue
}
s.queryDurationSec = v
case "state_change_time":
s.changeStateTime = row[i].String
case "state":
s.state = row[i].String
case "wait_entry":
s.waitEntry = row[i].String
case "query":
s.queryText = row[i].String
}
}

// calculate ratios
for k, v := range s.durations {
s.ratios[k] = (100 * v) / curr.queryDurationSec
stat[pid] = s
}

return s
return stat
}

// Reset stats counters -- delete all entries from the maps
func resetCounters(s stats) stats {
for k := range s.durations {
delete(s.durations, k)
// countWaitEvents counts wait events durations and its percent rations accordingly to total query time.
func countWaitEvents(s stats, targetPid int, curr map[int]profileStat, prev map[int]profileStat) stats {
// Walk through current and previous activity snapshots and calculate durations
for k, vCurr := range curr {
if vPrev, ok := prev[k]; ok {
// found in prev
delta := vCurr.queryDurationSec - vPrev.queryDurationSec
s.accumulated += delta

if vCurr.waitEntry == "" {
s.durations["Running"] += delta
} else {
s.durations[vCurr.waitEntry] += delta
}
} else {
// new, not found in prev
s.accumulated += vCurr.queryDurationSec
if vCurr.waitEntry == "" {
s.durations["Running"] += vCurr.queryDurationSec
} else {
s.durations[vCurr.waitEntry] += vCurr.queryDurationSec
}
}
}
for k := range s.ratios {
delete(s.ratios, k)

// Update target PID execution duration
s.real = curr[targetPid].queryDurationSec

// Calculate ratios of wait_events accordingly to total time (including background workers)
for k, v := range s.durations {
s.ratios[k] = (100 * v) / s.accumulated
}

return s
}

// printHeader prints profile header.
// printHeader prints report header.
func printHeader(w io.Writer, curr profileStat, strsize int) error {
q := truncateQuery(curr.queryText, strsize)

Expand All @@ -219,13 +272,28 @@ func printHeader(w io.Writer, curr profileStat, strsize int) error {
return nil
}

// printStat prints collected wait events durations and percent ratios.
// waitEvent pairs the name of a wait event with the numeric value it is ranked by.
type waitEvent struct {
	waitEventName  string  // label of the wait event
	waitEventValue float64 // value used as the sort key
}

// waitEvents is a sortable list of waitEvent entries.
type waitEvents []waitEvent

// Len reports how many entries the list holds.
func (p waitEvents) Len() int {
	return len(p)
}

// Swap exchanges the entries at positions i and j.
func (p waitEvents) Swap(i, j int) {
	tmp := p[i]
	p[i] = p[j]
	p[j] = tmp
}

// Less orders entries by descending waitEventValue, so the largest value sorts first.
func (p waitEvents) Less(i, j int) bool {
	return p[j].waitEventValue < p[i].waitEventValue
}

// printStat prints report body with collected wait events durations and percent ratios.
func printStat(w io.Writer, s stats) error {
if len(s.durations) == 0 {
return nil
} // nothing to do

var totalPct, totalTime float64
// Organize collected wait_events into slice for further sorting.
var totalPct float64
p := make(waitEvents, len(s.durations))
i := 0

Expand All @@ -234,17 +302,16 @@ func printStat(w io.Writer, s stats) error {
i++
}

// Sort wait events by percent ratios.
// Sort wait_events by percent ratios.
sort.Sort(p)

// Print stats and calculating totals.
// Print stats and calculate totals.
for _, e := range p {
_, err := fmt.Fprintf(w, "%*.2f %*.6f %s\n", 6, s.ratios[e.waitEventName], 12, e.waitEventValue, e.waitEventName)
if err != nil {
return err
}
totalPct += s.ratios[e.waitEventName]
totalTime += e.waitEventValue
}

// Print totals.
Expand All @@ -253,7 +320,7 @@ func printStat(w io.Writer, s stats) error {
return err
}

_, err = fmt.Fprintf(w, "%*.2f %*.6f\n", 6, totalPct, 12, totalTime)
_, err = fmt.Fprintf(w, "%*.2f %*.6f\n %*.6f including workers\n", 6, totalPct, 12, s.real, 12, s.accumulated)
if err != nil {
return err
}
Expand All @@ -262,9 +329,24 @@ func printStat(w io.Writer, s stats) error {
}

// truncateQuery returns s cut down to at most limit bytes; a string that already fits
// is returned unchanged.
//
// The cut never lands inside a multi-byte UTF-8 character: when byte number limit falls
// in the middle of a character, that whole character is dropped, so the result stays
// valid text. A negative limit is treated as zero.
func truncateQuery(s string, limit int) string {
	if limit < 0 {
		limit = 0
	}
	if len(s) <= limit {
		return s
	}

	// Ranging over a string yields the byte offset at which each character starts;
	// remember the last such offset that does not exceed the limit.
	cut := 0
	for i := range s {
		if i > limit {
			break
		}
		cut = i
	}

	return s[:cut]
}

// selectQuery returns the query used for profiling the process with the given pid.
// Two kinds of queries are possible:
// - inclusive - also selects background (parallel) workers using 'leader_pid' (available since Postgres 13);
// - exclusive - selects the target PID only, without background (parallel) workers.
func selectQuery(pid int, inclusive bool) string {
	if inclusive {
		// The inclusive template references the pid twice: as 'pid' and as 'leader_pid'.
		return fmt.Sprintf(inclusiveQuery, pid, pid)
	}

	return fmt.Sprintf(exclusiveQuery, pid)
}
Loading

0 comments on commit 3d54c8d

Please sign in to comment.