diff --git a/pkg/ingester/ingester_test.go b/pkg/ingester/ingester_test.go index f3c66de953ce1..58154dce78023 100644 --- a/pkg/ingester/ingester_test.go +++ b/pkg/ingester/ingester_test.go @@ -105,12 +105,26 @@ func TestIngester(t *testing.T) { // Series - // empty matchers - _, err = i.Series(ctx, &logproto.SeriesRequest{ + // empty matcher return all series + resp, err := i.Series(ctx, &logproto.SeriesRequest{ Start: time.Unix(0, 0), End: time.Unix(1, 0), }) - require.Error(t, err) + require.Nil(t, err) + require.ElementsMatch(t, []logproto.SeriesIdentifier{ + { + Labels: map[string]string{ + "foo": "bar", + "bar": "baz1", + }, + }, + { + Labels: map[string]string{ + "foo": "bar", + "bar": "baz2", + }, + }, + }, resp.GetSeries()) // wrong matchers fmt _, err = i.Series(ctx, &logproto.SeriesRequest{ @@ -129,7 +143,7 @@ func TestIngester(t *testing.T) { require.Error(t, err) // foo=bar - resp, err := i.Series(ctx, &logproto.SeriesRequest{ + resp, err = i.Series(ctx, &logproto.SeriesRequest{ Start: time.Unix(0, 0), End: time.Unix(1, 0), Groups: []string{`{foo="bar"}`}, diff --git a/pkg/ingester/instance.go b/pkg/ingester/instance.go index 07375674832c5..a99223f04f17d 100644 --- a/pkg/ingester/instance.go +++ b/pkg/ingester/instance.go @@ -249,33 +249,64 @@ func (i *instance) Series(_ context.Context, req *logproto.SeriesRequest) (*logp return nil, err } - dedupedSeries := make(map[uint64]logproto.SeriesIdentifier) - for _, matchers := range groups { - err = i.forMatchingStreams(matchers, func(stream *stream) error { - // exit early when this stream was added by an earlier group - key := stream.labels.Hash() - if _, found := dedupedSeries[key]; found { - return nil - } + var series []logproto.SeriesIdentifier - dedupedSeries[key] = logproto.SeriesIdentifier{ + // If no matchers were supplied we include all streams. + if len(groups) == 0 { + series = make([]logproto.SeriesIdentifier, 0, len(i.streams)) + err = i.forAllStreams(func(stream *stream) error { + series = append(series, logproto.SeriesIdentifier{ Labels: stream.labels.Map(), - } + }) return nil }) - if err != nil { return nil, err } - } - series := make([]logproto.SeriesIdentifier, 0, len(dedupedSeries)) - for _, v := range dedupedSeries { - series = append(series, v) + } else { + dedupedSeries := make(map[uint64]logproto.SeriesIdentifier) + for _, matchers := range groups { + err = i.forMatchingStreams(matchers, func(stream *stream) error { + // exit early when this stream was added by an earlier group + key := stream.labels.Hash() + if _, found := dedupedSeries[key]; found { + return nil + } + + dedupedSeries[key] = logproto.SeriesIdentifier{ + Labels: stream.labels.Map(), + } + return nil + }) + if err != nil { + return nil, err + } + } + series = make([]logproto.SeriesIdentifier, 0, len(dedupedSeries)) + for _, v := range dedupedSeries { + series = append(series, v) + } } + return &logproto.SeriesResponse{Series: series}, nil } +// forAllStreams will execute a function for all streams in the instance. +// It uses a function in order to enable generic stream access without accidentally leaking streams under the mutex. +func (i *instance) forAllStreams(fn func(*stream) error) error { + i.streamsMtx.RLock() + defer i.streamsMtx.RUnlock() + + for _, stream := range i.streams { + err := fn(stream) + if err != nil { + return err + } + } + return nil +} + // forMatchingStreams will execute a function for each stream that satisfies a set of requirements (time range, matchers, etc). // It uses a function in order to enable generic stream access without accidentally leaking streams under the mutex. func (i *instance) forMatchingStreams( diff --git a/pkg/loghttp/params.go b/pkg/loghttp/params.go index 6d0af077752fb..cedc8d8fd9a84 100644 --- a/pkg/loghttp/params.go +++ b/pkg/loghttp/params.go @@ -79,10 +79,6 @@ func interval(r *http.Request) (time.Duration, error) { // Match extracts and parses multiple matcher groups from a slice of strings func Match(xs []string) ([][]*labels.Matcher, error) { - if len(xs) == 0 { - return nil, errors.New("0 matcher groups supplied") - } - groups := make([][]*labels.Matcher, 0, len(xs)) for _, x := range xs { ms, err := logql.ParseMatchers(x) diff --git a/pkg/loghttp/params_test.go b/pkg/loghttp/params_test.go index ca6a632e8203c..12664515dfac1 100644 --- a/pkg/loghttp/params_test.go +++ b/pkg/loghttp/params_test.go @@ -234,7 +234,8 @@ func Test_match(t *testing.T) { wantErr bool }{ {"malformed", []string{`{a="1`}, nil, true}, - {"errors on nil input", nil, nil, true}, + {"empty on nil input", nil, [][]*labels.Matcher{}, false}, + {"empty on empty input", []string{}, [][]*labels.Matcher{}, false}, { "single", []string{`{a="1"}`}, diff --git a/pkg/loghttp/series.go b/pkg/loghttp/series.go index b4601ed3402de..15d0d48f09ce7 100644 --- a/pkg/loghttp/series.go +++ b/pkg/loghttp/series.go @@ -3,6 +3,7 @@ package loghttp import ( "net/http" "sort" + "strings" "github.com/grafana/loki/pkg/logproto" ) @@ -25,6 +26,17 @@ func ParseSeriesQuery(r *http.Request) (*logproto.SeriesRequest, error) { deduped := union(xs, ys) sort.Strings(deduped) + // Special case to allow for an empty label matcher `{}` in a Series query, + // we support either not specifying the `match` query parameter at all or specifying it with a single `{}` + // empty label matcher, we treat the latter case here as if no `match` was supplied at all. + if len(deduped) == 1 { + matcher := deduped[0] + matcher = strings.Replace(matcher, " ", "", -1) + if matcher == "{}" { + deduped = deduped[:0] + } + } + // ensure matchers are valid before fanning out to ingesters/store as well as returning valuable parsing errors // instead of 500s _, err = Match(deduped) diff --git a/pkg/loghttp/series_test.go b/pkg/loghttp/series_test.go index 8f0792a5eead4..1316e20f1ff0c 100644 --- a/pkg/loghttp/series_test.go +++ b/pkg/loghttp/series_test.go @@ -19,9 +19,32 @@ func TestParseSeriesQuery(t *testing.T) { }{ { "no match", - withForm(url.Values{}), - true, - nil, + withForm(url.Values{ + "start": []string{"1000"}, + "end": []string{"2000"}, + }), + false, + mkSeriesRequest(t, "1000", "2000", []string{}), + }, + { + "empty matcher", + withForm(url.Values{ + "start": []string{"1000"}, + "end": []string{"2000"}, + "match": []string{"{}"}, + }), + false, + mkSeriesRequest(t, "1000", "2000", []string{}), + }, + { + "empty matcher with whitespace", + withForm(url.Values{ + "start": []string{"1000"}, + "end": []string{"2000"}, + "match": []string{" { }"}, + }), + false, + mkSeriesRequest(t, "1000", "2000", []string{}), }, { "malformed", @@ -80,6 +103,7 @@ func withForm(form url.Values) *http.Request { return &http.Request{Form: form} } +// nolint func mkSeriesRequest(t *testing.T, from, to string, matches []string) *logproto.SeriesRequest { start, end, err := bounds(withForm(url.Values{ "start": []string{from}, diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index 345c45041a12f..88e95308d3055 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -461,26 +461,43 @@ func (q *Querier) seriesForMatchers( ) ([]logproto.SeriesIdentifier, error) { var results []logproto.SeriesIdentifier - for _, group := range groups { - ids, err := q.store.GetSeries(ctx, logql.SelectParams{ - QueryRequest: &logproto.QueryRequest{ - Selector: group, - Limit: 1, - Start: from, - End: through, - Direction: logproto.FORWARD, - }, - }) + // If no matchers were specified for the series query, + // we send a query with an empty matcher which will match every series. + if len(groups) == 0 { + var err error + results, err = q.seriesForMatcher(ctx, from, through, "") if err != nil { return nil, err } - - results = append(results, ids...) - + } else { + for _, group := range groups { + ids, err := q.seriesForMatcher(ctx, from, through, group) + if err != nil { + return nil, err + } + results = append(results, ids...) + } } return results, nil } +// seriesForMatcher fetches series from the store for a given matcher +func (q *Querier) seriesForMatcher(ctx context.Context, from, through time.Time, matcher string) ([]logproto.SeriesIdentifier, error) { + ids, err := q.store.GetSeries(ctx, logql.SelectParams{ + QueryRequest: &logproto.QueryRequest{ + Selector: matcher, + Limit: 1, + Start: from, + End: through, + Direction: logproto.FORWARD, + }, + }) + if err != nil { + return nil, err + } + return ids, nil +} + func (q *Querier) validateQueryRequest(ctx context.Context, req *logproto.QueryRequest) error { userID, err := user.ExtractOrgID(ctx) if err != nil { diff --git a/pkg/storage/store.go b/pkg/storage/store.go index 323b96083fa4b..4c9f2b54452a0 100644 --- a/pkg/storage/store.go +++ b/pkg/storage/store.go @@ -148,9 +148,24 @@ func (s *store) lazyChunks(ctx context.Context, matchers []*labels.Matcher, from } func (s *store) GetSeries(ctx context.Context, req logql.SelectParams) ([]logproto.SeriesIdentifier, error) { - matchers, _, from, through, err := decodeReq(req) - if err != nil { - return nil, err + var from, through model.Time + var matchers []*labels.Matcher + + // The Loki parser doesn't allow for an empty label matcher but for the Series API + // we allow this to select all series in the time range. + if req.Selector == "" { + from, through = util.RoundToMilliseconds(req.Start, req.End) + nameLabelMatcher, err := labels.NewMatcher(labels.MatchEqual, labels.MetricName, "logs") + if err != nil { + return nil, err + } + matchers = []*labels.Matcher{nameLabelMatcher} + } else { + var err error + matchers, _, from, through, err = decodeReq(req) + if err != nil { + return nil, err + } } lazyChunks, err := s.lazyChunks(ctx, matchers, from, through)