diff --git a/CHANGELOG.md b/CHANGELOG.md index a9c10614ea..a356c8dae1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re - [#3133](https://github.com/thanos-io/thanos/pull/3133) Query: Allow passing a `storeMatch[]` to Labels APIs. Also time range metadata based store filtering is supported on Labels APIs. - [#3154](https://github.com/thanos-io/thanos/pull/3154) Query Frontend: Add metric `thanos_memcached_getmulti_gate_queries_max`. +- [#3146](https://github.com/thanos-io/thanos/pull/3146) Sidecar: Add `thanos_sidecar_prometheus_store_received_frames` histogram metric. ### Changed diff --git a/cmd/thanos/sidecar.go b/cmd/thanos/sidecar.go index 2a79022669..0a7842eed5 100644 --- a/cmd/thanos/sidecar.go +++ b/cmd/thanos/sidecar.go @@ -127,7 +127,7 @@ func runSidecar( }) lastHeartbeat := promauto.With(reg).NewGauge(prometheus.GaugeOpts{ Name: "thanos_sidecar_last_heartbeat_success_time_seconds", - Help: "Second timestamp of the last successful heartbeat.", + Help: "Timestamp of the last successful heartbeat in seconds.", }) ctx, cancel := context.WithCancel(context.Background()) @@ -205,7 +205,7 @@ func runSidecar( t.MaxIdleConns = conf.connection.maxIdleConns c := promclient.NewClient(&http.Client{Transport: tracing.HTTPTripperware(logger, t)}, logger, thanoshttp.ThanosUserAgent) - promStore, err := store.NewPrometheusStore(logger, c, conf.prometheus.url, component.Sidecar, m.Labels, m.Timestamps) + promStore, err := store.NewPrometheusStore(logger, reg, c, conf.prometheus.url, component.Sidecar, m.Labels, m.Timestamps) if err != nil { return errors.Wrap(err, "create Prometheus store") } diff --git a/pkg/store/prometheus.go b/pkg/store/prometheus.go index e937a40192..9617111122 100644 --- a/pkg/store/prometheus.go +++ b/pkg/store/prometheus.go @@ -23,6 +23,8 @@ import ( "github.com/golang/snappy" "github.com/opentracing/opentracing-go" "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/storage/remote" "github.com/prometheus/prometheus/tsdb/chunkenc" @@ -49,6 +51,8 @@ type PrometheusStore struct { timestamps func() (mint int64, maxt int64) remoteReadAcceptableResponses []prompb.ReadRequest_ResponseType + + framesRead prometheus.Histogram } const initialBufSize = 32 * 1024 // 32KB seems like a good minimum starting size for sync pool size. @@ -58,6 +62,7 @@ const initialBufSize = 32 * 1024 // 32KB seems like a good minimum starting size // It attaches the provided external labels to all results. func NewPrometheusStore( logger log.Logger, + reg prometheus.Registerer, client *promclient.Client, baseURL *url.URL, component component.StoreAPI, @@ -79,6 +84,13 @@ func NewPrometheusStore( b := make([]byte, 0, initialBufSize) return &b }}, + framesRead: promauto.With(reg).NewHistogram( + prometheus.HistogramOpts{ + Name: "prometheus_store_received_frames", + Help: "Number of frames received per streamed response.", + Buckets: prometheus.ExponentialBuckets(10, 10, 5), + }, + ), } return p, nil } @@ -259,20 +271,16 @@ func (p *PrometheusStore) handleStreamedPrometheusResponse(s storepb.Store_Serie level.Debug(p.logger).Log("msg", "started handling ReadRequest_STREAMED_XOR_CHUNKS streamed read response.") framesNum := 0 - seriesNum := 0 defer func() { + p.framesRead.Observe(float64(framesNum)) querySpan.SetTag("frames", framesNum) - querySpan.SetTag("series", seriesNum) querySpan.Finish() }() defer runutil.CloseWithLogOnErr(p.logger, httpResp.Body, "prom series request body") var ( - lastSeries string - currSeries string - tmp []string - data = p.getBuffer() + data = p.getBuffer() ) defer p.putBuffer(data) @@ -294,19 +302,6 @@ func (p *PrometheusStore) handleStreamedPrometheusResponse(s storepb.Store_Serie framesNum++ for _, series := range res.ChunkedSeries { - { - // Calculate hash of series for counting. - tmp = tmp[:0] - for _, l := range series.Labels { - tmp = append(tmp, l.String()) - } - currSeries = strings.Join(tmp, ";") - if currSeries != lastSeries { - seriesNum++ - lastSeries = currSeries - } - } - thanosChks := make([]storepb.AggrChunk, len(series.Chunks)) for i, chk := range series.Chunks { thanosChks[i] = storepb.AggrChunk{ @@ -332,7 +327,7 @@ func (p *PrometheusStore) handleStreamedPrometheusResponse(s storepb.Store_Serie } } } - level.Debug(p.logger).Log("msg", "handled ReadRequest_STREAMED_XOR_CHUNKS request.", "frames", framesNum, "series", seriesNum) + level.Debug(p.logger).Log("msg", "handled ReadRequest_STREAMED_XOR_CHUNKS request.", "frames", framesNum) return nil } diff --git a/pkg/store/prometheus_test.go b/pkg/store/prometheus_test.go index dd853e9b90..784dbd19c5 100644 --- a/pkg/store/prometheus_test.go +++ b/pkg/store/prometheus_test.go @@ -63,7 +63,7 @@ func testPrometheusStoreSeriesE2e(t *testing.T, prefix string) { testutil.Ok(t, err) limitMinT := int64(0) - proxy, err := NewPrometheusStore(nil, promclient.NewDefaultClient(), u, component.Sidecar, + proxy, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), u, component.Sidecar, func() labels.Labels { return labels.FromStrings("region", "eu-west") }, func() (int64, int64) { return limitMinT, -1 }) // Maxt does not matter. testutil.Ok(t, err) @@ -198,7 +198,7 @@ func TestPrometheusStore_SeriesLabels_e2e(t *testing.T) { u, err := url.Parse(fmt.Sprintf("http://%s", p.Addr())) testutil.Ok(t, err) - promStore, err := NewPrometheusStore(nil, promclient.NewDefaultClient(), u, component.Sidecar, + promStore, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), u, component.Sidecar, func() labels.Labels { return labels.FromStrings("region", "eu-west") }, func() (int64, int64) { return math.MinInt64/1000 + 62135596801, math.MaxInt64/1000 - 62135596801 }) testutil.Ok(t, err) @@ -376,7 +376,7 @@ func TestPrometheusStore_LabelNames_e2e(t *testing.T) { u, err := url.Parse(fmt.Sprintf("http://%s", p.Addr())) testutil.Ok(t, err) - proxy, err := NewPrometheusStore(nil, promclient.NewDefaultClient(), u, component.Sidecar, getExternalLabels, nil) + proxy, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), u, component.Sidecar, getExternalLabels, nil) testutil.Ok(t, err) resp, err := proxy.LabelNames(ctx, &storepb.LabelNamesRequest{ @@ -421,7 +421,7 @@ func TestPrometheusStore_LabelValues_e2e(t *testing.T) { u, err := url.Parse(fmt.Sprintf("http://%s", p.Addr())) testutil.Ok(t, err) - proxy, err := NewPrometheusStore(nil, promclient.NewDefaultClient(), u, component.Sidecar, getExternalLabels, nil) + proxy, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), u, component.Sidecar, getExternalLabels, nil) testutil.Ok(t, err) resp, err := proxy.LabelValues(ctx, &storepb.LabelValuesRequest{ @@ -467,7 +467,7 @@ func TestPrometheusStore_ExternalLabelValues_e2e(t *testing.T) { u, err := url.Parse(fmt.Sprintf("http://%s", p.Addr())) testutil.Ok(t, err) - proxy, err := NewPrometheusStore(nil, promclient.NewDefaultClient(), u, component.Sidecar, getExternalLabels, nil) + proxy, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), u, component.Sidecar, getExternalLabels, nil) testutil.Ok(t, err) resp, err := proxy.LabelValues(ctx, &storepb.LabelValuesRequest{ @@ -511,7 +511,7 @@ func TestPrometheusStore_Series_MatchExternalLabel_e2e(t *testing.T) { u, err := url.Parse(fmt.Sprintf("http://%s", p.Addr())) testutil.Ok(t, err) - proxy, err := NewPrometheusStore(nil, promclient.NewDefaultClient(), u, component.Sidecar, + proxy, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), u, component.Sidecar, func() labels.Labels { return labels.FromStrings("region", "eu-west") }, func() (int64, int64) { return 0, math.MaxInt64 }) testutil.Ok(t, err) @@ -556,7 +556,7 @@ func TestPrometheusStore_Info(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - proxy, err := NewPrometheusStore(nil, promclient.NewDefaultClient(), nil, component.Sidecar, + proxy, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), nil, component.Sidecar, func() labels.Labels { return labels.FromStrings("region", "eu-west") }, func() (int64, int64) { return 123, 456 }) testutil.Ok(t, err) @@ -634,7 +634,7 @@ func TestPrometheusStore_Series_SplitSamplesIntoChunksWithMaxSizeOf120(t *testin u, err := url.Parse(fmt.Sprintf("http://%s", p.Addr())) testutil.Ok(t, err) - proxy, err := NewPrometheusStore(nil, promclient.NewDefaultClient(), u, component.Sidecar, + proxy, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), u, component.Sidecar, func() labels.Labels { return labels.FromStrings("region", "eu-west") }, func() (int64, int64) { return 0, math.MaxInt64 }) testutil.Ok(t, err)