Skip to content
This repository has been archived by the owner on Apr 2, 2024. It is now read-only.

Commit

Permalink
Implement rollup resolution decider for incoming queries.
Browse files Browse the repository at this point in the history
Signed-off-by: Harkishen-Singh <harkishensingh@hotmail.com>
  • Loading branch information
Harkishen-Singh committed Dec 6, 2022
1 parent ec7c1d5 commit e755d36
Show file tree
Hide file tree
Showing 19 changed files with 479 additions and 46 deletions.
15 changes: 10 additions & 5 deletions pkg/pgclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ type Client struct {
}

// NewClient creates a new PostgreSQL client
func NewClient(r prometheus.Registerer, cfg *Config, mt tenancy.Authorizer, schemaLocker LockFunc, readOnly bool) (*Client, error) {
func NewClient(r prometheus.Registerer, cfg *Config, mt tenancy.Authorizer, schemaLocker LockFunc, readOnly, useRollups bool) (*Client, error) {
var (
err error
dbMaxConns int
Expand Down Expand Up @@ -137,7 +137,7 @@ func NewClient(r prometheus.Registerer, cfg *Config, mt tenancy.Authorizer, sche
if err != nil {
return nil, fmt.Errorf("err creating reader connection pool: %w", err)
}
client, err := NewClientWithPool(r, cfg, numCopiers, writerPool, readerPool, maintPool, mt, readOnly)
client, err := NewClientWithPool(r, cfg, numCopiers, writerPool, readerPool, maintPool, mt, readOnly, useRollups)
if err != nil {
return client, err
}
Expand Down Expand Up @@ -197,7 +197,7 @@ func getRedactedConnStr(s string) string {
}

// NewClientWithPool creates a new PostgreSQL client with an existing connection pool.
func NewClientWithPool(r prometheus.Registerer, cfg *Config, numCopiers int, writerPool, readerPool, maintPool *pgxpool.Pool, mt tenancy.Authorizer, readOnly bool) (*Client, error) {
func NewClientWithPool(r prometheus.Registerer, cfg *Config, numCopiers int, writerPool, readerPool, maintPool *pgxpool.Pool, mt tenancy.Authorizer, readOnly, useRollups bool) (*Client, error) {
sigClose := make(chan struct{})
metricsCache := cache.NewMetricCache(cfg.CacheConfig)
labelsCache := cache.NewLabelsCache(cfg.CacheConfig)
Expand All @@ -223,7 +223,12 @@ func NewClientWithPool(r prometheus.Registerer, cfg *Config, numCopiers int, wri
exemplarKeyPosCache := cache.NewExemplarLabelsPosCache(cfg.CacheConfig)

labelsReader := lreader.NewLabelsReader(readerConn, labelsCache, mt.ReadAuthorizer())
dbQuerier := querier.NewQuerier(readerConn, metricsCache, labelsReader, exemplarKeyPosCache, mt.ReadAuthorizer())
dbQuerier, err := querier.NewQuerier(readerConn, metricsCache, labelsReader, exemplarKeyPosCache, mt.ReadAuthorizer(), useRollups)
if err != nil {
log.Error("msg", "err starting querier", "error", err.Error())
return nil, err
}

queryable := query.NewQueryable(dbQuerier, labelsReader)

dbIngestor := ingestor.DBInserter(ingestor.ReadOnlyIngestor{})
Expand All @@ -232,7 +237,7 @@ func NewClientWithPool(r prometheus.Registerer, cfg *Config, numCopiers int, wri
writerConn = pgxconn.NewPgxConn(writerPool)
dbIngestor, err = ingestor.NewPgxIngestor(writerConn, metricsCache, seriesCache, exemplarKeyPosCache, &c)
if err != nil {
log.Error("msg", "err starting the ingestor", "err", err)
log.Error("msg", "err starting ingestor", "error", err.Error())
return nil, err
}
}
Expand Down
19 changes: 15 additions & 4 deletions pkg/pgmodel/querier/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,20 @@ package querier

import (
"context"
"fmt"

"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"
"github.com/timescale/promscale/pkg/pgmodel/cache"
"github.com/timescale/promscale/pkg/pgmodel/lreader"
"github.com/timescale/promscale/pkg/pgxconn"
"github.com/timescale/promscale/pkg/rollup"
"github.com/timescale/promscale/pkg/tenancy"
)

type pgxQuerier struct {
tools *queryTools
tools *queryTools
rollups *rollup.QueryHelper
}

var _ Querier = (*pgxQuerier)(nil)
Expand All @@ -29,7 +32,8 @@ func NewQuerier(
labelsReader lreader.LabelsReader,
exemplarCache cache.PositionCache,
rAuth tenancy.ReadAuthorizer,
) Querier {
useRollups bool,
) (Querier, error) {
querier := &pgxQuerier{
tools: &queryTools{
conn: conn,
Expand All @@ -39,15 +43,22 @@ func NewQuerier(
rAuth: rAuth,
},
}
return querier
if useRollups {
qh, err := rollup.NewQueryHelper(context.Background(), conn)
if err != nil {
return nil, fmt.Errorf("creating rollups query helper: %w", err)
}
querier.rollups = qh
}
return querier, nil
}

func (q *pgxQuerier) RemoteReadQuerier(ctx context.Context) RemoteReadQuerier {
return newQueryRemoteRead(ctx, q)
}

func (q *pgxQuerier) SamplesQuerier(ctx context.Context) SamplesQuerier {
return newQuerySamples(ctx, q)
return newQuerySamples(ctx, q, q.rollups)
}

func (q *pgxQuerier) ExemplarsQuerier(ctx context.Context) ExemplarQuerier {
Expand Down
7 changes: 6 additions & 1 deletion pkg/pgmodel/querier/querier_sql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -722,7 +722,12 @@ func TestPGXQuerierQuery(t *testing.T) {
if err != nil {
t.Fatalf("error setting up mock cache: %s", err.Error())
}
querier := pgxQuerier{&queryTools{conn: mock, metricTableNames: mockMetrics, labelsReader: lreader.NewLabelsReader(mock, clockcache.WithMax(0), tenancy.NewNoopAuthorizer().ReadAuthorizer())}}
querier := pgxQuerier{
&queryTools{
conn: mock, metricTableNames: mockMetrics, labelsReader: lreader.NewLabelsReader(mock, clockcache.WithMax(0), tenancy.NewNoopAuthorizer().ReadAuthorizer()),
},
nil,
}

result, err := querier.RemoteReadQuerier(context.Background()).Query(c.query)

Expand Down
2 changes: 1 addition & 1 deletion pkg/pgmodel/querier/query_remote_read.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func (q *queryRemoteRead) Query(query *prompb.Query) ([]*prompb.TimeSeries, erro
return nil, err
}

qrySamples := newQuerySamples(q.ctx, q.pgxQuerier)
qrySamples := newQuerySamples(q.ctx, q.pgxQuerier, nil)
sampleRows, _, err := qrySamples.fetchSamplesRows(query.StartTimestampMs, query.EndTimestampMs, nil, nil, nil, matchers)
if err != nil {
return nil, err
Expand Down
8 changes: 5 additions & 3 deletions pkg/pgmodel/querier/query_sample.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,17 @@ import (
"github.com/prometheus/prometheus/storage"
"github.com/timescale/promscale/pkg/pgmodel/common/errors"
"github.com/timescale/promscale/pkg/pgmodel/common/schema"
"github.com/timescale/promscale/pkg/rollup"
)

type querySamples struct {
*pgxQuerier
ctx context.Context
ctx context.Context
rollups *rollup.QueryHelper
}

func newQuerySamples(ctx context.Context, qr *pgxQuerier) *querySamples {
return &querySamples{qr, ctx}
func newQuerySamples(ctx context.Context, qr *pgxQuerier, rollups *rollup.QueryHelper) *querySamples {
return &querySamples{qr, ctx, rollups}
}

// Select implements the SamplesQuerier interface. It is the entry point for our
Expand Down
173 changes: 173 additions & 0 deletions pkg/rollup/resolution.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
// This file and its contents are licensed under the Apache License 2.0.
// Please see the included NOTICE for copyright information and
// LICENSE for a copy of the license.

package rollup

import (
"context"
"fmt"
"sort"
"sync"
"time"

"github.com/timescale/promscale/pkg/log"
"github.com/timescale/promscale/pkg/pgxconn"
)

const (
originalSchema = "prom_data"
upperLimit = 5000 // Maximum samples allowed
assumedScrapeInterval = time.Second * 30
refreshRollupResolution = time.Minute * 30
)

type rollupInfo struct {
schemaName string
resolution time.Duration
}

type QueryHelper struct {
conn pgxconn.PgxConn
refreshMtx sync.RWMutex
metricType map[string]string // metric_name: metric_type
resolutionsASC []rollupInfo // {schemaName, resolution} in ascending order of resolution.
downsamplingEnabled bool
}

func NewQueryHelper(ctx context.Context, conn pgxconn.PgxConn) (*QueryHelper, error) {
helper := &QueryHelper{conn: conn}
if err := helper.runRefreshRoutine(ctx, refreshRollupResolution); err != nil {
return nil, fmt.Errorf("refresh: %w", err)
}
return helper, nil
}

// DecideSchema returns the schema name of the rollups that should be used for querying.
// The returned schema represents a downsampled resolution that should be an optimal
// resolution for querying.
//
// If no rollups exists or if downsampling is disabled, "prom_data" is returned.
func (h *QueryHelper) DecideSchema(min, max int64) string {
h.refreshMtx.RLock()
defer h.refreshMtx.RUnlock()

if !h.downsamplingEnabled || len(h.resolutionsASC) == 0 {
return originalSchema
}
estimateSamples := func(resolution time.Duration) int64 {
return int64(float64(max-min) / resolution.Seconds())
}

numRawSamples := estimateSamples(assumedScrapeInterval)
if numRawSamples < upperLimit {
return originalSchema
}

for _, info := range h.resolutionsASC {
samples := estimateSamples(info.resolution)
if samples < upperLimit {
// The first highest resolution that is below upper limit is our answer,
// since it provides the highest granularity at the expected samples.
return info.schemaName
}
}
// All rollups are above upper limit. Hence, send the schema of the lowest resolution
// as this is the best we can do.
lowestRollup := h.resolutionsASC[len(h.resolutionsASC)-1]
return lowestRollup.schemaName
}

func (h *QueryHelper) ContainsMetricType(metricName string) bool {
_, present := h.metricType[metricName]
return present
}

func (h *QueryHelper) Refresh() error {
h.refreshMtx.Lock()
defer h.refreshMtx.Unlock()

if err := h.refreshDownsamplingState(); err != nil {
return fmt.Errorf("downsampling state: %w", err)
}
if err := h.refreshMetricTypes(); err != nil {
return fmt.Errorf("metric-type: %w", err)
}
if err := h.refreshRollupResolutions(); err != nil {
return fmt.Errorf("rollup resolutions: %w", err)
}
return nil
}

func (h *QueryHelper) runRefreshRoutine(ctx context.Context, refreshInterval time.Duration) error {
if err := h.Refresh(); err != nil {
return fmt.Errorf("refreshing rollup resolution: %w", err)
}
go func() {
t := time.NewTicker(refreshInterval)
defer t.Stop()
for range t.C {
if err := h.Refresh(); err != nil {
log.Error("msg", "error refreshing rollup resolution", "error", err.Error())
}
}
}()
return nil
}

func (h *QueryHelper) refreshDownsamplingState() error {
var state bool
if err := h.conn.QueryRow(context.Background(), "SELECT prom_api.get_automatic_downsample()::BOOLEAN").Scan(&state); err != nil {
return fmt.Errorf("fetching automatic downsampling state: %w", err)
}
h.downsamplingEnabled = state
return nil
}

func (h *QueryHelper) refreshMetricTypes() error {
var metricName, metricType []string
err := h.conn.QueryRow(context.Background(),
"select array_agg(metric_family), array_agg(type) from _prom_catalog.metadata").Scan(&metricName, &metricType)
if err != nil {
return fmt.Errorf("fetching metric metadata: %w", err)
}
h.metricType = make(map[string]string) // metric_name: metric_type
for i := range metricName {
h.metricType[metricName[i]] = metricType[i]
}
return nil
}

func (h *QueryHelper) refreshRollupResolutions() error {
rows, err := h.conn.Query(context.Background(), "SELECT schema_name, resolution FROM _prom_catalog.rollup")
if err != nil {
return fmt.Errorf("fetching rollup resolutions: %w", err)
}
h.resolutionsASC = []rollupInfo{}
for rows.Next() {
var (
schemaName string
resolution time.Duration
)
if err = rows.Scan(&schemaName, &resolution); err != nil {
return fmt.Errorf("error scanning rows: %w", err)
}
h.resolutionsASC = append(h.resolutionsASC, rollupInfo{schemaName: schemaName, resolution: resolution})
}
sort.Sort(sortRollupInfo(h.resolutionsASC))
return nil
}

type sortRollupInfo []rollupInfo

func (s sortRollupInfo) Len() int {
return len(s)
}

func (s sortRollupInfo) Less(i, j int) bool {
return s[i].resolution.Seconds() < s[j].resolution.Seconds()
}

func (s sortRollupInfo) Swap(i, j int) {
s[i], s[j] = s[j], s[i]
}
Loading

0 comments on commit e755d36

Please sign in to comment.