Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upgraded Prometheus to 2.11.1 and TSDB to 2.9.1 #1380

Merged
merged 1 commit into from
Aug 7, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,21 @@ We use *breaking* word for marking changes that are not backward compatible (rel
### Added

- [#1358](https://github.com/thanos-io/thanos/pull/1358) Added `part_size` configuration option for HTTP multipart requests minimum part size for S3 storage type

- [#1363](https://github.com/thanos-io/thanos/pull/1363) Thanos Receive now exposes `thanos_receive_hashring_nodes` and `thanos_receive_hashring_tenants` metrics to monitor status of hash-rings

### Changed

- [#1380](https://github.com/thanos-io/thanos/pull/1380) Upgraded important dependencies: Prometheus to 2.11.1 and TSDB to 0.9.1. Some changes affecting Querier:
- [ENHANCEMENT] Query performance improvement: Efficient iteration and search in HashForLabels and HashWithoutLabels. #5707
- [ENHANCEMENT] Optimize queries using regexp for set lookups. tsdb#602
- [BUGFIX] prometheus_tsdb_compactions_failed_total is now incremented on any compaction failure. tsdb#613
- [BUGFIX] PromQL: Correctly display {__name__="a"}. #5552
- [#1338](https://github.com/thanos-io/thanos/pull/1338) Querier still warns on store API duplicate, but allows a single one from duplicated set. This is gracefully warn about the problematic logic and not disrupt immediately.
- [#1297](https://github.com/improbable-eng/thanos/pull/1297) Added `/-/ready` and `/-/healthy` endpoints to Thanos compact.

### Fixed

- [#1327](https://github.com/thanos-io/thanos/pull/1327) `/series` API end-point now properly returns an empty array just like Prometheus if there are no results

- [#1302](https://github.com/thanos-io/thanos/pull/1302) Thanos now efficiently reuses HTTP keep-alive connections

## [v0.6.0](https://github.com/thanos-io/thanos/releases/tag/v0.6.0) - 2019.07.18
Expand Down
5 changes: 3 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ require (
cloud.google.com/go v0.34.0
github.com/Azure/azure-storage-blob-go v0.7.0
github.com/NYTimes/gziphandler v1.1.1
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 // indirect
github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da
github.com/cespare/xxhash v1.1.0
github.com/fatih/structtag v1.0.0
Expand Down Expand Up @@ -33,8 +34,8 @@ require (
github.com/pkg/errors v0.8.1
github.com/prometheus/client_golang v1.0.0
github.com/prometheus/common v0.6.0
github.com/prometheus/prometheus v2.9.2+incompatible
github.com/prometheus/tsdb v0.8.0
github.com/prometheus/prometheus v0.0.0-20190710134608-e5b22494857d
github.com/prometheus/tsdb v0.9.1
github.com/uber-go/atomic v1.4.0 // indirect
github.com/uber/jaeger-client-go v2.16.0+incompatible
github.com/uber/jaeger-lib v2.0.0+incompatible
Expand Down
124 changes: 78 additions & 46 deletions go.sum

Large diffs are not rendered by default.

84 changes: 17 additions & 67 deletions pkg/query/api/v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,11 @@ import (
"math"
"net/http"
"strconv"
"sync"
"time"

"github.com/NYTimes/gziphandler"

"github.com/go-kit/kit/log"
opentracing "github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
Expand Down Expand Up @@ -279,22 +277,12 @@ func (api *API) query(r *http.Request) (interface{}, []error, *ApiError) {
return nil, nil, apiErr
}

var (
warnmtx sync.Mutex
warnings []error
)
warningReporter := func(err error) {
warnmtx.Lock()
warnings = append(warnings, err)
warnmtx.Unlock()
}

// We are starting promQL tracing span here, because we have no control over promQL code.
span, ctx := tracing.StartSpan(ctx, "promql_instant_query")
defer span.Finish()

begin := api.now()
qry, err := api.queryEngine.NewInstantQuery(api.queryableCreate(enableDedup, 0, enablePartialResponse, warningReporter), r.FormValue("query"), ts)
qry, err := api.queryEngine.NewInstantQuery(api.queryableCreate(enableDedup, 0, enablePartialResponse), r.FormValue("query"), ts)
if err != nil {
return nil, nil, &ApiError{errorBadData, err}
}
Expand All @@ -316,7 +304,7 @@ func (api *API) query(r *http.Request) (interface{}, []error, *ApiError) {
return &queryData{
ResultType: res.Value.Type(),
Result: res.Value,
}, warnings, nil
}, res.Warnings, nil
}

func (api *API) queryRange(r *http.Request) (interface{}, []error, *ApiError) {
Expand Down Expand Up @@ -377,23 +365,13 @@ func (api *API) queryRange(r *http.Request) (interface{}, []error, *ApiError) {
return nil, nil, apiErr
}

var (
warnmtx sync.Mutex
warnings []error
)
warningReporter := func(err error) {
warnmtx.Lock()
warnings = append(warnings, err)
warnmtx.Unlock()
}

// We are starting promQL tracing span here, because we have no control over promQL code.
span, ctx := tracing.StartSpan(ctx, "promql_range_query")
defer span.Finish()

begin := api.now()
qry, err := api.queryEngine.NewRangeQuery(
api.queryableCreate(enableDedup, maxSourceResolution, enablePartialResponse, warningReporter),
api.queryableCreate(enableDedup, maxSourceResolution, enablePartialResponse),
r.FormValue("query"),
start,
end,
Expand All @@ -418,7 +396,7 @@ func (api *API) queryRange(r *http.Request) (interface{}, []error, *ApiError) {
return &queryData{
ResultType: res.Value.Type(),
Result: res.Value,
}, warnings, nil
}, res.Warnings, nil
}

func (api *API) labelValues(r *http.Request) (interface{}, []error, *ApiError) {
Expand All @@ -434,25 +412,15 @@ func (api *API) labelValues(r *http.Request) (interface{}, []error, *ApiError) {
return nil, nil, apiErr
}

var (
warnmtx sync.Mutex
warnings []error
)
warningReporter := func(err error) {
warnmtx.Lock()
warnings = append(warnings, err)
warnmtx.Unlock()
}

q, err := api.queryableCreate(true, 0, enablePartialResponse, warningReporter).Querier(ctx, math.MinInt64, math.MaxInt64)
q, err := api.queryableCreate(true, 0, enablePartialResponse).Querier(ctx, math.MinInt64, math.MaxInt64)
if err != nil {
return nil, nil, &ApiError{errorExec, err}
}
defer runutil.CloseWithLogOnErr(api.logger, q, "queryable labelValues")

// TODO(fabxc): add back request context.

vals, err := q.LabelValues(name)
vals, warnings, err := q.LabelValues(name)
if err != nil {
return nil, nil, &ApiError{errorExec, err}
}
Expand Down Expand Up @@ -515,42 +483,34 @@ func (api *API) series(r *http.Request) (interface{}, []error, *ApiError) {
return nil, nil, apiErr
}

var (
warnmtx sync.Mutex
warnings []error
)
warningReporter := func(err error) {
warnmtx.Lock()
warnings = append(warnings, err)
warnmtx.Unlock()
}

// TODO(bwplotka): Support downsampling?
q, err := api.queryableCreate(enableDedup, 0, enablePartialResponse, warningReporter).Querier(r.Context(), timestamp.FromTime(start), timestamp.FromTime(end))
q, err := api.queryableCreate(enableDedup, 0, enablePartialResponse).Querier(r.Context(), timestamp.FromTime(start), timestamp.FromTime(end))
if err != nil {
return nil, nil, &ApiError{errorExec, err}
}
defer runutil.CloseWithLogOnErr(api.logger, q, "queryable series")

var sets []storage.SeriesSet
var (
warnings []error
metrics = []labels.Labels{}
sets []storage.SeriesSet
)
for _, mset := range matcherSets {
s, _, err := q.Select(&storage.SelectParams{}, mset...)
s, warns, err := q.Select(&storage.SelectParams{}, mset...)
if err != nil {
return nil, nil, &ApiError{errorExec, err}
}
warnings = append(warnings, warns...)
sets = append(sets, s)
}

set := storage.NewMergeSeriesSet(sets, nil)

metrics := []labels.Labels{}
for set.Next() {
metrics = append(metrics, set.At().Labels())
}
if set.Err() != nil {
return nil, nil, &ApiError{errorExec, set.Err()}
}

return metrics, warnings, nil
}

Expand Down Expand Up @@ -627,23 +587,13 @@ func (api *API) labelNames(r *http.Request) (interface{}, []error, *ApiError) {
return nil, nil, apiErr
}

var (
warnmtx sync.Mutex
warnings []error
)
warningReporter := func(err error) {
warnmtx.Lock()
warnings = append(warnings, err)
warnmtx.Unlock()
}

q, err := api.queryableCreate(true, 0, enablePartialResponse, warningReporter).Querier(ctx, math.MinInt64, math.MaxInt64)
q, err := api.queryableCreate(true, 0, enablePartialResponse).Querier(ctx, math.MinInt64, math.MaxInt64)
if err != nil {
return nil, nil, &ApiError{errorExec, err}
}
defer runutil.CloseWithLogOnErr(api.logger, q, "queryable labelNames")

names, err := q.LabelNames()
names, warnings, err := q.LabelNames()
if err != nil {
return nil, nil, &ApiError{errorExec, err}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/query/api/v1_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ import (
)

func testQueryableCreator(queryable storage.Queryable) query.QueryableCreator {
return func(_ bool, _ int64, _ bool, _ query.WarningReporter) storage.Queryable {
return func(_ bool, _ int64, _ bool) storage.Queryable {
return queryable
}
}
Expand Down
48 changes: 17 additions & 31 deletions pkg/query/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,30 +13,22 @@ import (
"github.com/thanos-io/thanos/pkg/tracing"
)

// WarningReporter allows to report warnings to frontend layer.
//
// Warning can include partial errors `partialResponse` is enabled. It occurs when only part of the results are ready and
// another is not available because of the failure.
// It is required to be thread-safe.
type WarningReporter func(error)

// QueryableCreator returns implementation of promql.Queryable that fetches data from the proxy store API endpoints.
// If deduplication is enabled, all data retrieved from it will be deduplicated along the replicaLabel by default.
// maxResolutionMillis controls downsampling resolution that is allowed (specified in milliseconds).
// partialResponse controls `partialResponseDisabled` option of StoreAPI and partial response behaviour of proxy.
type QueryableCreator func(deduplicate bool, maxResolutionMillis int64, partialResponse bool, r WarningReporter) storage.Queryable
type QueryableCreator func(deduplicate bool, maxResolutionMillis int64, partialResponse bool) storage.Queryable

// NewQueryableCreator creates QueryableCreator.
func NewQueryableCreator(logger log.Logger, proxy storepb.StoreServer, replicaLabel string) QueryableCreator {
return func(deduplicate bool, maxResolutionMillis int64, partialResponse bool, r WarningReporter) storage.Queryable {
return func(deduplicate bool, maxResolutionMillis int64, partialResponse bool) storage.Queryable {
return &queryable{
logger: logger,
replicaLabel: replicaLabel,
proxy: proxy,
deduplicate: deduplicate,
maxResolutionMillis: maxResolutionMillis,
partialResponse: partialResponse,
warningReporter: r,
}
}
}
Expand All @@ -48,12 +40,11 @@ type queryable struct {
deduplicate bool
maxResolutionMillis int64
partialResponse bool
warningReporter WarningReporter
}

// Querier returns a new storage querier against the underlying proxy store API.
func (q *queryable) Querier(ctx context.Context, mint, maxt int64) (storage.Querier, error) {
return newQuerier(ctx, q.logger, mint, maxt, q.replicaLabel, q.proxy, q.deduplicate, int64(q.maxResolutionMillis), q.partialResponse, q.warningReporter), nil
return newQuerier(ctx, q.logger, mint, maxt, q.replicaLabel, q.proxy, q.deduplicate, int64(q.maxResolutionMillis), q.partialResponse), nil
}

type querier struct {
Expand All @@ -66,7 +57,6 @@ type querier struct {
deduplicate bool
maxResolutionMillis int64
partialResponse bool
warningReporter WarningReporter
}

// newQuerier creates implementation of storage.Querier that fetches data from the proxy
Expand All @@ -80,14 +70,10 @@ func newQuerier(
deduplicate bool,
maxResolutionMillis int64,
partialResponse bool,
warningReporter WarningReporter,
) *querier {
if logger == nil {
logger = log.NewNopLogger()
}
if warningReporter == nil {
warningReporter = func(error) {}
}
ctx, cancel := context.WithCancel(ctx)
return &querier{
ctx: ctx,
Expand All @@ -100,7 +86,6 @@ func newQuerier(
deduplicate: deduplicate,
maxResolutionMillis: maxResolutionMillis,
partialResponse: partialResponse,
warningReporter: warningReporter,
}
}

Expand Down Expand Up @@ -191,10 +176,9 @@ func (q *querier) Select(params *storage.SelectParams, ms ...*labels.Matcher) (s
return nil, nil, errors.Wrap(err, "proxy Series()")
}

var warns storage.Warnings
for _, w := range resp.warnings {
// NOTE(bwplotka): We could use warnings return arguments here, however need reporter anyway for LabelValues and LabelNames method,
// so we choose to be consistent and keep reporter.
q.warningReporter(errors.New(w))
warns = append(warns, errors.New(w))
}

if !q.isDedupEnabled() {
Expand All @@ -204,7 +188,7 @@ func (q *querier) Select(params *storage.SelectParams, ms ...*labels.Matcher) (s
maxt: q.maxt,
set: newStoreSeriesSet(resp.seriesSet),
aggr: resAggr,
}, nil, nil
}, warns, nil
}

// TODO(fabxc): this could potentially pushed further down into the store API
Expand All @@ -221,7 +205,7 @@ func (q *querier) Select(params *storage.SelectParams, ms ...*labels.Matcher) (s
// The merged series set assembles all potentially-overlapping time ranges
// of the same series into a single one. The series are ordered so that equal series
// from different replicas are sequential. We can now deduplicate those.
return newDedupSeriesSet(set, q.replicaLabel), nil, nil
return newDedupSeriesSet(set, q.replicaLabel), warns, nil
}

// sortDedupLabels resorts the set so that the same series with different replica
Expand All @@ -247,37 +231,39 @@ func sortDedupLabels(set []storepb.Series, replicaLabel string) {
}

// LabelValues returns all potential values for a label name.
func (q *querier) LabelValues(name string) ([]string, error) {
func (q *querier) LabelValues(name string) ([]string, storage.Warnings, error) {
span, ctx := tracing.StartSpan(q.ctx, "querier_label_values")
defer span.Finish()

resp, err := q.proxy.LabelValues(ctx, &storepb.LabelValuesRequest{Label: name, PartialResponseDisabled: !q.partialResponse})
if err != nil {
return nil, errors.Wrap(err, "proxy LabelValues()")
return nil, nil, errors.Wrap(err, "proxy LabelValues()")
}

var warns storage.Warnings
for _, w := range resp.Warnings {
q.warningReporter(errors.New(w))
warns = append(warns, errors.New(w))
}

return resp.Values, nil
return resp.Values, warns, nil
}

// LabelNames returns all the unique label names present in the block in sorted order.
func (q *querier) LabelNames() ([]string, error) {
func (q *querier) LabelNames() ([]string, storage.Warnings, error) {
span, ctx := tracing.StartSpan(q.ctx, "querier_label_names")
defer span.Finish()

resp, err := q.proxy.LabelNames(ctx, &storepb.LabelNamesRequest{PartialResponseDisabled: !q.partialResponse})
if err != nil {
return nil, errors.Wrap(err, "proxy LabelNames()")
return nil, nil, errors.Wrap(err, "proxy LabelNames()")
}

var warns storage.Warnings
for _, w := range resp.Warnings {
q.warningReporter(errors.New(w))
warns = append(warns, errors.New(w))
}

return resp.Names, nil
return resp.Names, warns, nil
}

func (q *querier) Close() error {
Expand Down
Loading