Skip to content

Commit

Permalink
sidecar: Added support for streaming, chunked remote read.
Browse files Browse the repository at this point in the history
Fixes: #488

Signed-off-by: Bartek Plotka <bwplotka@gmail.com>
  • Loading branch information
bwplotka committed Aug 27, 2019
1 parent af53b38 commit d7b4aef
Show file tree
Hide file tree
Showing 4 changed files with 180 additions and 52 deletions.
206 changes: 162 additions & 44 deletions pkg/store/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,17 @@ import (
"net/url"
"path"
"sort"
"strings"
"sync"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/gogo/protobuf/proto"
"github.com/golang/snappy"
opentracing "github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/prometheus/prometheus/prompb"
"github.com/prometheus/prometheus/storage/remote"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/prometheus/prometheus/tsdb/labels"
"github.com/thanos-io/thanos/pkg/component"
Expand Down Expand Up @@ -80,7 +83,7 @@ func NewPrometheusStore(
}

// Info returns store information about the Prometheus instance.
// NOTE(bplotka): MaxTime & MinTime are not accurate nor adjusted dynamically.
// NOTE(bwplotka): MaxTime & MinTime are not accurate nor adjusted dynamically.
// This is fine for now, but might be needed in future.
func (p *PrometheusStore) Info(ctx context.Context, r *storepb.InfoRequest) (*storepb.InfoResponse, error) {
lset := p.externalLabels()
Expand Down Expand Up @@ -142,8 +145,6 @@ func (p *PrometheusStore) Series(r *storepb.SeriesRequest, s storepb.Store_Serie

q := &prompb.Query{StartTimestampMs: r.MinTime, EndTimestampMs: r.MaxTime}

// TODO(fabxc): import common definitions from prompb once we have a stable gRPC
// query API there.
for _, m := range newMatchers {
pm := &prompb.LabelMatcher{Name: m.Name, Value: m.Value}

Expand All @@ -162,12 +163,39 @@ func (p *PrometheusStore) Series(r *storepb.SeriesRequest, s storepb.Store_Serie
q.Matchers = append(q.Matchers, pm)
}

resp, err := p.promSeries(s.Context(), q)
queryPrometheusSpan, ctx := tracing.StartSpan(s.Context(), "query_prometheus")

httpResp, err := p.startPromSeries(ctx, q)
if err != nil {
queryPrometheusSpan.Finish()
return errors.Wrap(err, "query Prometheus")
}

span, _ := tracing.StartSpan(s.Context(), "transform_and_respond")
// Negotiate content. We requested streamed chunked response type, but still we need to support old versions of
// remote read.
contentType := httpResp.Header.Get("Content-Type")
if strings.HasPrefix(contentType, "application/x-protobuf") {
return p.handleSampledPrometheusResponse(s, httpResp, queryPrometheusSpan, externalLabels)
}

if !strings.HasPrefix(contentType, "application/x-streamed-protobuf; proto=prometheus.ChunkedReadResponse") {
return errors.Errorf("not supported remote read content type: %s", contentType)
}
return p.handleStreamedPrometheusResponse(s, httpResp, queryPrometheusSpan, externalLabels)
}

func (p *PrometheusStore) handleSampledPrometheusResponse(s storepb.Store_SeriesServer, httpResp *http.Response, querySpan opentracing.Span, externalLabels labels.Labels) error {
ctx := s.Context()

level.Debug(p.logger).Log("msg", "started handling ReadRequest_SAMPLED response type.")

resp, err := p.fetchSampledResponse(ctx, httpResp)
querySpan.Finish()
if err != nil {
return err
}

span, _ := tracing.StartSpan(ctx, "transform_and_respond")
defer span.Finish()
span.SetTag("series_count", len(resp.Results[0].Timeseries))

Expand Down Expand Up @@ -195,17 +223,129 @@ func (p *PrometheusStore) Series(r *storepb.SeriesRequest, s storepb.Store_Serie
return err
}

resp := storepb.NewSeriesResponse(&storepb.Series{
if err := s.Send(storepb.NewSeriesResponse(&storepb.Series{
Labels: lset,
Chunks: aggregatedChunks,
})
if err := s.Send(resp); err != nil {
})); err != nil {
return err
}
}
level.Debug(p.logger).Log("msg", "handled ReadRequest_SAMPLED request.", "series", len(resp.Results[0].Timeseries))
return nil
}

func (p *PrometheusStore) handleStreamedPrometheusResponse(s storepb.Store_SeriesServer, httpResp *http.Response, querySpan opentracing.Span, externalLabels labels.Labels) error {
level.Debug(p.logger).Log("msg", "started handling ReadRequest_STREAMED_XOR_CHUNKS streamed read response.")

framesNum := 0
seriesNum := 0

defer func() {
querySpan.SetTag("frames", framesNum)
querySpan.SetTag("series", seriesNum)
querySpan.Finish()
}()
defer runutil.CloseWithLogOnErr(p.logger, httpResp.Body, "prom series request body")

var (
lastSeries string
currSeries string
tmp []string
data = p.getBuffer()
)
defer p.putBuffer(data)

// TODO(bwplotka): Put read limit as a flag.
stream := remote.NewChunkedReader(httpResp.Body, remote.DefaultChunkedReadLimit, *data)
for {
res := &prompb.ChunkedReadResponse{}
err := stream.NextProto(res)
if err == io.EOF {
break
}
if err != nil {
return errors.Wrap(err, "next proto")
}

if len(res.ChunkedSeries) != 1 {
level.Warn(p.logger).Log("msg", "Prometheus ReadRequest_STREAMED_XOR_CHUNKS returned non 1 series in frame", "series", len(res.ChunkedSeries))
}

framesNum++
for _, series := range res.ChunkedSeries {
{
// Calculate hash of series for counting.
tmp = tmp[:0]
for _, l := range series.Labels {
tmp = append(tmp, l.String())
}
currSeries = strings.Join(tmp, ";")
if currSeries != lastSeries {
seriesNum++
lastSeries = currSeries
}
}

thanosChks := make([]storepb.AggrChunk, len(series.Chunks))
for i, chk := range series.Chunks {
thanosChks[i] = storepb.AggrChunk{
MaxTime: chk.MaxTimeMs,
MinTime: chk.MinTimeMs,
Raw: &storepb.Chunk{
Data: chk.Data,
// Prometheus ChunkEncoding vs ours https://github.com/thanos-io/thanos/blob/master/pkg/store/storepb/types.proto#L19
// has one difference. Prometheus has Chunk_UNKNOWN Chunk_Encoding = 0 vs we start from
// XOR as 0. Compensate for that here:
Type: storepb.Chunk_Encoding(chk.Type - 1),
},
}
// Drop the reference to data from non protobuf for GC.
series.Chunks[i].Data = nil
}

if err := s.Send(storepb.NewSeriesResponse(&storepb.Series{
Labels: p.translateAndExtendLabels(series.Labels, externalLabels),
Chunks: thanosChks,
})); err != nil {
return err
}
}
}
level.Debug(p.logger).Log("msg", "handled ReadRequest_STREAMED_XOR_CHUNKS request.", "frames", framesNum, "series", seriesNum)
return nil
}

func (p *PrometheusStore) fetchSampledResponse(ctx context.Context, resp *http.Response) (*prompb.ReadResponse, error) {
defer runutil.ExhaustCloseWithLogOnErr(p.logger, resp.Body, "prom series request body")

b := p.getBuffer()
buf := bytes.NewBuffer(*b)
defer p.putBuffer(b)
if _, err := io.Copy(buf, resp.Body); err != nil {
return nil, errors.Wrap(err, "copy response")
}
spanSnappyDecode, ctx := tracing.StartSpan(ctx, "decompress_response")
sb := p.getBuffer()
decomp, err := snappy.Decode(*sb, buf.Bytes())
spanSnappyDecode.Finish()
defer p.putBuffer(sb)
if err != nil {
return nil, errors.Wrap(err, "decompress response")
}

var data prompb.ReadResponse
spanUnmarshal, _ := tracing.StartSpan(ctx, "unmarshal_response")
if err := proto.Unmarshal(decomp, &data); err != nil {
return nil, errors.Wrap(err, "unmarshal response")
}
spanUnmarshal.Finish()
if len(data.Results) != 1 {
return nil, errors.Errorf("unexpected result size %d", len(data.Results))
}

return &data, nil
}

func (p *PrometheusStore) chunkSamples(series *prompb.TimeSeries, maxSamplesPerChunk int) (chks []storepb.AggrChunk, err error) {
samples := series.Samples

Expand All @@ -232,11 +372,11 @@ func (p *PrometheusStore) chunkSamples(series *prompb.TimeSeries, maxSamplesPerC
return chks, nil
}

func (p *PrometheusStore) promSeries(ctx context.Context, q *prompb.Query) (*prompb.ReadResponse, error) {
span, ctx := tracing.StartSpan(ctx, "query_prometheus")
defer span.Finish()

reqb, err := proto.Marshal(&prompb.ReadRequest{Queries: []*prompb.Query{q}})
func (p *PrometheusStore) startPromSeries(ctx context.Context, q *prompb.Query) (*http.Response, error) {
reqb, err := proto.Marshal(&prompb.ReadRequest{
Queries: []*prompb.Query{q},
AcceptedResponseTypes: []prompb.ReadRequest_ResponseType{prompb.ReadRequest_STREAMED_XOR_CHUNKS},
})
if err != nil {
return nil, errors.Wrap(err, "marshal read request")
}
Expand All @@ -249,47 +389,25 @@ func (p *PrometheusStore) promSeries(ctx context.Context, q *prompb.Query) (*pro
return nil, errors.Wrap(err, "unable to create request")
}
preq.Header.Add("Content-Encoding", "snappy")
preq.Header.Set("Content-Type", "application/x-protobuf")
preq.Header.Set("X-Prometheus-Remote-Read-Version", "0.1.0")
preq.Header.Set("Content-Type", "application/x-stream-protobuf")
spanReqDo, ctx := tracing.StartSpan(ctx, "query_prometheus_request")
preq = preq.WithContext(ctx)
presp, err := p.client.Do(preq)
if err != nil {
return nil, errors.Wrap(err, "send request")
}
spanReqDo.Finish()
defer runutil.ExhaustCloseWithLogOnErr(p.logger, presp.Body, "prom series request body")

if presp.StatusCode/100 != 2 {
return nil, errors.Errorf("request failed with code %s", presp.Status)
}

c := p.getBuffer()
buf := bytes.NewBuffer(*c)
defer p.putBuffer(c)
if _, err := io.Copy(buf, presp.Body); err != nil {
return nil, errors.Wrap(err, "copy response")
}

spanSnappyDecode, ctx := tracing.StartSpan(ctx, "decompress_response")
sc := p.getBuffer()
decomp, err := snappy.Decode(*sc, buf.Bytes())
spanSnappyDecode.Finish()
defer p.putBuffer(sc)
if err != nil {
return nil, errors.Wrap(err, "decompress response")
// Best effort read.
b, err := ioutil.ReadAll(presp.Body)
if err != nil {
level.Error(p.logger).Log("msg", "failed to read response from non 2XX remote read request", "err", err)
}
_ = presp.Body.Close()
return nil, errors.Errorf("request failed with code %s; msg %s", presp.Status, string(b))
}

var data prompb.ReadResponse
spanUnmarshal, _ := tracing.StartSpan(ctx, "unmarshal_response")
if err := proto.Unmarshal(decomp, &data); err != nil {
return nil, errors.Wrap(err, "unmarshal response")
}
spanUnmarshal.Finish()
if len(data.Results) != 1 {
return nil, errors.Errorf("unexpected result size %d", len(data.Results))
}
return &data, nil
return presp, nil
}

// matchesExternalLabels filters out external labels matching from matcher if exsits as the local storage does not have them.
Expand Down
14 changes: 6 additions & 8 deletions pkg/store/prometheus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ func testPrometheusStoreSeriesE2e(t *testing.T, prefix string) {

p, err := testutil.NewPrometheusOnPath(prefix)
testutil.Ok(t, err)
defer func() { testutil.Ok(t, p.Stop()) }()

baseT := timestamp.FromTime(time.Now()) / 1000 * 1000

Expand All @@ -50,7 +51,6 @@ func testPrometheusStoreSeriesE2e(t *testing.T, prefix string) {
defer cancel()

testutil.Ok(t, p.Start())
defer func() { testutil.Ok(t, p.Stop()) }()

u, err := url.Parse(fmt.Sprintf("http://%s", p.Addr()))
testutil.Ok(t, err)
Expand All @@ -65,15 +65,13 @@ func testPrometheusStoreSeriesE2e(t *testing.T, prefix string) {
// Query all three samples except for the first one. Since we round up queried data
// to seconds, we can test whether the extra sample gets stripped properly.
srv := newStoreSeriesServer(ctx)

err = proxy.Series(&storepb.SeriesRequest{
testutil.Ok(t, proxy.Series(&storepb.SeriesRequest{
MinTime: baseT + 101,
MaxTime: baseT + 300,
Matchers: []storepb.LabelMatcher{
{Type: storepb.LabelMatcher_EQ, Name: "a", Value: "b"},
},
}, srv)
testutil.Ok(t, err)
}, srv))

testutil.Equals(t, 1, len(srv.SeriesSet))

Expand Down Expand Up @@ -134,6 +132,7 @@ func TestPrometheusStore_LabelValues_e2e(t *testing.T) {

p, err := testutil.NewPrometheus()
testutil.Ok(t, err)
defer func() { testutil.Ok(t, p.Stop()) }()

a := p.Appender()
_, err = a.Add(labels.FromStrings("a", "b"), 0, 1)
Expand All @@ -148,7 +147,6 @@ func TestPrometheusStore_LabelValues_e2e(t *testing.T) {
defer cancel()

testutil.Ok(t, p.Start())
defer func() { testutil.Ok(t, p.Stop()) }()

u, err := url.Parse(fmt.Sprintf("http://%s", p.Addr()))
testutil.Ok(t, err)
Expand All @@ -170,6 +168,7 @@ func TestPrometheusStore_ExternalLabelValues_e2e(t *testing.T) {

p, err := testutil.NewPrometheus()
testutil.Ok(t, err)
defer func() { testutil.Ok(t, p.Stop()) }()

a := p.Appender()
_, err = a.Add(labels.FromStrings("ext_a", "b"), 0, 1)
Expand All @@ -182,7 +181,6 @@ func TestPrometheusStore_ExternalLabelValues_e2e(t *testing.T) {
defer cancel()

testutil.Ok(t, p.Start())
defer func() { testutil.Ok(t, p.Stop()) }()

u, err := url.Parse(fmt.Sprintf("http://%s", p.Addr()))
testutil.Ok(t, err)
Expand Down Expand Up @@ -210,6 +208,7 @@ func TestPrometheusStore_Series_MatchExternalLabel_e2e(t *testing.T) {

p, err := testutil.NewPrometheus()
testutil.Ok(t, err)
defer func() { testutil.Ok(t, p.Stop()) }()

baseT := timestamp.FromTime(time.Now()) / 1000 * 1000

Expand All @@ -226,7 +225,6 @@ func TestPrometheusStore_Series_MatchExternalLabel_e2e(t *testing.T) {
defer cancel()

testutil.Ok(t, p.Start())
defer func() { testutil.Ok(t, p.Stop()) }()

u, err := url.Parse(fmt.Sprintf("http://%s", p.Addr()))
testutil.Ok(t, err)
Expand Down
8 changes: 8 additions & 0 deletions pkg/store/storepb/rpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions pkg/store/storepb/rpc.proto
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ service Store {
rpc Info(InfoRequest) returns (InfoResponse);

/// Series streams each Series (Labels and chunk/downsampling chunk) for given label matchers and time range.
///
/// Series should strictly stream full series after series, optionally split by time. This means that a single frame can contain
//// partition of the single series, but once a new series is started to be streamed it means that no more data will
//// be sent for previous one.
rpc Series(SeriesRequest) returns (stream SeriesResponse);

/// LabelNames returns all label names that is available.
Expand Down

0 comments on commit d7b4aef

Please sign in to comment.