diff --git a/pkg/querier/queryrange/codec.go b/pkg/querier/queryrange/codec.go index 107b0afb59a66..538611ec0d3fc 100644 --- a/pkg/querier/queryrange/codec.go +++ b/pkg/querier/queryrange/codec.go @@ -328,6 +328,7 @@ func (Codec) EncodeRequest(ctx context.Context, r queryrange.Request) (*http.Req "query": []string{request.Query}, "direction": []string{request.Direction.String()}, "limit": []string{fmt.Sprintf("%d", request.Limit)}, + "time": []string{fmt.Sprintf("%d", request.TimeTs.UnixNano())}, } if len(request.Shards) > 0 { params["shards"] = request.Shards diff --git a/pkg/querier/queryrange/downstreamer.go b/pkg/querier/queryrange/downstreamer.go index d532ef24961bb..31bb497e67a89 100644 --- a/pkg/querier/queryrange/downstreamer.go +++ b/pkg/querier/queryrange/downstreamer.go @@ -12,6 +12,7 @@ import ( "github.com/prometheus/prometheus/promql" "github.com/prometheus/prometheus/promql/parser" + "github.com/grafana/loki/pkg/loghttp" "github.com/grafana/loki/pkg/logql" "github.com/grafana/loki/pkg/logqlmodel" ) @@ -162,6 +163,25 @@ func sampleStreamToMatrix(streams []queryrange.SampleStream) parser.Value { return xs } +func sampleStreamToVector(streams []queryrange.SampleStream) parser.Value { + xs := make(promql.Vector, 0, len(streams)) + for _, stream := range streams { + x := promql.Sample{} + x.Metric = make(labels.Labels, 0, len(stream.Labels)) + for _, l := range stream.Labels { + x.Metric = append(x.Metric, labels.Label(l)) + } + + x.Point = promql.Point{ + T: stream.Samples[0].TimestampMs, + V: stream.Samples[0].Value, + } + + xs = append(xs, x) + } + return xs +} + func ResponseToResult(resp queryrange.Response) (logqlmodel.Result, error) { switch r := resp.(type) { case *LokiResponse: @@ -184,7 +204,12 @@ func ResponseToResult(resp queryrange.Response) (logqlmodel.Result, error) { if r.Response.Error != "" { return logqlmodel.Result{}, fmt.Errorf("%s: %s", r.Response.ErrorType, r.Response.Error) } - + if r.Response.Data.ResultType == loghttp.ResultTypeVector { + return logqlmodel.Result{ + Statistics: r.Statistics, + Data: sampleStreamToVector(r.Response.Data.Result), + }, nil + } return logqlmodel.Result{ Statistics: r.Statistics, Data: sampleStreamToMatrix(r.Response.Data.Result), diff --git a/pkg/querier/queryrange/prometheus.go b/pkg/querier/queryrange/prometheus.go index 7819d6df360a6..821148f966960 100644 --- a/pkg/querier/queryrange/prometheus.go +++ b/pkg/querier/queryrange/prometheus.go @@ -10,7 +10,9 @@ import ( jsoniter "github.com/json-iterator/go" "github.com/opentracing/opentracing-go" otlog "github.com/opentracing/opentracing-go/log" + "github.com/prometheus/common/model" + "github.com/grafana/loki/pkg/loghttp" "github.com/grafana/loki/pkg/logqlmodel/stats" ) @@ -41,12 +43,78 @@ func (PrometheusExtractor) ResponseWithoutHeaders(resp queryrange.Response) quer // encode encodes a Prometheus response and injects Loki stats. func (p *LokiPromResponse) encode(ctx context.Context) (*http.Response, error) { sp := opentracing.SpanFromContext(ctx) + var ( + b []byte + err error + ) + if p.Response.Data.ResultType == loghttp.ResultTypeVector { + b, err = p.marshalVector() + } else { + b, err = p.marshalMatrix() + } + if err != nil { + return nil, err + } + + if sp != nil { + sp.LogFields(otlog.Int("bytes", len(b))) + } + + resp := http.Response{ + Header: http.Header{ + "Content-Type": []string{"application/json"}, + }, + Body: ioutil.NopCloser(bytes.NewBuffer(b)), + StatusCode: http.StatusOK, + } + return &resp, nil +} + +func (p *LokiPromResponse) marshalVector() ([]byte, error) { + vec := make(loghttp.Vector, len(p.Response.Data.Result)) + for i, v := range p.Response.Data.Result { + lbs := make(model.LabelSet, len(v.Labels)) + for _, v := range v.Labels { + lbs[model.LabelName(v.Name)] = model.LabelValue(v.Value) + } + vec[i] = model.Sample{ + Metric: model.Metric(lbs), + Timestamp: model.Time(v.Samples[0].TimestampMs), + Value: model.SampleValue(v.Samples[0].Value), + } + } + return jsonStd.Marshal(struct { + Status string `json:"status"` + Data struct { + ResultType string `json:"resultType"` + Result loghttp.Vector `json:"result"` + Statistics stats.Result `json:"stats,omitempty"` + } `json:"data,omitempty"` + ErrorType string `json:"errorType,omitempty"` + Error string `json:"error,omitempty"` + }{ + Error: p.Response.Error, + Data: struct { + ResultType string `json:"resultType"` + Result loghttp.Vector `json:"result"` + Statistics stats.Result `json:"stats,omitempty"` + }{ + ResultType: loghttp.ResultTypeVector, + Result: vec, + Statistics: p.Statistics, + }, + ErrorType: p.Response.ErrorType, + Status: p.Response.Status, + }) +} + +func (p *LokiPromResponse) marshalMatrix() ([]byte, error) { // embed response and add statistics. - b, err := jsonStd.Marshal(struct { + return jsonStd.Marshal(struct { Status string `json:"status"` Data struct { queryrange.PrometheusData - Statistics stats.Result `json:"stats"` + Statistics stats.Result `json:"stats,omitempty"` } `json:"data,omitempty"` ErrorType string `json:"errorType,omitempty"` Error string `json:"error,omitempty"` @@ -54,7 +122,7 @@ func (p *LokiPromResponse) encode(ctx context.Context) (*http.Response, error) { Error: p.Response.Error, Data: struct { queryrange.PrometheusData - Statistics stats.Result `json:"stats"` + Statistics stats.Result `json:"stats,omitempty"` }{ PrometheusData: p.Response.Data, Statistics: p.Statistics, @@ -62,20 +130,4 @@ func (p *LokiPromResponse) encode(ctx context.Context) (*http.Response, error) { ErrorType: p.Response.ErrorType, Status: p.Response.Status, }) - if err != nil { - return nil, err - } - - if sp != nil { - sp.LogFields(otlog.Int("bytes", len(b))) - } - - resp := http.Response{ - Header: http.Header{ - "Content-Type": []string{"application/json"}, - }, - Body: ioutil.NopCloser(bytes.NewBuffer(b)), - StatusCode: http.StatusOK, - } - return &resp, nil } diff --git a/pkg/querier/queryrange/prometheus_test.go b/pkg/querier/queryrange/prometheus_test.go new file mode 100644 index 0000000000000..38db42b8669a4 --- /dev/null +++ b/pkg/querier/queryrange/prometheus_test.go @@ -0,0 +1,159 @@ +package queryrange + +import ( + "context" + "io" + "testing" + + "github.com/cortexproject/cortex/pkg/cortexpb" + "github.com/cortexproject/cortex/pkg/querier/queryrange" + "github.com/stretchr/testify/require" + + "github.com/grafana/loki/pkg/loghttp" +) + +var emptyStats = `"stats": { + "summary": { + "bytesProcessedPerSecond": 0, + "linesProcessedPerSecond": 0, + "totalBytesProcessed": 0, + "totalLinesProcessed": 0, + "execTime": 0.0 + }, + "store": { + "totalChunksRef": 0, + "totalChunksDownloaded": 0, + "chunksDownloadTime": 0, + "headChunkBytes": 0, + "headChunkLines": 0, + "decompressedBytes": 0, + "decompressedLines": 0, + "compressedBytes": 0, + "totalDuplicates": 0 + }, + "ingester": { + "totalReached": 0, + "totalChunksMatched": 0, + "totalBatches": 0, + "totalLinesSent": 0, + "headChunkBytes": 0, + "headChunkLines": 0, + "decompressedBytes": 0, + "decompressedLines": 0, + "compressedBytes": 0, + "totalDuplicates": 0 + } +}` + +func Test_encodePromResponse(t *testing.T) { + for _, tt := range []struct { + name string + resp *LokiPromResponse + want string + }{ + { + "matrix", + &LokiPromResponse{ + Response: &queryrange.PrometheusResponse{ + Status: string(queryrange.StatusSuccess), + Data: queryrange.PrometheusData{ + ResultType: loghttp.ResultTypeMatrix, + Result: []queryrange.SampleStream{ + { + Labels: []cortexpb.LabelAdapter{ + {Name: "foo", Value: "bar"}, + }, + Samples: []cortexpb.Sample{ + {Value: 1, TimestampMs: 1000}, + {Value: 1, TimestampMs: 2000}, + }, + }, + { + Labels: []cortexpb.LabelAdapter{ + {Name: "foo", Value: "buzz"}, + }, + Samples: []cortexpb.Sample{ + {Value: 4, TimestampMs: 1000}, + {Value: 5, TimestampMs: 2000}, + }, + }, + }, + }, + }, + }, + `{ + "status": "success", + "data": { + "resultType": "matrix", + "result": [ + { + "metric": {"foo": "bar"}, + "values": [[1, "1"],[2, "1"]] + }, + { + "metric": {"foo": "buzz"}, + "values": [[1, "4"],[2, "5"]] + } + ], + ` + emptyStats + ` + } + }`, + }, + { + "vector", + &LokiPromResponse{ + Response: &queryrange.PrometheusResponse{ + Status: string(queryrange.StatusSuccess), + Data: queryrange.PrometheusData{ + ResultType: loghttp.ResultTypeVector, + Result: []queryrange.SampleStream{ + { + Labels: []cortexpb.LabelAdapter{ + {Name: "foo", Value: "bar"}, + }, + Samples: []cortexpb.Sample{ + {Value: 1, TimestampMs: 1000}, + }, + }, + { + Labels: []cortexpb.LabelAdapter{ + {Name: "foo", Value: "buzz"}, + }, + Samples: []cortexpb.Sample{ + {Value: 4, TimestampMs: 1000}, + }, + }, + }, + }, + }, + }, + `{ + "status": "success", + "data": { + "resultType": "vector", + "result": [ + { + "metric": {"foo": "bar"}, + "value": [1, "1"] + }, + { + "metric": {"foo": "buzz"}, + "value": [1, "4"] + } + ], + ` + emptyStats + ` + } + }`, + }, + } { + tt := tt + t.Run(tt.name, func(t *testing.T) { + r, err := tt.resp.encode(context.Background()) + require.NoError(t, err) + b, err := io.ReadAll(r.Body) + require.NoError(t, err) + got := string(b) + require.JSONEq(t, tt.want, got) + }) + } +} diff --git a/pkg/querier/queryrange/querysharding.go b/pkg/querier/queryrange/querysharding.go index 62db7be693f76..f3baa8c48ce50 100644 --- a/pkg/querier/queryrange/querysharding.go +++ b/pkg/querier/queryrange/querysharding.go @@ -164,13 +164,15 @@ func (ast *astMapperware) Do(ctx context.Context, r queryrange.Request) (queryra }, }, nil case parser.ValueTypeVector: - return &LokiPromResponse{Response: &queryrange.PrometheusResponse{ - Status: loghttp.QueryStatusSuccess, - Data: queryrange.PrometheusData{ - ResultType: loghttp.ResultTypeVector, - Result: toProtoVector(value.(loghttp.Vector)), + return &LokiPromResponse{ + Statistics: res.Statistics, + Response: &queryrange.PrometheusResponse{ + Status: loghttp.QueryStatusSuccess, + Data: queryrange.PrometheusData{ + ResultType: loghttp.ResultTypeVector, + Result: toProtoVector(value.(loghttp.Vector)), + }, }, - }, }, nil default: return nil, fmt.Errorf("unexpected downstream response type (%T)", res.Data.Type()) diff --git a/pkg/querier/queryrange/querysharding_test.go b/pkg/querier/queryrange/querysharding_test.go index 7980e02f610b6..71cabf804cc1e 100644 --- a/pkg/querier/queryrange/querysharding_test.go +++ b/pkg/querier/queryrange/querysharding_test.go @@ -283,26 +283,24 @@ func Test_InstantSharding(t *testing.T) { require.Equal(t, 3, called, "expected 3 calls but got {}", called) require.Len(t, response.(*LokiPromResponse).Response.Data.Result, 3) require.ElementsMatch(t, []string{"0_of_3", "1_of_3", "2_of_3"}, shards) - require.Equal(t, &LokiPromResponse{Response: &queryrange.PrometheusResponse{ - Status: "success", - Data: queryrange.PrometheusData{ - ResultType: loghttp.ResultTypeVector, - Result: []queryrange.SampleStream{ - { - Labels: []cortexpb.LabelAdapter{{Name: "foo", Value: "bar"}}, - Samples: []cortexpb.Sample{{Value: 10, TimestampMs: 10}}, - }, - { - Labels: []cortexpb.LabelAdapter{{Name: "foo", Value: "bar"}}, - Samples: []cortexpb.Sample{{Value: 10, TimestampMs: 10}}, - }, - { - Labels: []cortexpb.LabelAdapter{{Name: "foo", Value: "bar"}}, - Samples: []cortexpb.Sample{{Value: 10, TimestampMs: 10}}, - }, + require.Equal(t, queryrange.PrometheusData{ + ResultType: loghttp.ResultTypeVector, + Result: []queryrange.SampleStream{ + { + Labels: []cortexpb.LabelAdapter{{Name: "foo", Value: "bar"}}, + Samples: []cortexpb.Sample{{Value: 10, TimestampMs: 10}}, + }, + { + Labels: []cortexpb.LabelAdapter{{Name: "foo", Value: "bar"}}, + Samples: []cortexpb.Sample{{Value: 10, TimestampMs: 10}}, + }, + { + Labels: []cortexpb.LabelAdapter{{Name: "foo", Value: "bar"}}, + Samples: []cortexpb.Sample{{Value: 10, TimestampMs: 10}}, }, }, - }}, response) + }, response.(*LokiPromResponse).Response.Data) + require.Equal(t, loghttp.QueryStatusSuccess, response.(*LokiPromResponse).Response.Status) } func Test_SeriesShardingHandler(t *testing.T) { diff --git a/pkg/querier/queryrange/stats.go b/pkg/querier/queryrange/stats.go index cda6cc3273430..3df6422d7dcf6 100644 --- a/pkg/querier/queryrange/stats.go +++ b/pkg/querier/queryrange/stats.go @@ -110,10 +110,11 @@ func StatsCollectorMiddleware() queryrange.Middleware { data.recorded = true data.statistics = statistics data.result = res - data.params, err = paramsFromRequest(req) - if err != nil { - return nil, err + p, errReq := paramsFromRequest(req) + if errReq != nil { + return nil, errReq } + data.params = p } return resp, err })