From fbffc4bfe26bba68f5d0b964132e881f3b142759 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bartek=20P=C5=82otka?= Date: Tue, 27 Aug 2019 17:45:54 +0100 Subject: [PATCH] sidecar: Added support for streaming, chunked remote read. (#1268) Fixes: https://github.com/improbable-eng/thanos/issues/488 Signed-off-by: Bartek Plotka --- CHANGELOG.md | 2 + pkg/query/iter.go | 47 +++++- pkg/query/querier.go | 4 +- pkg/query/querier_test.go | 9 +- pkg/store/prometheus.go | 206 ++++++++++++++++++------ pkg/store/prometheus_test.go | 14 +- pkg/store/proxy_test.go | 96 ++++++----- pkg/store/storepb/custom.go | 6 +- pkg/store/storepb/custom_test.go | 262 +++++++++++++++++++++++++++++++ pkg/store/storepb/rpc.pb.go | 10 ++ pkg/store/storepb/rpc.proto | 5 + 11 files changed, 555 insertions(+), 106 deletions(-) create mode 100644 pkg/store/storepb/custom_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 6255347c92..f8a6fbe8fc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,8 @@ We use *breaking* word for marking changes that are not backward compatible (rel ### Added - [#1378](https://github.com/thanos-io/thanos/pull/1378) Thanos Receive now exposes `thanos_receive_config_hash`, `thanos_receive_config_last_reload_successful` and `thanos_receive_config_last_reload_success_timestamp_seconds` metrics to track latest configuration change +- [#1268](https://github.com/thanos-io/thanos/pull/1268) Added support for newest Prometheus streaming remote read added [here](https://github.com/prometheus/prometheus/pull/5703). This massively improves memory required by single + request for both Prometheus and sidecar. Single requests now should take constant amount of memory on sidecar, so resource consumption prediction is now straightforward. - [#1358](https://github.com/thanos-io/thanos/pull/1358) Added `part_size` configuration option for HTTP multipart requests minimum part size for S3 storage type - [#1363](https://github.com/thanos-io/thanos/pull/1363) Thanos Receive now exposes `thanos_receive_hashring_nodes` and `thanos_receive_hashring_tenants` metrics to monitor status of hash-rings diff --git a/pkg/query/iter.go b/pkg/query/iter.go index 7b23f2f931..589da98e10 100644 --- a/pkg/query/iter.go +++ b/pkg/query/iter.go @@ -15,17 +15,52 @@ import ( // promSeriesSet implements the SeriesSet interface of the Prometheus storage // package on top of our storepb SeriesSet. type promSeriesSet struct { - set storepb.SeriesSet + set storepb.SeriesSet + initiated bool + done bool + mint, maxt int64 aggr resAggr + + currLset []storepb.Label + currChunks []storepb.AggrChunk } -func (s promSeriesSet) Next() bool { return s.set.Next() } -func (s promSeriesSet) Err() error { return s.set.Err() } +func (s *promSeriesSet) Next() bool { + if !s.initiated { + s.initiated = true + s.done = s.set.Next() + } -func (s promSeriesSet) At() storage.Series { - lset, chunks := s.set.At() - return newChunkSeries(lset, chunks, s.mint, s.maxt, s.aggr) + if !s.done { + return false + } + + // storage.Series are more strict then SeriesSet: It requires storage.Series to iterate over full series. + s.currLset, s.currChunks = s.set.At() + for { + s.done = s.set.Next() + if !s.done { + break + } + nextLset, nextChunks := s.set.At() + if storepb.CompareLabels(s.currLset, nextLset) != 0 { + break + } + s.currChunks = append(s.currChunks, nextChunks...) + } + return true +} + +func (s *promSeriesSet) At() storage.Series { + if !s.initiated || s.set.Err() != nil { + return nil + } + return newChunkSeries(s.currLset, s.currChunks, s.mint, s.maxt, s.aggr) +} + +func (s *promSeriesSet) Err() error { + return s.set.Err() } func translateMatcher(m *labels.Matcher) (storepb.LabelMatcher, error) { diff --git a/pkg/query/querier.go b/pkg/query/querier.go index 7ee7322414..aefe9a2ebe 100644 --- a/pkg/query/querier.go +++ b/pkg/query/querier.go @@ -183,7 +183,7 @@ func (q *querier) Select(params *storage.SelectParams, ms ...*labels.Matcher) (s if !q.isDedupEnabled() { // Return data without any deduplication. - return promSeriesSet{ + return &promSeriesSet{ mint: q.mint, maxt: q.maxt, set: newStoreSeriesSet(resp.seriesSet), @@ -195,7 +195,7 @@ func (q *querier) Select(params *storage.SelectParams, ms ...*labels.Matcher) (s // to make true streaming possible. sortDedupLabels(resp.seriesSet, q.replicaLabel) - set := promSeriesSet{ + set := &promSeriesSet{ mint: q.mint, maxt: q.maxt, set: newStoreSeriesSet(resp.seriesSet), diff --git a/pkg/query/querier_test.go b/pkg/query/querier_test.go index 537e3569de..a5a280f83c 100644 --- a/pkg/query/querier_test.go +++ b/pkg/query/querier_test.go @@ -163,8 +163,12 @@ func TestQuerier_Series(t *testing.T) { testProxy := &storeServer{ resps: []*storepb.SeriesResponse{ + // Expected sorted series per seriesSet input. However we Series API allows for single series being chunks across multiple frames. + // This should be handled here. storeSeriesResponse(t, labels.FromStrings("a", "a"), []sample{{0, 0}, {2, 1}, {3, 2}}), storepb.NewWarnSeriesResponse(errors.New("partial error")), + storeSeriesResponse(t, labels.FromStrings("a", "a"), []sample{{5, 5}, {6, 6}, {7, 7}}), + storeSeriesResponse(t, labels.FromStrings("a", "a"), []sample{{5, 5}, {6, 66}}), // Overlap samples for some reason. storeSeriesResponse(t, labels.FromStrings("a", "b"), []sample{{2, 2}, {3, 3}, {4, 4}}, []sample{{1, 1}, {2, 2}, {3, 3}}), storeSeriesResponse(t, labels.FromStrings("a", "c"), []sample{{100, 1}, {300, 3}, {400, 4}}), }, @@ -184,7 +188,7 @@ func TestQuerier_Series(t *testing.T) { }{ { lset: labels.FromStrings("a", "a"), - samples: []sample{{2, 1}, {3, 2}}, + samples: []sample{{2, 1}, {3, 2}, {5, 5}, {6, 6}, {7, 7}}, }, { lset: labels.FromStrings("a", "b"), @@ -349,7 +353,7 @@ func TestDedupSeriesSet(t *testing.T) { }, }) } - set := promSeriesSet{ + set := &promSeriesSet{ mint: 1, maxt: math.MaxInt64, set: newStoreSeriesSet(series), @@ -521,6 +525,7 @@ func (s *storeServer) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesS return nil } +// storeSeriesResponse creates test storepb.SeriesResponse that includes series with single chunk that stores all the given samples. func storeSeriesResponse(t testing.TB, lset labels.Labels, smplChunks ...[]sample) *storepb.SeriesResponse { var s storepb.Series diff --git a/pkg/store/prometheus.go b/pkg/store/prometheus.go index 58313033ba..66460d1b49 100644 --- a/pkg/store/prometheus.go +++ b/pkg/store/prometheus.go @@ -12,14 +12,17 @@ import ( "net/url" "path" "sort" + "strings" "sync" "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" "github.com/gogo/protobuf/proto" "github.com/golang/snappy" + opentracing "github.com/opentracing/opentracing-go" "github.com/pkg/errors" "github.com/prometheus/prometheus/prompb" + "github.com/prometheus/prometheus/storage/remote" "github.com/prometheus/prometheus/tsdb/chunkenc" "github.com/prometheus/prometheus/tsdb/labels" "github.com/thanos-io/thanos/pkg/component" @@ -80,7 +83,7 @@ func NewPrometheusStore( } // Info returns store information about the Prometheus instance. -// NOTE(bplotka): MaxTime & MinTime are not accurate nor adjusted dynamically. +// NOTE(bwplotka): MaxTime & MinTime are not accurate nor adjusted dynamically. // This is fine for now, but might be needed in future. func (p *PrometheusStore) Info(ctx context.Context, r *storepb.InfoRequest) (*storepb.InfoResponse, error) { lset := p.externalLabels() @@ -142,8 +145,6 @@ func (p *PrometheusStore) Series(r *storepb.SeriesRequest, s storepb.Store_Serie q := &prompb.Query{StartTimestampMs: r.MinTime, EndTimestampMs: r.MaxTime} - // TODO(fabxc): import common definitions from prompb once we have a stable gRPC - // query API there. for _, m := range newMatchers { pm := &prompb.LabelMatcher{Name: m.Name, Value: m.Value} @@ -162,12 +163,39 @@ func (p *PrometheusStore) Series(r *storepb.SeriesRequest, s storepb.Store_Serie q.Matchers = append(q.Matchers, pm) } - resp, err := p.promSeries(s.Context(), q) + queryPrometheusSpan, ctx := tracing.StartSpan(s.Context(), "query_prometheus") + + httpResp, err := p.startPromSeries(ctx, q) if err != nil { + queryPrometheusSpan.Finish() return errors.Wrap(err, "query Prometheus") } - span, _ := tracing.StartSpan(s.Context(), "transform_and_respond") + // Negotiate content. We requested streamed chunked response type, but still we need to support old versions of + // remote read. + contentType := httpResp.Header.Get("Content-Type") + if strings.HasPrefix(contentType, "application/x-protobuf") { + return p.handleSampledPrometheusResponse(s, httpResp, queryPrometheusSpan, externalLabels) + } + + if !strings.HasPrefix(contentType, "application/x-streamed-protobuf; proto=prometheus.ChunkedReadResponse") { + return errors.Errorf("not supported remote read content type: %s", contentType) + } + return p.handleStreamedPrometheusResponse(s, httpResp, queryPrometheusSpan, externalLabels) +} + +func (p *PrometheusStore) handleSampledPrometheusResponse(s storepb.Store_SeriesServer, httpResp *http.Response, querySpan opentracing.Span, externalLabels labels.Labels) error { + ctx := s.Context() + + level.Debug(p.logger).Log("msg", "started handling ReadRequest_SAMPLED response type.") + + resp, err := p.fetchSampledResponse(ctx, httpResp) + querySpan.Finish() + if err != nil { + return err + } + + span, _ := tracing.StartSpan(ctx, "transform_and_respond") defer span.Finish() span.SetTag("series_count", len(resp.Results[0].Timeseries)) @@ -195,17 +223,129 @@ func (p *PrometheusStore) Series(r *storepb.SeriesRequest, s storepb.Store_Serie return err } - resp := storepb.NewSeriesResponse(&storepb.Series{ + if err := s.Send(storepb.NewSeriesResponse(&storepb.Series{ Labels: lset, Chunks: aggregatedChunks, - }) - if err := s.Send(resp); err != nil { + })); err != nil { return err } } + level.Debug(p.logger).Log("msg", "handled ReadRequest_SAMPLED request.", "series", len(resp.Results[0].Timeseries)) return nil } +func (p *PrometheusStore) handleStreamedPrometheusResponse(s storepb.Store_SeriesServer, httpResp *http.Response, querySpan opentracing.Span, externalLabels labels.Labels) error { + level.Debug(p.logger).Log("msg", "started handling ReadRequest_STREAMED_XOR_CHUNKS streamed read response.") + + framesNum := 0 + seriesNum := 0 + + defer func() { + querySpan.SetTag("frames", framesNum) + querySpan.SetTag("series", seriesNum) + querySpan.Finish() + }() + defer runutil.CloseWithLogOnErr(p.logger, httpResp.Body, "prom series request body") + + var ( + lastSeries string + currSeries string + tmp []string + data = p.getBuffer() + ) + defer p.putBuffer(data) + + // TODO(bwplotka): Put read limit as a flag. + stream := remote.NewChunkedReader(httpResp.Body, remote.DefaultChunkedReadLimit, *data) + for { + res := &prompb.ChunkedReadResponse{} + err := stream.NextProto(res) + if err == io.EOF { + break + } + if err != nil { + return errors.Wrap(err, "next proto") + } + + if len(res.ChunkedSeries) != 1 { + level.Warn(p.logger).Log("msg", "Prometheus ReadRequest_STREAMED_XOR_CHUNKS returned non 1 series in frame", "series", len(res.ChunkedSeries)) + } + + framesNum++ + for _, series := range res.ChunkedSeries { + { + // Calculate hash of series for counting. + tmp = tmp[:0] + for _, l := range series.Labels { + tmp = append(tmp, l.String()) + } + currSeries = strings.Join(tmp, ";") + if currSeries != lastSeries { + seriesNum++ + lastSeries = currSeries + } + } + + thanosChks := make([]storepb.AggrChunk, len(series.Chunks)) + for i, chk := range series.Chunks { + thanosChks[i] = storepb.AggrChunk{ + MaxTime: chk.MaxTimeMs, + MinTime: chk.MinTimeMs, + Raw: &storepb.Chunk{ + Data: chk.Data, + // Prometheus ChunkEncoding vs ours https://github.com/thanos-io/thanos/blob/master/pkg/store/storepb/types.proto#L19 + // has one difference. Prometheus has Chunk_UNKNOWN Chunk_Encoding = 0 vs we start from + // XOR as 0. Compensate for that here: + Type: storepb.Chunk_Encoding(chk.Type - 1), + }, + } + // Drop the reference to data from non protobuf for GC. + series.Chunks[i].Data = nil + } + + if err := s.Send(storepb.NewSeriesResponse(&storepb.Series{ + Labels: p.translateAndExtendLabels(series.Labels, externalLabels), + Chunks: thanosChks, + })); err != nil { + return err + } + } + } + level.Debug(p.logger).Log("msg", "handled ReadRequest_STREAMED_XOR_CHUNKS request.", "frames", framesNum, "series", seriesNum) + return nil +} + +func (p *PrometheusStore) fetchSampledResponse(ctx context.Context, resp *http.Response) (*prompb.ReadResponse, error) { + defer runutil.ExhaustCloseWithLogOnErr(p.logger, resp.Body, "prom series request body") + + b := p.getBuffer() + buf := bytes.NewBuffer(*b) + defer p.putBuffer(b) + if _, err := io.Copy(buf, resp.Body); err != nil { + return nil, errors.Wrap(err, "copy response") + } + spanSnappyDecode, ctx := tracing.StartSpan(ctx, "decompress_response") + sb := p.getBuffer() + decomp, err := snappy.Decode(*sb, buf.Bytes()) + spanSnappyDecode.Finish() + defer p.putBuffer(sb) + if err != nil { + return nil, errors.Wrap(err, "decompress response") + } + + var data prompb.ReadResponse + spanUnmarshal, _ := tracing.StartSpan(ctx, "unmarshal_response") + if err := proto.Unmarshal(decomp, &data); err != nil { + return nil, errors.Wrap(err, "unmarshal response") + } + spanUnmarshal.Finish() + if len(data.Results) != 1 { + return nil, errors.Errorf("unexpected result size %d", len(data.Results)) + } + + return &data, nil +} + func (p *PrometheusStore) chunkSamples(series *prompb.TimeSeries, maxSamplesPerChunk int) (chks []storepb.AggrChunk, err error) { samples := series.Samples @@ -232,11 +372,11 @@ func (p *PrometheusStore) chunkSamples(series *prompb.TimeSeries, maxSamplesPerC return chks, nil } -func (p *PrometheusStore) promSeries(ctx context.Context, q *prompb.Query) (*prompb.ReadResponse, error) { - span, ctx := tracing.StartSpan(ctx, "query_prometheus") - defer span.Finish() - - reqb, err := proto.Marshal(&prompb.ReadRequest{Queries: []*prompb.Query{q}}) +func (p *PrometheusStore) startPromSeries(ctx context.Context, q *prompb.Query) (*http.Response, error) { + reqb, err := proto.Marshal(&prompb.ReadRequest{ + Queries: []*prompb.Query{q}, + AcceptedResponseTypes: []prompb.ReadRequest_ResponseType{prompb.ReadRequest_STREAMED_XOR_CHUNKS}, + }) if err != nil { return nil, errors.Wrap(err, "marshal read request") } @@ -249,8 +389,7 @@ func (p *PrometheusStore) promSeries(ctx context.Context, q *prompb.Query) (*pro return nil, errors.Wrap(err, "unable to create request") } preq.Header.Add("Content-Encoding", "snappy") - preq.Header.Set("Content-Type", "application/x-protobuf") - preq.Header.Set("X-Prometheus-Remote-Read-Version", "0.1.0") + preq.Header.Set("Content-Type", "application/x-stream-protobuf") spanReqDo, ctx := tracing.StartSpan(ctx, "query_prometheus_request") preq = preq.WithContext(ctx) presp, err := p.client.Do(preq) @@ -258,38 +397,17 @@ func (p *PrometheusStore) promSeries(ctx context.Context, q *prompb.Query) (*pro return nil, errors.Wrap(err, "send request") } spanReqDo.Finish() - defer runutil.ExhaustCloseWithLogOnErr(p.logger, presp.Body, "prom series request body") - if presp.StatusCode/100 != 2 { - return nil, errors.Errorf("request failed with code %s", presp.Status) - } - - c := p.getBuffer() - buf := bytes.NewBuffer(*c) - defer p.putBuffer(c) - if _, err := io.Copy(buf, presp.Body); err != nil { - return nil, errors.Wrap(err, "copy response") - } - - spanSnappyDecode, ctx := tracing.StartSpan(ctx, "decompress_response") - sc := p.getBuffer() - decomp, err := snappy.Decode(*sc, buf.Bytes()) - spanSnappyDecode.Finish() - defer p.putBuffer(sc) - if err != nil { - return nil, errors.Wrap(err, "decompress response") + // Best effort read. + b, err := ioutil.ReadAll(presp.Body) + if err != nil { + level.Error(p.logger).Log("msg", "failed to read response from non 2XX remote read request", "err", err) + } + _ = presp.Body.Close() + return nil, errors.Errorf("request failed with code %s; msg %s", presp.Status, string(b)) } - var data prompb.ReadResponse - spanUnmarshal, _ := tracing.StartSpan(ctx, "unmarshal_response") - if err := proto.Unmarshal(decomp, &data); err != nil { - return nil, errors.Wrap(err, "unmarshal response") - } - spanUnmarshal.Finish() - if len(data.Results) != 1 { - return nil, errors.Errorf("unexpected result size %d", len(data.Results)) - } - return &data, nil + return presp, nil } // matchesExternalLabels filters out external labels matching from matcher if exsits as the local storage does not have them. diff --git a/pkg/store/prometheus_test.go b/pkg/store/prometheus_test.go index a2d47fd42c..d8bb3ed634 100644 --- a/pkg/store/prometheus_test.go +++ b/pkg/store/prometheus_test.go @@ -34,6 +34,7 @@ func testPrometheusStoreSeriesE2e(t *testing.T, prefix string) { p, err := testutil.NewPrometheusOnPath(prefix) testutil.Ok(t, err) + defer func() { testutil.Ok(t, p.Stop()) }() baseT := timestamp.FromTime(time.Now()) / 1000 * 1000 @@ -50,7 +51,6 @@ func testPrometheusStoreSeriesE2e(t *testing.T, prefix string) { defer cancel() testutil.Ok(t, p.Start()) - defer func() { testutil.Ok(t, p.Stop()) }() u, err := url.Parse(fmt.Sprintf("http://%s", p.Addr())) testutil.Ok(t, err) @@ -65,15 +65,13 @@ func testPrometheusStoreSeriesE2e(t *testing.T, prefix string) { // Query all three samples except for the first one. Since we round up queried data // to seconds, we can test whether the extra sample gets stripped properly. srv := newStoreSeriesServer(ctx) - - err = proxy.Series(&storepb.SeriesRequest{ + testutil.Ok(t, proxy.Series(&storepb.SeriesRequest{ MinTime: baseT + 101, MaxTime: baseT + 300, Matchers: []storepb.LabelMatcher{ {Type: storepb.LabelMatcher_EQ, Name: "a", Value: "b"}, }, - }, srv) - testutil.Ok(t, err) + }, srv)) testutil.Equals(t, 1, len(srv.SeriesSet)) @@ -134,6 +132,7 @@ func TestPrometheusStore_LabelValues_e2e(t *testing.T) { p, err := testutil.NewPrometheus() testutil.Ok(t, err) + defer func() { testutil.Ok(t, p.Stop()) }() a := p.Appender() _, err = a.Add(labels.FromStrings("a", "b"), 0, 1) @@ -148,7 +147,6 @@ func TestPrometheusStore_LabelValues_e2e(t *testing.T) { defer cancel() testutil.Ok(t, p.Start()) - defer func() { testutil.Ok(t, p.Stop()) }() u, err := url.Parse(fmt.Sprintf("http://%s", p.Addr())) testutil.Ok(t, err) @@ -170,6 +168,7 @@ func TestPrometheusStore_ExternalLabelValues_e2e(t *testing.T) { p, err := testutil.NewPrometheus() testutil.Ok(t, err) + defer func() { testutil.Ok(t, p.Stop()) }() a := p.Appender() _, err = a.Add(labels.FromStrings("ext_a", "b"), 0, 1) @@ -182,7 +181,6 @@ func TestPrometheusStore_ExternalLabelValues_e2e(t *testing.T) { defer cancel() testutil.Ok(t, p.Start()) - defer func() { testutil.Ok(t, p.Stop()) }() u, err := url.Parse(fmt.Sprintf("http://%s", p.Addr())) testutil.Ok(t, err) @@ -210,6 +208,7 @@ func TestPrometheusStore_Series_MatchExternalLabel_e2e(t *testing.T) { p, err := testutil.NewPrometheus() testutil.Ok(t, err) + defer func() { testutil.Ok(t, p.Stop()) }() baseT := timestamp.FromTime(time.Now()) / 1000 * 1000 @@ -226,7 +225,6 @@ func TestPrometheusStore_Series_MatchExternalLabel_e2e(t *testing.T) { defer cancel() testutil.Ok(t, p.Start()) - defer func() { testutil.Ok(t, p.Stop()) }() u, err := url.Parse(fmt.Sprintf("http://%s", p.Addr())) testutil.Ok(t, err) diff --git a/pkg/store/proxy_test.go b/pkg/store/proxy_test.go index de7bc45f5b..5aaa1a51de 100644 --- a/pkg/store/proxy_test.go +++ b/pkg/store/proxy_test.go @@ -152,8 +152,8 @@ func TestProxyStore_Series(t *testing.T) { }, expectedSeries: []rawSeries{ { - lset: []storepb.Label{{Name: "a", Value: "a"}}, - samples: []sample{{0, 0}, {2, 1}, {3, 2}}, + lset: []storepb.Label{{Name: "a", Value: "a"}}, + chunks: [][]sample{{{0, 0}, {2, 1}, {3, 2}}}, }, }, }, @@ -163,7 +163,7 @@ func TestProxyStore_Series(t *testing.T) { &testClient{ StoreClient: &mockedStoreAPI{ RespSeries: []*storepb.SeriesResponse{ - storeSeriesResponse(t, labels.FromStrings("a", "a"), []sample{{0, 0}, {2, 1}, {3, 2}}), + storeSeriesResponse(t, labels.FromStrings("a", "a"), []sample{{4, 3}}, []sample{{0, 0}, {2, 1}, {3, 2}}), }, }, minTime: 1, @@ -177,8 +177,8 @@ func TestProxyStore_Series(t *testing.T) { }, expectedSeries: []rawSeries{ { - lset: []storepb.Label{{Name: "a", Value: "a"}}, - samples: []sample{{0, 0}, {2, 1}, {3, 2}}, + lset: []storepb.Label{{Name: "a", Value: "a"}}, + chunks: [][]sample{{{4, 3}}, {{0, 0}, {2, 1}, {3, 2}}}, // No sort merge. }, }, }, @@ -223,8 +223,8 @@ func TestProxyStore_Series(t *testing.T) { expectedSeries: []rawSeries{ { // We did not ask for a=a, but we trust StoreAPI will match correctly, so proxy does check any of this. - lset: []storepb.Label{{Name: "a", Value: "a"}}, - samples: []sample{{0, 0}, {2, 1}, {3, 2}}, + lset: []storepb.Label{{Name: "a", Value: "a"}}, + chunks: [][]sample{{{0, 0}, {2, 1}, {3, 2}}}, }, }, }, @@ -234,7 +234,8 @@ func TestProxyStore_Series(t *testing.T) { &testClient{ StoreClient: &mockedStoreAPI{ RespSeries: []*storepb.SeriesResponse{ - storeSeriesResponse(t, labels.FromStrings("a", "a"), []sample{{0, 0}, {2, 1}, {3, 2}}), + storeSeriesResponse(t, labels.FromStrings("a", "a"), []sample{{0, 0}, {2, 1}, {3, 2}}, []sample{{4, 3}}), + storeSeriesResponse(t, labels.FromStrings("a", "a"), []sample{{5, 4}}), // Continuations of the same series. storepb.NewWarnSeriesResponse(errors.New("warning")), storeSeriesResponse(t, labels.FromStrings("a", "b"), []sample{{2, 2}, {3, 3}, {4, 4}}), }, @@ -287,16 +288,20 @@ func TestProxyStore_Series(t *testing.T) { }, expectedSeries: []rawSeries{ { - lset: []storepb.Label{{Name: "a", Value: "a"}}, - samples: []sample{{0, 0}, {2, 1}, {3, 2}}, + lset: []storepb.Label{{Name: "a", Value: "a"}}, + chunks: [][]sample{{{0, 0}, {2, 1}, {3, 2}}, {{4, 3}}}, + }, + { + lset: []storepb.Label{{Name: "a", Value: "a"}}, + chunks: [][]sample{{{5, 4}}}, }, { - lset: []storepb.Label{{Name: "a", Value: "b"}}, - samples: []sample{{2, 2}, {3, 3}, {4, 4}, {1, 1}, {2, 2}, {3, 3}}, // No sort merge. + lset: []storepb.Label{{Name: "a", Value: "b"}}, + chunks: [][]sample{{{2, 2}, {3, 3}, {4, 4}}, {{1, 1}, {2, 2}, {3, 3}}}, // No sort merge. }, { - lset: []storepb.Label{{Name: "a", Value: "c"}}, - samples: []sample{{100, 1}, {300, 3}, {400, 4}}, + lset: []storepb.Label{{Name: "a", Value: "c"}}, + chunks: [][]sample{{{100, 1}, {300, 3}, {400, 4}}}, }, }, expectedWarningsLen: 2, @@ -332,8 +337,8 @@ func TestProxyStore_Series(t *testing.T) { }, expectedSeries: []rawSeries{ { - lset: []storepb.Label{{Name: "a", Value: "b"}}, - samples: []sample{{1, 1}, {2, 2}, {3, 3}, {1, 11}, {2, 22}, {3, 33}}, + lset: []storepb.Label{{Name: "a", Value: "b"}}, + chunks: [][]sample{{{1, 1}, {2, 2}, {3, 3}}, {{1, 11}, {2, 22}, {3, 33}}}, }, }, }, @@ -367,8 +372,8 @@ func TestProxyStore_Series(t *testing.T) { }, expectedSeries: []rawSeries{ { - lset: []storepb.Label{{Name: "a", Value: "b"}}, - samples: []sample{{1, 1}, {2, 2}, {3, 3}}, + lset: []storepb.Label{{Name: "a", Value: "b"}}, + chunks: [][]sample{{{1, 1}, {2, 2}, {3, 3}}}, }, }, expectedWarningsLen: 2, @@ -425,7 +430,7 @@ func TestProxyStore_Series(t *testing.T) { testutil.Ok(t, err) - seriesEqual(t, tc.expectedSeries, s.SeriesSet) + seriesEquals(t, tc.expectedSeries, s.SeriesSet) testutil.Equals(t, tc.expectedWarningsLen, len(s.Warnings), "got %v", s.Warnings) }); !ok { return @@ -521,8 +526,8 @@ func TestProxyStore_SeriesSlowStores(t *testing.T) { }, expectedSeries: []rawSeries{ { - lset: []storepb.Label{{Name: "a", Value: "b"}}, - samples: []sample{{1, 1}, {2, 2}, {3, 3}}, + lset: []storepb.Label{{Name: "a", Value: "b"}}, + chunks: [][]sample{{{1, 1}, {2, 2}, {3, 3}}}, }, }, expectedWarningsLen: 2, @@ -547,7 +552,7 @@ func TestProxyStore_SeriesSlowStores(t *testing.T) { testutil.Ok(t, err) - seriesEqual(t, tc.expectedSeries, s.SeriesSet) + seriesEquals(t, tc.expectedSeries, s.SeriesSet) testutil.Equals(t, tc.expectedWarningsLen, len(s.Warnings), "got %v", s.Warnings) }); !ok { return @@ -796,32 +801,33 @@ func TestProxyStore_LabelNames(t *testing.T) { } type rawSeries struct { - lset []storepb.Label - samples []sample + lset []storepb.Label + chunks [][]sample } -func seriesEqual(t *testing.T, expected []rawSeries, got []storepb.Series) { +func seriesEquals(t *testing.T, expected []rawSeries, got []storepb.Series) { testutil.Equals(t, len(expected), len(got), "got: %v", got) for i, series := range got { testutil.Equals(t, expected[i].lset, series.Labels) + testutil.Equals(t, len(expected[i].chunks), len(series.Chunks), "unexpected number of chunks for series %v", series.Labels) - k := 0 - for _, chk := range series.Chunks { + for k, chk := range series.Chunks { c, err := chunkenc.FromData(chunkenc.EncXOR, chk.Raw.Data) testutil.Ok(t, err) + j := 0 iter := c.Iterator(nil) for iter.Next() { - testutil.Assert(t, k < len(expected[i].samples), "more samples than expected") + testutil.Assert(t, j < len(expected[i].chunks[k]), "more samples than expected for %v chunk %d", series.Labels, k) tv, v := iter.At() - testutil.Equals(t, expected[i].samples[k], sample{tv, v}) - k++ + testutil.Equals(t, expected[i].chunks[k][j], sample{tv, v}) + j++ } testutil.Ok(t, iter.Err()) + testutil.Equals(t, len(expected[i].chunks[k]), j) } - testutil.Equals(t, len(expected[i].samples), k) } } @@ -1029,24 +1035,30 @@ func (c *StoreSeriesClient) Context() context.Context { } // storeSeriesResponse creates test storepb.SeriesResponse that includes series with single chunk that stores all the given samples. -func storeSeriesResponse(t testing.TB, lset labels.Labels, smpls []sample) *storepb.SeriesResponse { +func storeSeriesResponse(t testing.TB, lset labels.Labels, smplChunks ...[]sample) *storepb.SeriesResponse { var s storepb.Series for _, l := range lset { s.Labels = append(s.Labels, storepb.Label{Name: l.Name, Value: l.Value}) } - c := chunkenc.NewXORChunk() - a, err := c.Appender() - testutil.Ok(t, err) - for _, smpl := range smpls { - a.Append(smpl.t, smpl.v) + for _, smpls := range smplChunks { + c := chunkenc.NewXORChunk() + a, err := c.Appender() + testutil.Ok(t, err) + + for _, smpl := range smpls { + a.Append(smpl.t, smpl.v) + } + + ch := storepb.AggrChunk{ + MinTime: smpls[0].t, + MaxTime: smpls[len(smpls)-1].t, + Raw: &storepb.Chunk{Type: storepb.Chunk_XOR, Data: c.Bytes()}, + } + + s.Chunks = append(s.Chunks, ch) } - s.Chunks = append(s.Chunks, storepb.AggrChunk{ - MinTime: smpls[0].t, - MaxTime: smpls[len(smpls)-1].t, - Raw: &storepb.Chunk{Type: storepb.Chunk_XOR, Data: c.Bytes()}, - }) return storepb.NewSeriesResponse(&s) } diff --git a/pkg/store/storepb/custom.go b/pkg/store/storepb/custom.go index ce6ccb8659..c942dd1355 100644 --- a/pkg/store/storepb/custom.go +++ b/pkg/store/storepb/custom.go @@ -76,7 +76,7 @@ func MergeSeriesSets(all ...SeriesSet) SeriesSet { } // SeriesSet is a set of series and their corresponding chunks. -// The set is sorted by the label sets. Chunks may be overlapping or out of order. +// The set is sorted by the label sets. Chunks may be overlapping or expected of order. type SeriesSet interface { Next() bool At() ([]Label, []AggrChunk) @@ -95,6 +95,8 @@ type mergedSeriesSet struct { // newMergedSeriesSet takes two series sets as a single series set. // Series that occur in both sets should have disjoint time ranges. // If the ranges overlap b samples are appended to a samples. +// If the single SeriesSet returns same series within many iterations, +// merge series set will not try to merge those. func newMergedSeriesSet(a, b SeriesSet) *mergedSeriesSet { s := &mergedSeriesSet{a: a, b: b} // Initialize first elements of both sets as Next() needs @@ -143,7 +145,7 @@ func (s *mergedSeriesSet) Next() bool { s.lset, s.chunks = s.a.At() s.adone = !s.a.Next() } else { - // Concatenate chunks from both series sets. They may be out of order + // Concatenate chunks from both series sets. They may be expected of order // w.r.t to their time range. This must be accounted for later. lset, chksA := s.a.At() _, chksB := s.b.At() diff --git a/pkg/store/storepb/custom_test.go b/pkg/store/storepb/custom_test.go new file mode 100644 index 0000000000..1b4f05eec3 --- /dev/null +++ b/pkg/store/storepb/custom_test.go @@ -0,0 +1,262 @@ +package storepb + +import ( + "errors" + "testing" + + "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/tsdb/chunkenc" + "github.com/thanos-io/thanos/pkg/testutil" +) + +type sample struct { + t int64 + v float64 +} + +type listSeriesSet struct { + series []Series + idx int +} + +func newSeries(t *testing.T, lset labels.Labels, smplChunks [][]sample) Series { + var s Series + + for _, l := range lset { + s.Labels = append(s.Labels, Label{Name: l.Name, Value: l.Value}) + } + + for _, smpls := range smplChunks { + c := chunkenc.NewXORChunk() + a, err := c.Appender() + testutil.Ok(t, err) + + for _, smpl := range smpls { + a.Append(smpl.t, smpl.v) + } + + ch := AggrChunk{ + MinTime: smpls[0].t, + MaxTime: smpls[len(smpls)-1].t, + Raw: &Chunk{Type: Chunk_XOR, Data: c.Bytes()}, + } + + s.Chunks = append(s.Chunks, ch) + } + return s +} + +func newListSeriesSet(t *testing.T, raw []rawSeries) *listSeriesSet { + var series []Series + for _, s := range raw { + series = append(series, newSeries(t, s.lset, s.chunks)) + } + return &listSeriesSet{ + series: series, + idx: -1, + } +} + +func (s *listSeriesSet) Next() bool { + s.idx++ + return s.idx < len(s.series) +} + +func (s *listSeriesSet) At() ([]Label, []AggrChunk) { + if s.idx < 0 || s.idx >= len(s.series) { + return nil, nil + } + + return s.series[s.idx].Labels, s.series[s.idx].Chunks +} + +func (s *listSeriesSet) Err() error { return nil } + +type errSeriesSet struct{ err error } + +func (errSeriesSet) Next() bool { return false } + +func (errSeriesSet) At() ([]Label, []AggrChunk) { return nil, nil } + +func (e errSeriesSet) Err() error { return e.err } + +func TestMergeSeriesSet(t *testing.T) { + for _, tcase := range []struct { + desc string + in [][]rawSeries + expected []rawSeries + }{ + { + desc: "nils", + in: nil, + expected: nil, + }, + { + desc: "single seriesSet, distinct series", + in: [][]rawSeries{{{ + lset: labels.FromStrings("a", "a"), + chunks: [][]sample{{{1, 1}, {2, 2}}, {{3, 3}, {4, 4}}}, + }, { + lset: labels.FromStrings("a", "c"), + chunks: [][]sample{{{11, 1}, {12, 2}}, {{13, 3}, {14, 4}}}, + }}}, + + expected: []rawSeries{ + { + lset: labels.FromStrings("a", "a"), + chunks: [][]sample{{{1, 1}, {2, 2}}, {{3, 3}, {4, 4}}}, + }, { + lset: labels.FromStrings("a", "c"), + chunks: [][]sample{{{11, 1}, {12, 2}}, {{13, 3}, {14, 4}}}, + }, + }, + }, + { + desc: "two seriesSets, distinct series", + in: [][]rawSeries{{{ + lset: labels.FromStrings("a", "a"), + chunks: [][]sample{{{1, 1}, {2, 2}}, {{3, 3}, {4, 4}}}, + }}, {{ + lset: labels.FromStrings("a", "c"), + chunks: [][]sample{{{11, 1}, {12, 2}}, {{13, 3}, {14, 4}}}, + }}}, + + expected: []rawSeries{ + { + lset: labels.FromStrings("a", "a"), + chunks: [][]sample{{{1, 1}, {2, 2}}, {{3, 3}, {4, 4}}}, + }, { + lset: labels.FromStrings("a", "c"), + chunks: [][]sample{{{11, 1}, {12, 2}}, {{13, 3}, {14, 4}}}, + }, + }, + }, + { + desc: "two seriesSets, {a=c} series to merge", + in: [][]rawSeries{ + { + { + lset: labels.FromStrings("a", "a"), + chunks: [][]sample{{{1, 1}, {2, 2}}, {{3, 3}, {4, 4}}}, + }, { + lset: labels.FromStrings("a", "c"), + chunks: [][]sample{{{11, 1}, {12, 2}}, {{13, 3}, {14, 4}}}, + }, + }, { + { + lset: labels.FromStrings("a", "c"), + chunks: [][]sample{{{7, 1}, {8, 2}}, {{9, 3}, {10, 4}, {11, 4444}}}, // Last sample overlaps, merge ignores that. + }, + }, + }, + + expected: []rawSeries{ + { + lset: labels.FromStrings("a", "a"), + chunks: [][]sample{{{1, 1}, {2, 2}}, {{3, 3}, {4, 4}}}, + }, { + lset: labels.FromStrings("a", "c"), + chunks: [][]sample{{{11, 1}, {12, 2}}, {{13, 3}, {14, 4}}, {{7, 1}, {8, 2}}, {{9, 3}, {10, 4}, {11, 4444}}}, + }, + }, + }, + { + // SeriesSet can return same series within different iterations. MergeSeries should not try to merge those. + // We do it on last step possible: Qurier promSet. + desc: "single seriesSets, {a=c} series to merge.", + in: [][]rawSeries{ + { + { + lset: labels.FromStrings("a", "a"), + chunks: [][]sample{{{1, 1}, {2, 2}}, {{3, 3}, {4, 4}}}, + }, { + lset: labels.FromStrings("a", "c"), + chunks: [][]sample{{{7, 1}, {8, 2}}, {{9, 3}, {10, 4}, {11, 4444}}}, + }, { + lset: labels.FromStrings("a", "c"), + chunks: [][]sample{{{11, 1}, {12, 2}}, {{13, 3}, {14, 4}}}, + }, + }, + }, + + expected: []rawSeries{ + { + lset: labels.FromStrings("a", "a"), + chunks: [][]sample{{{1, 1}, {2, 2}}, {{3, 3}, {4, 4}}}, + }, { + lset: labels.FromStrings("a", "c"), + chunks: [][]sample{{{7, 1}, {8, 2}}, {{9, 3}, {10, 4}, {11, 4444}}}, + }, { + lset: labels.FromStrings("a", "c"), + chunks: [][]sample{{{11, 1}, {12, 2}}, {{13, 3}, {14, 4}}}, + }, + }, + }, + } { + if ok := t.Run(tcase.desc, func(t *testing.T) { + var input []SeriesSet + for _, iss := range tcase.in { + input = append(input, newListSeriesSet(t, iss)) + } + ss := MergeSeriesSets(input...) + seriesEquals(t, tcase.expected, ss) + testutil.Ok(t, ss.Err()) + }); !ok { + return + } + } +} + +func TestMergeSeriesSetError(t *testing.T) { + var input []SeriesSet + for _, iss := range [][]rawSeries{{{ + lset: labels.FromStrings("a", "a"), + chunks: [][]sample{{{1, 1}, {2, 2}}, {{3, 3}, {4, 4}}}, + }}, {{ + lset: labels.FromStrings("a", "c"), + chunks: [][]sample{{{11, 1}, {12, 2}}, {{13, 3}, {14, 4}}}, + }}} { + input = append(input, newListSeriesSet(t, iss)) + } + expectedErr := errors.New("test error") + ss := MergeSeriesSets(append(input, errSeriesSet{err: expectedErr})...) + testutil.Equals(t, expectedErr, ss.Err()) +} + +type rawSeries struct { + lset labels.Labels + chunks [][]sample +} + +func seriesEquals(t *testing.T, expected []rawSeries, gotSS SeriesSet) { + var got []Series + for gotSS.Next() { + lset, chks := gotSS.At() + got = append(got, Series{Labels: lset, Chunks: chks}) + } + + testutil.Equals(t, len(expected), len(got), "got: %v", got) + + for i, series := range got { + testutil.Equals(t, expected[i].lset, LabelsToPromLabels(series.Labels)) + testutil.Equals(t, len(expected[i].chunks), len(series.Chunks), "unexpected number of chunks") + + for k, chk := range series.Chunks { + c, err := chunkenc.FromData(chunkenc.EncXOR, chk.Raw.Data) + testutil.Ok(t, err) + + j := 0 + iter := c.Iterator(nil) + for iter.Next() { + testutil.Assert(t, j < len(expected[i].chunks[k]), "more samples than expected for %s chunk %d", series.Labels, k) + + tv, v := iter.At() + testutil.Equals(t, expected[i].chunks[k][j], sample{tv, v}) + j++ + } + testutil.Ok(t, iter.Err()) + testutil.Equals(t, len(expected[i].chunks[k]), j) + } + } + +} diff --git a/pkg/store/storepb/rpc.pb.go b/pkg/store/storepb/rpc.pb.go index 229f81c2ba..3c46313e66 100644 --- a/pkg/store/storepb/rpc.pb.go +++ b/pkg/store/storepb/rpc.pb.go @@ -712,6 +712,11 @@ type StoreClient interface { /// available. Info(ctx context.Context, in *InfoRequest, opts ...grpc.CallOption) (*InfoResponse, error) /// Series streams each Series (Labels and chunk/downsampling chunk) for given label matchers and time range. + /// + /// Series should strictly stream full series after series, optionally split by time. This means that a single frame can contain + /// partition of the single series, but once a new series is started to be streamed it means that no more data will + /// be sent for previous one. + /// Series has to be sorted. Series(ctx context.Context, in *SeriesRequest, opts ...grpc.CallOption) (Store_SeriesClient, error) /// LabelNames returns all label names that is available. /// Currently unimplemented in all Thanos implementations, because Query API does not implement this either. @@ -793,6 +798,11 @@ type StoreServer interface { /// available. Info(context.Context, *InfoRequest) (*InfoResponse, error) /// Series streams each Series (Labels and chunk/downsampling chunk) for given label matchers and time range. + /// + /// Series should strictly stream full series after series, optionally split by time. This means that a single frame can contain + /// partition of the single series, but once a new series is started to be streamed it means that no more data will + /// be sent for previous one. + /// Series has to be sorted. Series(*SeriesRequest, Store_SeriesServer) error /// LabelNames returns all label names that is available. /// Currently unimplemented in all Thanos implementations, because Query API does not implement this either. diff --git a/pkg/store/storepb/rpc.proto b/pkg/store/storepb/rpc.proto index 35ea869454..8b2098e43c 100644 --- a/pkg/store/storepb/rpc.proto +++ b/pkg/store/storepb/rpc.proto @@ -18,6 +18,11 @@ service Store { rpc Info(InfoRequest) returns (InfoResponse); /// Series streams each Series (Labels and chunk/downsampling chunk) for given label matchers and time range. + /// + /// Series should strictly stream full series after series, optionally split by time. This means that a single frame can contain + /// partition of the single series, but once a new series is started to be streamed it means that no more data will + /// be sent for previous one. + /// Series has to be sorted. rpc Series(SeriesRequest) returns (stream SeriesResponse); /// LabelNames returns all label names that is available.