Skip to content

Commit

Permalink
Support series query in query-frontend
Browse files Browse the repository at this point in the history
  • Loading branch information
adityacs committed Jun 3, 2020
1 parent 8f9e456 commit bb5fcf7
Show file tree
Hide file tree
Showing 8 changed files with 1,449 additions and 229 deletions.
391 changes: 263 additions & 128 deletions pkg/querier/queryrange/codec.go

Large diffs are not rendered by default.

111 changes: 111 additions & 0 deletions pkg/querier/queryrange/codec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,15 @@ func Test_codec_DecodeRequest(t *testing.T) {
StartTs: start,
EndTs: end,
}, false},
{"series", func() (*http.Request, error) {
return http.NewRequest(http.MethodGet,
fmt.Sprintf(`/series?start=%d&end=%d&match={foo="bar"}`, start.UnixNano(), end.UnixNano()), nil)
}, &LokiSeriesRequest{
Match: []string{`{foo="bar"}`},
Path: "/series",
StartTs: start,
EndTs: end,
}, false},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand Down Expand Up @@ -115,6 +124,13 @@ func Test_codec_DecodeResponse(t *testing.T) {
},
Statistics: statsResult,
}, false},
{"series", &http.Response{StatusCode: 200, Body: ioutil.NopCloser(strings.NewReader(seriesString))},
&LokiSeriesRequest{Path: "/loki/api/v1/series"},
&LokiSeriesResponse{
Status: "success",
Version: uint32(loghttp.VersionV1),
Data: seriesData,
}, false},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand Down Expand Up @@ -168,6 +184,35 @@ func Test_codec_EncodeRequest(t *testing.T) {
require.Equal(t, "/loki/api/v1/query_range", req.(*LokiRequest).Path)
}

func Test_codec_series_EncodeRequest(t *testing.T) {
got, err := lokiCodec.EncodeRequest(context.TODO(), &queryrange.PrometheusRequest{})
require.Error(t, err)
require.Nil(t, got)

ctx := context.Background()
toEncode := &LokiSeriesRequest{
Match: []string{`{foo="bar"}`},
Path: "/series",
StartTs: start,
EndTs: end,
}
got, err = lokiCodec.EncodeRequest(ctx, toEncode)
require.NoError(t, err)
require.Equal(t, ctx, got.Context())
require.Equal(t, "/loki/api/v1/series", got.URL.Path)
require.Equal(t, fmt.Sprintf("%d", start.UnixNano()), got.URL.Query().Get("start"))
require.Equal(t, fmt.Sprintf("%d", end.UnixNano()), got.URL.Query().Get("end"))
require.Equal(t, `{foo="bar"}`, got.URL.Query().Get("match[]"))

// testing a full roundtrip
req, err := lokiCodec.DecodeRequest(context.TODO(), got)
require.NoError(t, err)
require.Equal(t, toEncode.Match, req.(*LokiSeriesRequest).Match)
require.Equal(t, toEncode.StartTs, req.(*LokiSeriesRequest).StartTs)
require.Equal(t, toEncode.EndTs, req.(*LokiSeriesRequest).EndTs)
require.Equal(t, "/loki/api/v1/series", req.(*LokiSeriesRequest).Path)
}

func Test_codec_EncodeResponse(t *testing.T) {

tests := []struct {
Expand Down Expand Up @@ -211,6 +256,12 @@ func Test_codec_EncodeResponse(t *testing.T) {
},
Statistics: statsResult,
}, streamsStringLegacy, false},
{"loki series",
&LokiSeriesResponse{
Status: "success",
Version: uint32(loghttp.VersionV1),
Data: seriesData,
}, seriesString, false},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand Down Expand Up @@ -604,6 +655,51 @@ func Test_codec_MergeResponse(t *testing.T) {
},
false,
},
{
"loki series",
[]queryrange.Response{
&LokiSeriesResponse{
Status: "success",
Version: 1,
Data: []logproto.SeriesIdentifier{
{
Labels: map[string]string{"filename": "/var/hostlog/apport.log", "job": "varlogs"},
},
{
Labels: map[string]string{"filename": "/var/hostlog/test.log", "job": "varlogs"},
},
},
},
&LokiSeriesResponse{
Status: "success",
Version: 1,
Data: []logproto.SeriesIdentifier{
{
Labels: map[string]string{"filename": "/var/hostlog/apport.log", "job": "varlogs"},
},
{
Labels: map[string]string{"filename": "/var/hostlog/other.log", "job": "varlogs"},
},
},
},
},
&LokiSeriesResponse{
Status: "success",
Version: 1,
Data: []logproto.SeriesIdentifier{
{
Labels: map[string]string{"filename": "/var/hostlog/apport.log", "job": "varlogs"},
},
{
Labels: map[string]string{"filename": "/var/hostlog/test.log", "job": "varlogs"},
},
{
Labels: map[string]string{"filename": "/var/hostlog/other.log", "job": "varlogs"},
},
},
},
false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand Down Expand Up @@ -756,6 +852,21 @@ var (
},
},
}
seriesString = `{
"status": "success",
"data": [
{"filename": "/var/hostlog/apport.log", "job": "varlogs"},
{"filename": "/var/hostlog/test.log", "job": "varlogs"}
]
}`
seriesData = []logproto.SeriesIdentifier{
{
Labels: map[string]string{"filename": "/var/hostlog/apport.log", "job": "varlogs"},
},
{
Labels: map[string]string{"filename": "/var/hostlog/test.log", "job": "varlogs"},
},
}
statsResult = stats.Result{
Summary: stats.Summary{
BytesProcessedPerSecond: 20,
Expand Down
Loading

0 comments on commit bb5fcf7

Please sign in to comment.