Skip to content

Commit

Permalink
Loki: Series API will return all series with no match or empty matcher (
Browse files Browse the repository at this point in the history
#2254)

* Allow series queries with no match or an empty {} matcher which will return all series for the provided timeframe.

* fix test

* fix lint
  • Loading branch information
slim-bean authored Jun 26, 2020
1 parent 48b8504 commit aee064d
Show file tree
Hide file tree
Showing 8 changed files with 153 additions and 43 deletions.
22 changes: 18 additions & 4 deletions pkg/ingester/ingester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,12 +105,26 @@ func TestIngester(t *testing.T) {

// Series

// empty matchers
_, err = i.Series(ctx, &logproto.SeriesRequest{
// empty matcher return all series
resp, err := i.Series(ctx, &logproto.SeriesRequest{
Start: time.Unix(0, 0),
End: time.Unix(1, 0),
})
require.Error(t, err)
require.Nil(t, err)
require.ElementsMatch(t, []logproto.SeriesIdentifier{
{
Labels: map[string]string{
"foo": "bar",
"bar": "baz1",
},
},
{
Labels: map[string]string{
"foo": "bar",
"bar": "baz2",
},
},
}, resp.GetSeries())

// wrong matchers fmt
_, err = i.Series(ctx, &logproto.SeriesRequest{
Expand All @@ -129,7 +143,7 @@ func TestIngester(t *testing.T) {
require.Error(t, err)

// foo=bar
resp, err := i.Series(ctx, &logproto.SeriesRequest{
resp, err = i.Series(ctx, &logproto.SeriesRequest{
Start: time.Unix(0, 0),
End: time.Unix(1, 0),
Groups: []string{`{foo="bar"}`},
Expand Down
61 changes: 46 additions & 15 deletions pkg/ingester/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,33 +249,64 @@ func (i *instance) Series(_ context.Context, req *logproto.SeriesRequest) (*logp
return nil, err
}

dedupedSeries := make(map[uint64]logproto.SeriesIdentifier)
for _, matchers := range groups {
err = i.forMatchingStreams(matchers, func(stream *stream) error {
// exit early when this stream was added by an earlier group
key := stream.labels.Hash()
if _, found := dedupedSeries[key]; found {
return nil
}
var series []logproto.SeriesIdentifier

dedupedSeries[key] = logproto.SeriesIdentifier{
// If no matchers were supplied we include all streams.
if len(groups) == 0 {
series = make([]logproto.SeriesIdentifier, 0, len(i.streams))
err = i.forAllStreams(func(stream *stream) error {
series = append(series, logproto.SeriesIdentifier{
Labels: stream.labels.Map(),
}
})
return nil
})

if err != nil {
return nil, err
}
}
series := make([]logproto.SeriesIdentifier, 0, len(dedupedSeries))
for _, v := range dedupedSeries {
series = append(series, v)
} else {
dedupedSeries := make(map[uint64]logproto.SeriesIdentifier)
for _, matchers := range groups {
err = i.forMatchingStreams(matchers, func(stream *stream) error {
// exit early when this stream was added by an earlier group
key := stream.labels.Hash()
if _, found := dedupedSeries[key]; found {
return nil
}

dedupedSeries[key] = logproto.SeriesIdentifier{
Labels: stream.labels.Map(),
}
return nil
})
if err != nil {
return nil, err
}
}
series = make([]logproto.SeriesIdentifier, 0, len(dedupedSeries))
for _, v := range dedupedSeries {
series = append(series, v)

}
}

return &logproto.SeriesResponse{Series: series}, nil
}

// forAllStreams will execute a function for all streams in the instance.
// It uses a function in order to enable generic stream access without accidentally leaking streams under the mutex.
func (i *instance) forAllStreams(fn func(*stream) error) error {
i.streamsMtx.RLock()
defer i.streamsMtx.RUnlock()

for _, stream := range i.streams {
err := fn(stream)
if err != nil {
return err
}
}
return nil
}

// forMatchingStreams will execute a function for each stream that satisfies a set of requirements (time range, matchers, etc).
// It uses a function in order to enable generic stream access without accidentally leaking streams under the mutex.
func (i *instance) forMatchingStreams(
Expand Down
4 changes: 0 additions & 4 deletions pkg/loghttp/params.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,10 +79,6 @@ func interval(r *http.Request) (time.Duration, error) {

// Match extracts and parses multiple matcher groups from a slice of strings
func Match(xs []string) ([][]*labels.Matcher, error) {
if len(xs) == 0 {
return nil, errors.New("0 matcher groups supplied")
}

groups := make([][]*labels.Matcher, 0, len(xs))
for _, x := range xs {
ms, err := logql.ParseMatchers(x)
Expand Down
3 changes: 2 additions & 1 deletion pkg/loghttp/params_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,8 @@ func Test_match(t *testing.T) {
wantErr bool
}{
{"malformed", []string{`{a="1`}, nil, true},
{"errors on nil input", nil, nil, true},
{"empty on nil input", nil, [][]*labels.Matcher{}, false},
{"empty on empty input", []string{}, [][]*labels.Matcher{}, false},
{
"single",
[]string{`{a="1"}`},
Expand Down
12 changes: 12 additions & 0 deletions pkg/loghttp/series.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package loghttp
import (
"net/http"
"sort"
"strings"

"github.com/grafana/loki/pkg/logproto"
)
Expand All @@ -25,6 +26,17 @@ func ParseSeriesQuery(r *http.Request) (*logproto.SeriesRequest, error) {
deduped := union(xs, ys)
sort.Strings(deduped)

// Special case to allow for an empty label matcher `{}` in a Series query,
// we support either not specifying the `match` query parameter at all or specifying it with a single `{}`
// empty label matcher, we treat the latter case here as if no `match` was supplied at all.
if len(deduped) == 1 {
matcher := deduped[0]
matcher = strings.Replace(matcher, " ", "", -1)
if matcher == "{}" {
deduped = deduped[:0]
}
}

// ensure matchers are valid before fanning out to ingesters/store as well as returning valuable parsing errors
// instead of 500s
_, err = Match(deduped)
Expand Down
30 changes: 27 additions & 3 deletions pkg/loghttp/series_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,32 @@ func TestParseSeriesQuery(t *testing.T) {
}{
{
"no match",
withForm(url.Values{}),
true,
nil,
withForm(url.Values{
"start": []string{"1000"},
"end": []string{"2000"},
}),
false,
mkSeriesRequest(t, "1000", "2000", []string{}),
},
{
"empty matcher",
withForm(url.Values{
"start": []string{"1000"},
"end": []string{"2000"},
"match": []string{"{}"},
}),
false,
mkSeriesRequest(t, "1000", "2000", []string{}),
},
{
"empty matcher with whitespace",
withForm(url.Values{
"start": []string{"1000"},
"end": []string{"2000"},
"match": []string{" { }"},
}),
false,
mkSeriesRequest(t, "1000", "2000", []string{}),
},
{
"malformed",
Expand Down Expand Up @@ -80,6 +103,7 @@ func withForm(form url.Values) *http.Request {
return &http.Request{Form: form}
}

// nolint
func mkSeriesRequest(t *testing.T, from, to string, matches []string) *logproto.SeriesRequest {
start, end, err := bounds(withForm(url.Values{
"start": []string{from},
Expand Down
43 changes: 30 additions & 13 deletions pkg/querier/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -461,26 +461,43 @@ func (q *Querier) seriesForMatchers(
) ([]logproto.SeriesIdentifier, error) {

var results []logproto.SeriesIdentifier
for _, group := range groups {
ids, err := q.store.GetSeries(ctx, logql.SelectParams{
QueryRequest: &logproto.QueryRequest{
Selector: group,
Limit: 1,
Start: from,
End: through,
Direction: logproto.FORWARD,
},
})
// If no matchers were specified for the series query,
// we send a query with an empty matcher which will match every series.
if len(groups) == 0 {
var err error
results, err = q.seriesForMatcher(ctx, from, through, "")
if err != nil {
return nil, err
}

results = append(results, ids...)

} else {
for _, group := range groups {
ids, err := q.seriesForMatcher(ctx, from, through, group)
if err != nil {
return nil, err
}
results = append(results, ids...)
}
}
return results, nil
}

// seriesForMatcher fetches series from the store for a given matcher
func (q *Querier) seriesForMatcher(ctx context.Context, from, through time.Time, matcher string) ([]logproto.SeriesIdentifier, error) {
ids, err := q.store.GetSeries(ctx, logql.SelectParams{
QueryRequest: &logproto.QueryRequest{
Selector: matcher,
Limit: 1,
Start: from,
End: through,
Direction: logproto.FORWARD,
},
})
if err != nil {
return nil, err
}
return ids, nil
}

func (q *Querier) validateQueryRequest(ctx context.Context, req *logproto.QueryRequest) error {
userID, err := user.ExtractOrgID(ctx)
if err != nil {
Expand Down
21 changes: 18 additions & 3 deletions pkg/storage/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,9 +148,24 @@ func (s *store) lazyChunks(ctx context.Context, matchers []*labels.Matcher, from
}

func (s *store) GetSeries(ctx context.Context, req logql.SelectParams) ([]logproto.SeriesIdentifier, error) {
matchers, _, from, through, err := decodeReq(req)
if err != nil {
return nil, err
var from, through model.Time
var matchers []*labels.Matcher

// The Loki parser doesn't allow for an empty label matcher but for the Series API
// we allow this to select all series in the time range.
if req.Selector == "" {
from, through = util.RoundToMilliseconds(req.Start, req.End)
nameLabelMatcher, err := labels.NewMatcher(labels.MatchEqual, labels.MetricName, "logs")
if err != nil {
return nil, err
}
matchers = []*labels.Matcher{nameLabelMatcher}
} else {
var err error
matchers, _, from, through, err = decodeReq(req)
if err != nil {
return nil, err
}
}

lazyChunks, err := s.lazyChunks(ctx, matchers, from, through)
Expand Down

0 comments on commit aee064d

Please sign in to comment.