Skip to content

Commit

Permalink
Fixes instant queries in the frontend. (#4091)
Browse files Browse the repository at this point in the history
* Fixes instant queries in the frontend.

This is a set of fixes for instant queries:
- correctly propagate statistics.
- correctly convert sampleStream to vector.
- correctly marshal to json.
- correctly encode the time for the request.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Fixes tests.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>
  • Loading branch information
cyriltovena authored Aug 3, 2021
1 parent c14172d commit 1efeace
Show file tree
Hide file tree
Showing 7 changed files with 278 additions and 41 deletions.
1 change: 1 addition & 0 deletions pkg/querier/queryrange/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,7 @@ func (Codec) EncodeRequest(ctx context.Context, r queryrange.Request) (*http.Req
"query": []string{request.Query},
"direction": []string{request.Direction.String()},
"limit": []string{fmt.Sprintf("%d", request.Limit)},
"time": []string{fmt.Sprintf("%d", request.TimeTs.UnixNano())},
}
if len(request.Shards) > 0 {
params["shards"] = request.Shards
Expand Down
27 changes: 26 additions & 1 deletion pkg/querier/queryrange/downstreamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/promql/parser"

"github.com/grafana/loki/pkg/loghttp"
"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/logqlmodel"
)
Expand Down Expand Up @@ -162,6 +163,25 @@ func sampleStreamToMatrix(streams []queryrange.SampleStream) parser.Value {
return xs
}

func sampleStreamToVector(streams []queryrange.SampleStream) parser.Value {
xs := make(promql.Vector, 0, len(streams))
for _, stream := range streams {
x := promql.Sample{}
x.Metric = make(labels.Labels, 0, len(stream.Labels))
for _, l := range stream.Labels {
x.Metric = append(x.Metric, labels.Label(l))
}

x.Point = promql.Point{
T: stream.Samples[0].TimestampMs,
V: stream.Samples[0].Value,
}

xs = append(xs, x)
}
return xs
}

func ResponseToResult(resp queryrange.Response) (logqlmodel.Result, error) {
switch r := resp.(type) {
case *LokiResponse:
Expand All @@ -184,7 +204,12 @@ func ResponseToResult(resp queryrange.Response) (logqlmodel.Result, error) {
if r.Response.Error != "" {
return logqlmodel.Result{}, fmt.Errorf("%s: %s", r.Response.ErrorType, r.Response.Error)
}

if r.Response.Data.ResultType == loghttp.ResultTypeVector {
return logqlmodel.Result{
Statistics: r.Statistics,
Data: sampleStreamToVector(r.Response.Data.Result),
}, nil
}
return logqlmodel.Result{
Statistics: r.Statistics,
Data: sampleStreamToMatrix(r.Response.Data.Result),
Expand Down
90 changes: 71 additions & 19 deletions pkg/querier/queryrange/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ import (
jsoniter "github.com/json-iterator/go"
"github.com/opentracing/opentracing-go"
otlog "github.com/opentracing/opentracing-go/log"
"github.com/prometheus/common/model"

"github.com/grafana/loki/pkg/loghttp"
"github.com/grafana/loki/pkg/logqlmodel/stats"
)

Expand Down Expand Up @@ -41,41 +43,91 @@ func (PrometheusExtractor) ResponseWithoutHeaders(resp queryrange.Response) quer
// encode encodes a Prometheus response and injects Loki stats.
func (p *LokiPromResponse) encode(ctx context.Context) (*http.Response, error) {
sp := opentracing.SpanFromContext(ctx)
var (
b []byte
err error
)
if p.Response.Data.ResultType == loghttp.ResultTypeVector {
b, err = p.marshalVector()
} else {
b, err = p.marshalMatrix()
}
if err != nil {
return nil, err
}

if sp != nil {
sp.LogFields(otlog.Int("bytes", len(b)))
}

resp := http.Response{
Header: http.Header{
"Content-Type": []string{"application/json"},
},
Body: ioutil.NopCloser(bytes.NewBuffer(b)),
StatusCode: http.StatusOK,
}
return &resp, nil
}

func (p *LokiPromResponse) marshalVector() ([]byte, error) {
vec := make(loghttp.Vector, len(p.Response.Data.Result))
for i, v := range p.Response.Data.Result {
lbs := make(model.LabelSet, len(v.Labels))
for _, v := range v.Labels {
lbs[model.LabelName(v.Name)] = model.LabelValue(v.Value)
}
vec[i] = model.Sample{
Metric: model.Metric(lbs),
Timestamp: model.Time(v.Samples[0].TimestampMs),
Value: model.SampleValue(v.Samples[0].Value),
}
}
return jsonStd.Marshal(struct {
Status string `json:"status"`
Data struct {
ResultType string `json:"resultType"`
Result loghttp.Vector `json:"result"`
Statistics stats.Result `json:"stats,omitempty"`
} `json:"data,omitempty"`
ErrorType string `json:"errorType,omitempty"`
Error string `json:"error,omitempty"`
}{
Error: p.Response.Error,
Data: struct {
ResultType string `json:"resultType"`
Result loghttp.Vector `json:"result"`
Statistics stats.Result `json:"stats,omitempty"`
}{
ResultType: loghttp.ResultTypeVector,
Result: vec,
Statistics: p.Statistics,
},
ErrorType: p.Response.ErrorType,
Status: p.Response.Status,
})
}

func (p *LokiPromResponse) marshalMatrix() ([]byte, error) {
// embed response and add statistics.
b, err := jsonStd.Marshal(struct {
return jsonStd.Marshal(struct {
Status string `json:"status"`
Data struct {
queryrange.PrometheusData
Statistics stats.Result `json:"stats"`
Statistics stats.Result `json:"stats,omitempty"`
} `json:"data,omitempty"`
ErrorType string `json:"errorType,omitempty"`
Error string `json:"error,omitempty"`
}{
Error: p.Response.Error,
Data: struct {
queryrange.PrometheusData
Statistics stats.Result `json:"stats"`
Statistics stats.Result `json:"stats,omitempty"`
}{
PrometheusData: p.Response.Data,
Statistics: p.Statistics,
},
ErrorType: p.Response.ErrorType,
Status: p.Response.Status,
})
if err != nil {
return nil, err
}

if sp != nil {
sp.LogFields(otlog.Int("bytes", len(b)))
}

resp := http.Response{
Header: http.Header{
"Content-Type": []string{"application/json"},
},
Body: ioutil.NopCloser(bytes.NewBuffer(b)),
StatusCode: http.StatusOK,
}
return &resp, nil
}
159 changes: 159 additions & 0 deletions pkg/querier/queryrange/prometheus_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
package queryrange

import (
"context"
"io"
"testing"

"github.com/cortexproject/cortex/pkg/cortexpb"
"github.com/cortexproject/cortex/pkg/querier/queryrange"
"github.com/stretchr/testify/require"

"github.com/grafana/loki/pkg/loghttp"
)

var emptyStats = `"stats": {
"summary": {
"bytesProcessedPerSecond": 0,
"linesProcessedPerSecond": 0,
"totalBytesProcessed": 0,
"totalLinesProcessed": 0,
"execTime": 0.0
},
"store": {
"totalChunksRef": 0,
"totalChunksDownloaded": 0,
"chunksDownloadTime": 0,
"headChunkBytes": 0,
"headChunkLines": 0,
"decompressedBytes": 0,
"decompressedLines": 0,
"compressedBytes": 0,
"totalDuplicates": 0
},
"ingester": {
"totalReached": 0,
"totalChunksMatched": 0,
"totalBatches": 0,
"totalLinesSent": 0,
"headChunkBytes": 0,
"headChunkLines": 0,
"decompressedBytes": 0,
"decompressedLines": 0,
"compressedBytes": 0,
"totalDuplicates": 0
}
}`

func Test_encodePromResponse(t *testing.T) {
for _, tt := range []struct {
name string
resp *LokiPromResponse
want string
}{
{
"matrix",
&LokiPromResponse{
Response: &queryrange.PrometheusResponse{
Status: string(queryrange.StatusSuccess),
Data: queryrange.PrometheusData{
ResultType: loghttp.ResultTypeMatrix,
Result: []queryrange.SampleStream{
{
Labels: []cortexpb.LabelAdapter{
{Name: "foo", Value: "bar"},
},
Samples: []cortexpb.Sample{
{Value: 1, TimestampMs: 1000},
{Value: 1, TimestampMs: 2000},
},
},
{
Labels: []cortexpb.LabelAdapter{
{Name: "foo", Value: "buzz"},
},
Samples: []cortexpb.Sample{
{Value: 4, TimestampMs: 1000},
{Value: 5, TimestampMs: 2000},
},
},
},
},
},
},
`{
"status": "success",
"data": {
"resultType": "matrix",
"result": [
{
"metric": {"foo": "bar"},
"values": [[1, "1"],[2, "1"]]
},
{
"metric": {"foo": "buzz"},
"values": [[1, "4"],[2, "5"]]
}
],
` + emptyStats + `
}
}`,
},
{
"vector",
&LokiPromResponse{
Response: &queryrange.PrometheusResponse{
Status: string(queryrange.StatusSuccess),
Data: queryrange.PrometheusData{
ResultType: loghttp.ResultTypeVector,
Result: []queryrange.SampleStream{
{
Labels: []cortexpb.LabelAdapter{
{Name: "foo", Value: "bar"},
},
Samples: []cortexpb.Sample{
{Value: 1, TimestampMs: 1000},
},
},
{
Labels: []cortexpb.LabelAdapter{
{Name: "foo", Value: "buzz"},
},
Samples: []cortexpb.Sample{
{Value: 4, TimestampMs: 1000},
},
},
},
},
},
},
`{
"status": "success",
"data": {
"resultType": "vector",
"result": [
{
"metric": {"foo": "bar"},
"value": [1, "1"]
},
{
"metric": {"foo": "buzz"},
"value": [1, "4"]
}
],
` + emptyStats + `
}
}`,
},
} {
tt := tt
t.Run(tt.name, func(t *testing.T) {
r, err := tt.resp.encode(context.Background())
require.NoError(t, err)
b, err := io.ReadAll(r.Body)
require.NoError(t, err)
got := string(b)
require.JSONEq(t, tt.want, got)
})
}
}
1 change: 1 addition & 0 deletions pkg/querier/queryrange/querysharding.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ func (ast *astMapperware) Do(ctx context.Context, r queryrange.Request) (queryra
}, nil
case parser.ValueTypeVector:
return &LokiPromResponse{
Statistics: res.Statistics,
Response: &queryrange.PrometheusResponse{
Status: loghttp.QueryStatusSuccess,
Data: queryrange.PrometheusData{
Expand Down
34 changes: 16 additions & 18 deletions pkg/querier/queryrange/querysharding_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,26 +283,24 @@ func Test_InstantSharding(t *testing.T) {
require.Equal(t, 3, called, "expected 3 calls but got {}", called)
require.Len(t, response.(*LokiPromResponse).Response.Data.Result, 3)
require.ElementsMatch(t, []string{"0_of_3", "1_of_3", "2_of_3"}, shards)
require.Equal(t, &LokiPromResponse{Response: &queryrange.PrometheusResponse{
Status: "success",
Data: queryrange.PrometheusData{
ResultType: loghttp.ResultTypeVector,
Result: []queryrange.SampleStream{
{
Labels: []cortexpb.LabelAdapter{{Name: "foo", Value: "bar"}},
Samples: []cortexpb.Sample{{Value: 10, TimestampMs: 10}},
},
{
Labels: []cortexpb.LabelAdapter{{Name: "foo", Value: "bar"}},
Samples: []cortexpb.Sample{{Value: 10, TimestampMs: 10}},
},
{
Labels: []cortexpb.LabelAdapter{{Name: "foo", Value: "bar"}},
Samples: []cortexpb.Sample{{Value: 10, TimestampMs: 10}},
},
require.Equal(t, queryrange.PrometheusData{
ResultType: loghttp.ResultTypeVector,
Result: []queryrange.SampleStream{
{
Labels: []cortexpb.LabelAdapter{{Name: "foo", Value: "bar"}},
Samples: []cortexpb.Sample{{Value: 10, TimestampMs: 10}},
},
{
Labels: []cortexpb.LabelAdapter{{Name: "foo", Value: "bar"}},
Samples: []cortexpb.Sample{{Value: 10, TimestampMs: 10}},
},
{
Labels: []cortexpb.LabelAdapter{{Name: "foo", Value: "bar"}},
Samples: []cortexpb.Sample{{Value: 10, TimestampMs: 10}},
},
},
}}, response)
}, response.(*LokiPromResponse).Response.Data)
require.Equal(t, loghttp.QueryStatusSuccess, response.(*LokiPromResponse).Response.Status)
}

func Test_SeriesShardingHandler(t *testing.T) {
Expand Down
Loading

0 comments on commit 1efeace

Please sign in to comment.