
Commit

Propagate context from query requests
Each http.Request comes with its own context; when the client connection
closes, this context is cancelled.

Propagating the request context prevents the request handler from doing
extra work in situations where the client will not be able to receive the
response.

To support context propagation, the same approach used in the Prometheus
codebase was followed: every `Queryable` method that returns a `Querier`
accepts a context, and it is the `Querier`'s responsibility to propagate it
downstream. A simplified sketch of this shape follows the references below.

- Prometheus `Queryable` interface definition:
https://github.com/prometheus/prometheus/blob/15fa34936b6f1febe896ecee0f49889a56347762/storage/interface.go#L79-L84

- Prometheus `Querier` propagating the ctx in the `Select` method:
https://github.com/prometheus/prometheus/blob/15fa34936b6f1febe896ecee0f49889a56347762/storage/remote/read.go#L170
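
A simplified sketch of that shape, with hypothetical types standing in for
the real ones in `pkg/pgmodel/querier` (shown in the diff below) and a plain
pgx pool in place of `PgxConn`:

```go
package querier

import (
	"context"

	"github.com/jackc/pgx/v4"
	"github.com/jackc/pgx/v4/pgxpool"
)

// SamplesQuerier runs sample queries; signature abridged for illustration.
type SamplesQuerier interface {
	Select(mint, maxt int64) (pgx.Rows, error)
}

// Querier hands out queriers; the caller passes its (request) context here.
type Querier interface {
	SamplesQuerier(ctx context.Context) SamplesQuerier
}

type pgxQuerier struct {
	pool *pgxpool.Pool
}

func (q *pgxQuerier) SamplesQuerier(ctx context.Context) SamplesQuerier {
	// The querier keeps the caller's context and reuses it downstream.
	return &querySamples{ctx: ctx, pool: q.pool}
}

type querySamples struct {
	ctx  context.Context
	pool *pgxpool.Pool
}

func (q *querySamples) Select(mint, maxt int64) (pgx.Rows, error) {
	// The stored request context, not context.Background(), reaches the
	// database, so the call is aborted if the client has already gone away.
	return q.pool.Query(q.ctx,
		"SELECT time, value FROM some_metric WHERE time BETWEEN $1 AND $2",
		mint, maxt)
}
```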

Even though endpoints like `/api/v1/query` and `/api/v1/query_range` were
already propagating the context to downstream functions, up to the
`SamplesQuerier`, in order to handle the request's `timeout` parameter, the
`Querier` was not using the given context in its calls to the `PgxConn`
methods; it was using `context.Background()` instead.
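
In handler terms the change boils down to the pattern below (a minimal,
hypothetical handler with a pared-down `Queryable`; the real changes are in
the diff, e.g. `pkg/api/labels.go`):

```go
package api

import (
	"context"
	"math"
	"net/http"
)

// Queryable is a pared-down stand-in for the real promql.Queryable; only the
// method relevant to this sketch is shown.
type Queryable interface {
	SamplesQuerier(ctx context.Context, mint, maxt int64) (SamplesQuerier, error)
}

// SamplesQuerier is likewise reduced to the bare minimum for the sketch.
type SamplesQuerier interface {
	Close()
}

// labelsHandler illustrates the fix: the request context (cancelled by
// net/http when the client disconnects) flows downstream instead of
// context.Background().
func labelsHandler(queryable Queryable) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		// Before this commit: queryable.SamplesQuerier(context.Background(), ...)
		q, err := queryable.SamplesQuerier(r.Context(), math.MinInt64, math.MaxInt64)
		if err != nil {
			http.Error(w, err.Error(), http.StatusInternalServerError)
			return
		}
		defer q.Close()
		// ... query label values and write the response ...
	}
}
```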

An important consideration to keep in mind: cancelling the context of a
request with a running query will abort the connection to the database but
not the query itself. To cancel the query we might have to use something
like `statement_timeout`, as proposed in #1232.
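
For illustration only (the exact mechanism is still under discussion in
#1232, and everything below is a hypothetical sketch), one way to bound
server-side execution time with `statement_timeout` when building the pgx
pool could be:

```go
package main

import (
	"context"
	"log"

	"github.com/jackc/pgx/v4/pgxpool"
)

func main() {
	// Hypothetical connection string; adjust to the actual deployment.
	cfg, err := pgxpool.ParseConfig("postgres://user:pass@localhost:5432/promscale")
	if err != nil {
		log.Fatal(err)
	}
	// Ask PostgreSQL to cancel any statement that runs longer than 30s, so a
	// query is stopped server-side even after the client connection (and its
	// context) is gone. A bare integer is interpreted as milliseconds.
	cfg.ConnConfig.RuntimeParams["statement_timeout"] = "30000"

	pool, err := pgxpool.ConnectConfig(context.Background(), cfg)
	if err != nil {
		log.Fatal(err)
	}
	defer pool.Close()
}
```
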
alejandrodnm committed Sep 5, 2022
1 parent 6f89768 commit 36cc120
Showing 28 changed files with 144 additions and 129 deletions.
6 changes: 2 additions & 4 deletions CHANGELOG.md
@@ -13,13 +13,12 @@ We use the following categories for changes:
- `Fixed` for any bug fixes.
- `Security` in case of vulnerabilities.

## [Unreleased]

### Added

- prom-migrator: Support for passing custom HTTP headers via command line arguments for both
reader and writer [#1020].
- Run timescaledb-tune with the promscale profile [#1615]
- Propagate the context from received HTTP read requests downstream to database
requests [#1205].

## [0.14.0] - 2022-08-30

@@ -29,7 +28,6 @@ We use the following categories for changes:
- Helm chart now ships a JSON Schema for imposing a structure of the values.yaml file [#1551]

### Changed

- Helm chart code was migrated to https://github.com/timescale/helm-charts [#1562]
- Deprecate flag `tracing.otlp.server-address` in favour of `tracing.grpc.server-address` [#1588]

2 changes: 1 addition & 1 deletion pkg/api/delete.go
@@ -72,7 +72,7 @@ func deleteHandler(config *Config, client *pgclient.Client) http.HandlerFunc {
continue
}
pgDelete := deletePkg.PgDelete{Conn: client.ReadOnlyConnection()}
touchedMetrics, deletedSeriesIDs, rowsDeleted, err := pgDelete.DeleteSeries(matchers, start, end)
touchedMetrics, deletedSeriesIDs, rowsDeleted, err := pgDelete.DeleteSeries(r.Context(), matchers, start, end)
if err != nil {
respondErrorWithMessage(w, http.StatusInternalServerError, err, "deleting_series",
fmt.Sprintf("partial delete: deleted %v series IDs from %v metrics, affecting %d rows in total.",
3 changes: 1 addition & 2 deletions pkg/api/label_values.go
@@ -5,7 +5,6 @@
package api

import (
"context"
"fmt"
"math"
"net/http"
@@ -28,7 +27,7 @@ func labelValues(queryable promql.Queryable) http.HandlerFunc {
respondError(w, http.StatusBadRequest, fmt.Errorf("invalid label name: %s", name), "bad_data")
return
}
querier, err := queryable.SamplesQuerier(context.Background(), math.MinInt64, math.MaxInt64)
querier, err := queryable.SamplesQuerier(r.Context(), math.MinInt64, math.MaxInt64)
if err != nil {
respondError(w, http.StatusInternalServerError, err, "internal")
return
3 changes: 1 addition & 2 deletions pkg/api/labels.go
@@ -5,7 +5,6 @@
package api

import (
"context"
"encoding/json"
"math"
"net/http"
@@ -34,7 +33,7 @@ func Labels(conf *Config, queryable promql.Queryable) http.Handler {

func labelsHandler(queryable promql.Queryable) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
querier, err := queryable.SamplesQuerier(context.Background(), math.MinInt64, math.MaxInt64)
querier, err := queryable.SamplesQuerier(r.Context(), math.MinInt64, math.MaxInt64)
if err != nil {
respondError(w, http.StatusInternalServerError, err, "internal")
return
2 changes: 1 addition & 1 deletion pkg/api/metadata.go
@@ -38,7 +38,7 @@ func metricMetadataHandler(client *pgclient.Client) http.HandlerFunc {
return
}
}
data, err := metadata.MetricQuery(client.ReadOnlyConnection(), metric, int(limit))
data, err := metadata.MetricQuery(r.Context(), client.ReadOnlyConnection(), metric, int(limit))
if err != nil {
respondError(w, http.StatusInternalServerError, err, "fetching metric metadata")
return
4 changes: 2 additions & 2 deletions pkg/api/query_test.go
@@ -60,7 +60,7 @@ func (m mockQuerier) Query(*prompb.Query) ([]*prompb.TimeSeries, error) {
panic("implement me")
}

func (m mockQuerier) SamplesQuerier() querier.SamplesQuerier {
func (m mockQuerier) SamplesQuerier(_ context.Context) querier.SamplesQuerier {
return m
}

@@ -70,7 +70,7 @@ func (ms mockQuerier) Select(int64, int64, bool, *storage.SelectHints, *querier.
return &mockSeriesSet{err: ms.selectErr}, nil
}

func (m mockQuerier) RemoteReadQuerier() querier.RemoteReadQuerier {
func (m mockQuerier) RemoteReadQuerier(_ context.Context) querier.RemoteReadQuerier {
return nil
}

4 changes: 2 additions & 2 deletions pkg/api/read.go
@@ -54,7 +54,7 @@ func Read(config *Config, reader querier.Reader, metrics *Metrics, updateMetrics
metrics.RemoteReadReceivedQueries.Add(float64(len(req.Queries)))

// Drop __replica__ labelSet when
// Promscale is running is HA mode
// Promscale is running in HA mode
// as the same lebelSet is dropped during ingestion.
if config.HighAvailability {
for _, q := range req.Queries {
@@ -67,7 +67,7 @@ }
}

var resp *prompb.ReadResponse
resp, err = reader.Read(&req)
resp, err = reader.Read(r.Context(), &req)
if err != nil {
statusCode = "500"
log.Warn("msg", "Error executing query", "query", req, "storage", "PostgreSQL", "err", err)
3 changes: 2 additions & 1 deletion pkg/api/read_test.go
@@ -5,6 +5,7 @@
package api

import (
"context"
"fmt"
"io"
"net/http"
@@ -121,7 +122,7 @@ type mockReader struct {
err error
}

func (m *mockReader) Read(r *prompb.ReadRequest) (*prompb.ReadResponse, error) {
func (m *mockReader) Read(_ context.Context, r *prompb.ReadRequest) (*prompb.ReadResponse, error) {
m.request = r
return m.response, m.err
}
4 changes: 2 additions & 2 deletions pkg/pgclient/client.go
@@ -296,7 +296,7 @@ func (c *Client) IngestTraces(ctx context.Context, tr ptrace.Traces) error {
}

// Read returns the promQL query results
func (c *Client) Read(req *prompb.ReadRequest) (*prompb.ReadResponse, error) {
func (c *Client) Read(ctx context.Context, req *prompb.ReadRequest) (*prompb.ReadResponse, error) {
if req == nil {
return nil, nil
}
@@ -305,7 +305,7 @@ func (c *Client) Read(req *prompb.ReadRequest) (*prompb.ReadResponse, error) {
Results: make([]*prompb.QueryResult, len(req.Queries)),
}

qr := c.querier.RemoteReadQuerier()
qr := c.querier.RemoteReadQuerier(ctx)

for i, q := range req.Queries {
tts, err := qr.Query(q)
6 changes: 3 additions & 3 deletions pkg/pgclient/client_test.go
@@ -26,7 +26,7 @@ type mockQuerier struct {

var _ querier.Querier = (*mockQuerier)(nil)

func (q *mockQuerier) SamplesQuerier() querier.SamplesQuerier {
func (q *mockQuerier) SamplesQuerier(_ context.Context) querier.SamplesQuerier {
return mockSamplesQuerier{}
}

@@ -36,7 +36,7 @@ func (q mockSamplesQuerier) Select(mint int64, maxt int64, sortSeries bool, hint
return nil, nil
}

func (q *mockQuerier) RemoteReadQuerier() querier.RemoteReadQuerier {
func (q *mockQuerier) RemoteReadQuerier(_ context.Context) querier.RemoteReadQuerier {
return mockRemoteReadQuerier{
tts: q.tts,
err: q.err,
@@ -151,7 +151,7 @@ func TestDBReaderRead(t *testing.T) {

r := Client{querier: mq}

res, err := r.Read(c.req)
res, err := r.Read(context.Background(), c.req)

if err != nil {
if c.err == nil || err != c.err {
10 changes: 5 additions & 5 deletions pkg/pgmodel/delete/delete.go
@@ -24,22 +24,22 @@ type PgDelete struct {
}

// DeleteSeries deletes the series that matches the provided label_matchers.
func (pgDel *PgDelete) DeleteSeries(matchers []*labels.Matcher, _, _ time.Time) ([]string, []model.SeriesID, int, error) {
func (pgDel *PgDelete) DeleteSeries(ctx context.Context, matchers []*labels.Matcher, _, _ time.Time) ([]string, []model.SeriesID, int, error) {
var (
deletedSeriesIDs []model.SeriesID
totalRowsDeleted int
err error
metricsTouched = make(map[string]struct{})
)
metricNames, seriesIDMatrix, err := getMetricNameSeriesIDFromMatchers(pgDel.Conn, matchers)
metricNames, seriesIDMatrix, err := getMetricNameSeriesIDFromMatchers(ctx, pgDel.Conn, matchers)
if err != nil {
return nil, nil, -1, fmt.Errorf("delete-series: %w", err)
}
for metricIndex, metricName := range metricNames {
seriesIDs := seriesIDMatrix[metricIndex]
var rowsDeleted int
if err = pgDel.Conn.QueryRow(
context.Background(),
ctx,
queryDeleteSeries,
metricName,
convertSeriesIDsToInt64s(seriesIDs),
@@ -57,7 +57,7 @@ func (pgDel *PgDelete) DeleteSeries(matchers []*labels.Matcher, _, _ time.Time)

// getMetricNameSeriesIDFromMatchers returns the metric name list and the corresponding series ID array
// as a matrix.
func getMetricNameSeriesIDFromMatchers(conn pgxconn.PgxConn, matchers []*labels.Matcher) ([]string, [][]model.SeriesID, error) {
func getMetricNameSeriesIDFromMatchers(ctx context.Context, conn pgxconn.PgxConn, matchers []*labels.Matcher) ([]string, [][]model.SeriesID, error) {
cb, err := querier.BuildSubQueries(matchers)
if err != nil {
return nil, nil, fmt.Errorf("delete series build subqueries: %w", err)
@@ -66,7 +66,7 @@ func getMetricNameSeriesIDFromMatchers(conn pgxconn.PgxConn, matchers []*labels.
if err != nil {
return nil, nil, fmt.Errorf("delete series build clauses: %w", err)
}
metrics, schemas, correspondingSeriesIDs, err := querier.GetMetricNameSeriesIds(conn, querier.GetMetadata(clauses, values))
metrics, schemas, correspondingSeriesIDs, err := querier.GetMetricNameSeriesIds(ctx, conn, querier.GetMetadata(clauses, values))
if err != nil {
return nil, nil, fmt.Errorf("get metric-name series-ids: %w", err)
}
6 changes: 3 additions & 3 deletions pkg/pgmodel/metadata/metadata.go
@@ -13,15 +13,15 @@ import (
)

// MetricQuery returns metadata corresponding to metric or metric_family.
func MetricQuery(conn pgxconn.PgxConn, metric string, limit int) (map[string][]model.Metadata, error) {
func MetricQuery(ctx context.Context, conn pgxconn.PgxConn, metric string, limit int) (map[string][]model.Metadata, error) {
var (
rows pgxconn.PgxRows
err error
)
if metric != "" {
rows, err = conn.Query(context.Background(), "SELECT * from prom_api.get_metric_metadata($1)", metric)
rows, err = conn.Query(ctx, "SELECT * from prom_api.get_metric_metadata($1)", metric)
} else {
rows, err = conn.Query(context.Background(), "SELECT metric_family, type, unit, help from _prom_catalog.metadata ORDER BY metric_family, last_seen DESC")
rows, err = conn.Query(ctx, "SELECT metric_family, type, unit, help from _prom_catalog.metadata ORDER BY metric_family, last_seen DESC")
}
if err != nil {
return nil, fmt.Errorf("query metric metadata: %w", err)
4 changes: 2 additions & 2 deletions pkg/pgmodel/querier/common.go
@@ -49,9 +49,9 @@ type QueryHints struct {
Lookback time.Duration
}

func GetMetricNameSeriesIds(conn pgxconn.PgxConn, metadata *evalMetadata) (metrics, schemas []string, correspondingSeriesIDs [][]model.SeriesID, err error) {
func GetMetricNameSeriesIds(ctx context.Context, conn pgxconn.PgxConn, metadata *evalMetadata) (metrics, schemas []string, correspondingSeriesIDs [][]model.SeriesID, err error) {
sqlQuery := buildMetricNameSeriesIDQuery(metadata.clauses)
rows, err := conn.Query(context.Background(), sqlQuery, metadata.values...)
rows, err := conn.Query(ctx, sqlQuery, metadata.values...)
if err != nil {
return nil, nil, nil, err
}
6 changes: 3 additions & 3 deletions pkg/pgmodel/querier/interface.go
@@ -13,7 +13,7 @@ import (

// Reader reads the data based on the provided read request.
type Reader interface {
Read(*prompb.ReadRequest) (*prompb.ReadResponse, error)
Read(context.Context, *prompb.ReadRequest) (*prompb.ReadResponse, error)
}

// SeriesSet adds a Close method to storage.SeriesSet to provide a way to free memory/
@@ -26,9 +26,9 @@ type SeriesSet interface {
// and exemplars.
type Querier interface {
// RemoteReadQuerier returns a remote storage querier
RemoteReadQuerier() RemoteReadQuerier
RemoteReadQuerier(ctx context.Context) RemoteReadQuerier
// SamplesQuerier returns a sample querier.
SamplesQuerier() SamplesQuerier
SamplesQuerier(ctx context.Context) SamplesQuerier
// ExemplarsQuerier returns an exemplar querier.
ExemplarsQuerier(ctx context.Context) ExemplarQuerier
}
10 changes: 5 additions & 5 deletions pkg/pgmodel/querier/querier.go
@@ -42,16 +42,16 @@ func NewQuerier(
return querier
}

func (q *pgxQuerier) RemoteReadQuerier() RemoteReadQuerier {
return newQueryRemoteRead(q)
func (q *pgxQuerier) RemoteReadQuerier(ctx context.Context) RemoteReadQuerier {
return newQueryRemoteRead(ctx, q)
}

func (q *pgxQuerier) SamplesQuerier() SamplesQuerier {
return newQuerySamples(q)
func (q *pgxQuerier) SamplesQuerier(ctx context.Context) SamplesQuerier {
return newQuerySamples(ctx, q)
}

func (q *pgxQuerier) ExemplarsQuerier(ctx context.Context) ExemplarQuerier {
return newQueryExemplars(q)
return newQueryExemplars(ctx, q)
}

// errorSeriesSet represents an error result in a form of a series set.
3 changes: 2 additions & 1 deletion pkg/pgmodel/querier/querier_sql_test.go
@@ -5,6 +5,7 @@
package querier

import (
"context"
"fmt"
"reflect"
"testing"
@@ -723,7 +724,7 @@ func TestPGXQuerierQuery(t *testing.T) {
}
querier := pgxQuerier{&queryTools{conn: mock, metricTableNames: mockMetrics, labelsReader: lreader.NewLabelsReader(mock, clockcache.WithMax(0), tenancy.NewNoopAuthorizer().ReadAuthorizer())}}

result, err := querier.RemoteReadQuerier().Query(c.query)
result, err := querier.RemoteReadQuerier(context.Background()).Query(c.query)

if err != nil {
switch {
23 changes: 12 additions & 11 deletions pkg/pgmodel/querier/query_exemplar.go
@@ -17,10 +17,11 @@

type queryExemplars struct {
*pgxQuerier
ctx context.Context
}

func newQueryExemplars(qr *pgxQuerier) *queryExemplars {
return &queryExemplars{qr}
func newQueryExemplars(ctx context.Context, qr *pgxQuerier) *queryExemplars {
return &queryExemplars{qr, ctx}
}

func (q *queryExemplars) Select(start, end time.Time, matchersList ...[]*labels.Matcher) ([]model.ExemplarQueryResult, error) {
@@ -40,7 +41,7 @@ func (q *queryExemplars) Select(start, end time.Time, matchersList ...[]*labels.
metadata.isExemplarQuery = true

if metadata.isSingleMetric {
metricInfo, err := q.tools.getMetricTableName("", metadata.metric, true)
metricInfo, err := q.tools.getMetricTableName(q.ctx, "", metadata.metric, true)
if err != nil {
if err == errors.ErrMissingTableName {
// The received metric does not have exemplars. Skip the remaining part and continue with
@@ -51,7 +52,7 @@ func (q *queryExemplars) Select(start, end time.Time, matchersList ...[]*labels.
}
metadata.timeFilter.metric = metricInfo.TableName

exemplarRows, err := fetchSingleMetricExemplars(q.tools, metadata)
exemplarRows, err := fetchSingleMetricExemplars(q.ctx, q.tools, metadata)
if err != nil {
return nil, fmt.Errorf("fetch single metric exemplars: %w", err)
}
@@ -66,7 +67,7 @@ func (q *queryExemplars) Select(start, end time.Time, matchersList ...[]*labels.
continue
}
// Multiple metric exemplar query.
exemplarRows, err := fetchMultipleMetricsExemplars(q.tools, metadata)
exemplarRows, err := fetchMultipleMetricsExemplars(q.ctx, q.tools, metadata)
if err != nil {
return nil, fmt.Errorf("fetch multiple metrics exemplars: %w", err)
}
@@ -86,10 +87,10 @@ func (q *queryExemplars) Select(start, end time.Time, matchersList ...[]*labels.

// fetchSingleMetricExemplars returns all exemplar rows for a single metric
// using the query metadata and tools.
func fetchSingleMetricExemplars(tools *queryTools, metadata *evalMetadata) ([]exemplarSeriesRow, error) {
func fetchSingleMetricExemplars(ctx context.Context, tools *queryTools, metadata *evalMetadata) ([]exemplarSeriesRow, error) {
sqlQuery := buildSingleMetricExemplarsQuery(metadata)

rows, err := tools.conn.Query(context.Background(), sqlQuery)
rows, err := tools.conn.Query(ctx, sqlQuery)
if err != nil {
// If we are getting undefined table error, it means the query
// is looking for a metric which doesn't exist in the system.
@@ -108,9 +109,9 @@ func fetchSingleMetricExemplars(tools *queryTools, metadata *evalMetadata) ([]ex

// fetchMultipleMetricsExemplars returns all the result rows across multiple metrics
// using the supplied query parameters.
func fetchMultipleMetricsExemplars(tools *queryTools, metadata *evalMetadata) ([]exemplarSeriesRow, error) {
func fetchMultipleMetricsExemplars(ctx context.Context, tools *queryTools, metadata *evalMetadata) ([]exemplarSeriesRow, error) {
// First fetch series IDs per metric.
metrics, _, correspondingSeriesIds, err := GetMetricNameSeriesIds(tools.conn, metadata)
metrics, _, correspondingSeriesIds, err := GetMetricNameSeriesIds(ctx, tools.conn, metadata)
if err != nil {
return nil, fmt.Errorf("get metric-name series-ids: %w", err)
}
@@ -126,7 +127,7 @@ func fetchMultipleMetricsExemplars(tools *queryTools, metadata *evalMetadata) ([
// Generate queries for each metric and send them in a single batch.
for i := range metrics {
//TODO batch getMetricTableName
metricInfo, err := tools.getMetricTableName("", metrics[i], true)
metricInfo, err := tools.getMetricTableName(ctx, "", metrics[i], true)
if err != nil {
if err == errors.ErrMissingTableName {
// If the metric table is missing, there are no results for this query.
@@ -148,7 +149,7 @@ func fetchMultipleMetricsExemplars(tools *queryTools, metadata *evalMetadata) ([
numQueries += 1
}

batchResults, err := tools.conn.SendBatch(context.Background(), batch)
batchResults, err := tools.conn.SendBatch(ctx, batch)
if err != nil {
return nil, err
}
