Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Loki: Series API will return all series with no match or empty matcher #2254

Merged
merged 3 commits into from
Jun 26, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 18 additions & 4 deletions pkg/ingester/ingester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,12 +105,26 @@ func TestIngester(t *testing.T) {

// Series

// empty matchers
_, err = i.Series(ctx, &logproto.SeriesRequest{
// empty matcher return all series
resp, err := i.Series(ctx, &logproto.SeriesRequest{
Start: time.Unix(0, 0),
End: time.Unix(1, 0),
})
require.Error(t, err)
require.Nil(t, err)
require.ElementsMatch(t, []logproto.SeriesIdentifier{
{
Labels: map[string]string{
"foo": "bar",
"bar": "baz1",
},
},
{
Labels: map[string]string{
"foo": "bar",
"bar": "baz2",
},
},
}, resp.GetSeries())

// wrong matchers fmt
_, err = i.Series(ctx, &logproto.SeriesRequest{
Expand All @@ -129,7 +143,7 @@ func TestIngester(t *testing.T) {
require.Error(t, err)

// foo=bar
resp, err := i.Series(ctx, &logproto.SeriesRequest{
resp, err = i.Series(ctx, &logproto.SeriesRequest{
Start: time.Unix(0, 0),
End: time.Unix(1, 0),
Groups: []string{`{foo="bar"}`},
Expand Down
61 changes: 46 additions & 15 deletions pkg/ingester/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,33 +249,64 @@ func (i *instance) Series(_ context.Context, req *logproto.SeriesRequest) (*logp
return nil, err
}

dedupedSeries := make(map[uint64]logproto.SeriesIdentifier)
for _, matchers := range groups {
err = i.forMatchingStreams(matchers, func(stream *stream) error {
// exit early when this stream was added by an earlier group
key := stream.labels.Hash()
if _, found := dedupedSeries[key]; found {
return nil
}
var series []logproto.SeriesIdentifier

dedupedSeries[key] = logproto.SeriesIdentifier{
// If no matchers were supplied we include all streams.
if len(groups) == 0 {
series = make([]logproto.SeriesIdentifier, 0, len(i.streams))
Copy link
Collaborator Author

@slim-bean slim-bean Jun 23, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This allocation isn't protected by a mutex, so the capacity could be wrong, but I think this is generally OK: in the worst case a stream is added between the creation of this slice and its population, causing Go to double the slice's size. I think this is fine, but I'm open to alternatives or different opinions.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No problem here from my point of view. But if you're not sure, you can write a test that runs two queries in different goroutines and run that test with -race.

go test -mod=vendor -race ./pkg/ingester.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you could have kept the same function `func(stream *stream) error` for both cases.

err = i.forAllStreams(func(stream *stream) error {
series = append(series, logproto.SeriesIdentifier{
Labels: stream.labels.Map(),
}
})
return nil
})

if err != nil {
return nil, err
}
}
series := make([]logproto.SeriesIdentifier, 0, len(dedupedSeries))
for _, v := range dedupedSeries {
series = append(series, v)
} else {
dedupedSeries := make(map[uint64]logproto.SeriesIdentifier)
for _, matchers := range groups {
err = i.forMatchingStreams(matchers, func(stream *stream) error {
// exit early when this stream was added by an earlier group
key := stream.labels.Hash()
if _, found := dedupedSeries[key]; found {
return nil
}

dedupedSeries[key] = logproto.SeriesIdentifier{
Labels: stream.labels.Map(),
}
return nil
})
if err != nil {
return nil, err
}
}
series = make([]logproto.SeriesIdentifier, 0, len(dedupedSeries))
for _, v := range dedupedSeries {
series = append(series, v)

}
}

return &logproto.SeriesResponse{Series: series}, nil
}

// forAllStreams calls fn once for every stream currently held by the instance.
// The read lock on streamsMtx is taken before iterating and released when the
// function returns, so fn always runs with that lock held. Handing streams to a
// callback (rather than returning them) keeps stream access inside the locked
// region.
//
// Iteration stops at the first non-nil error returned by fn, and that error is
// returned to the caller unchanged. If fn never fails, the result is nil.
func (i *instance) forAllStreams(fn func(*stream) error) error {
	i.streamsMtx.RLock()
	defer i.streamsMtx.RUnlock()

	for _, s := range i.streams {
		if err := fn(s); err != nil {
			// Abort the walk; remaining streams are not visited.
			return err
		}
	}
	return nil
}

// forMatchingStreams will execute a function for each stream that satisfies a set of requirements (time range, matchers, etc).
// It uses a function in order to enable generic stream access without accidentally leaking streams under the mutex.
func (i *instance) forMatchingStreams(
Expand Down
4 changes: 0 additions & 4 deletions pkg/loghttp/params.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,10 +79,6 @@ func interval(r *http.Request) (time.Duration, error) {

// Match extracts and parses multiple matcher groups from a slice of strings
func Match(xs []string) ([][]*labels.Matcher, error) {
if len(xs) == 0 {
return nil, errors.New("0 matcher groups supplied")
}

groups := make([][]*labels.Matcher, 0, len(xs))
for _, x := range xs {
ms, err := logql.ParseMatchers(x)
Expand Down
3 changes: 2 additions & 1 deletion pkg/loghttp/params_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,8 @@ func Test_match(t *testing.T) {
wantErr bool
}{
{"malformed", []string{`{a="1`}, nil, true},
{"errors on nil input", nil, nil, true},
{"empty on nil input", nil, [][]*labels.Matcher{}, false},
{"empty on empty input", []string{}, [][]*labels.Matcher{}, false},
{
"single",
[]string{`{a="1"}`},
Expand Down
12 changes: 12 additions & 0 deletions pkg/loghttp/series.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package loghttp
import (
"net/http"
"sort"
"strings"

"github.com/grafana/loki/pkg/logproto"
)
Expand All @@ -25,6 +26,17 @@ func ParseSeriesQuery(r *http.Request) (*logproto.SeriesRequest, error) {
deduped := union(xs, ys)
sort.Strings(deduped)

// Special case to allow for an empty label matcher `{}` in a Series query,
// we support either not specifying the `match` query parameter at all or specifying it with a single `{}`
// empty label matcher, we treat the latter case here as if no `match` was supplied at all.
if len(deduped) == 1 {
matcher := deduped[0]
matcher = strings.Replace(matcher, " ", "", -1)
if matcher == "{}" {
deduped = deduped[:0]
}
}

// ensure matchers are valid before fanning out to ingesters/store as well as returning valuable parsing errors
// instead of 500s
_, err = Match(deduped)
Expand Down
30 changes: 27 additions & 3 deletions pkg/loghttp/series_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,32 @@ func TestParseSeriesQuery(t *testing.T) {
}{
{
"no match",
withForm(url.Values{}),
true,
nil,
withForm(url.Values{
"start": []string{"1000"},
"end": []string{"2000"},
}),
false,
mkSeriesRequest(t, "1000", "2000", []string{}),
},
{
"empty matcher",
withForm(url.Values{
"start": []string{"1000"},
"end": []string{"2000"},
"match": []string{"{}"},
}),
false,
mkSeriesRequest(t, "1000", "2000", []string{}),
},
{
"empty matcher with whitespace",
withForm(url.Values{
"start": []string{"1000"},
"end": []string{"2000"},
"match": []string{" { }"},
}),
false,
mkSeriesRequest(t, "1000", "2000", []string{}),
},
{
"malformed",
Expand Down Expand Up @@ -80,6 +103,7 @@ func withForm(form url.Values) *http.Request {
return &http.Request{Form: form}
}

// nolint
func mkSeriesRequest(t *testing.T, from, to string, matches []string) *logproto.SeriesRequest {
start, end, err := bounds(withForm(url.Values{
"start": []string{from},
Expand Down
43 changes: 30 additions & 13 deletions pkg/querier/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -461,26 +461,43 @@ func (q *Querier) seriesForMatchers(
) ([]logproto.SeriesIdentifier, error) {

var results []logproto.SeriesIdentifier
for _, group := range groups {
ids, err := q.store.GetSeries(ctx, logql.SelectParams{
QueryRequest: &logproto.QueryRequest{
Selector: group,
Limit: 1,
Start: from,
End: through,
Direction: logproto.FORWARD,
},
})
// If no matchers were specified for the series query,
// we send a query with an empty matcher which will match every series.
if len(groups) == 0 {
var err error
results, err = q.seriesForMatcher(ctx, from, through, "")
if err != nil {
return nil, err
}

results = append(results, ids...)

} else {
for _, group := range groups {
ids, err := q.seriesForMatcher(ctx, from, through, group)
if err != nil {
return nil, err
}
results = append(results, ids...)
}
}
return results, nil
}

// seriesForMatcher asks the store for the series identifiers selected by a
// single matcher string within the [from, through] time range.
//
// The matcher is forwarded verbatim as the request Selector, with a FORWARD
// direction and a Limit of 1.
// NOTE(review): the Limit of 1 presumably has no effect on a series lookup; confirm against the store implementation.
//
// On a store error the identifiers are discarded and (nil, err) is returned.
func (q *Querier) seriesForMatcher(ctx context.Context, from, through time.Time, matcher string) ([]logproto.SeriesIdentifier, error) {
	params := logql.SelectParams{
		QueryRequest: &logproto.QueryRequest{
			Selector:  matcher,
			Limit:     1,
			Start:     from,
			End:       through,
			Direction: logproto.FORWARD,
		},
	}

	series, err := q.store.GetSeries(ctx, params)
	if err != nil {
		return nil, err
	}
	return series, nil
}

func (q *Querier) validateQueryRequest(ctx context.Context, req *logproto.QueryRequest) error {
userID, err := user.ExtractOrgID(ctx)
if err != nil {
Expand Down
21 changes: 18 additions & 3 deletions pkg/storage/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,9 +148,24 @@ func (s *store) lazyChunks(ctx context.Context, matchers []*labels.Matcher, from
}

func (s *store) GetSeries(ctx context.Context, req logql.SelectParams) ([]logproto.SeriesIdentifier, error) {
matchers, _, from, through, err := decodeReq(req)
if err != nil {
return nil, err
var from, through model.Time
var matchers []*labels.Matcher

// The Loki parser doesn't allow for an empty label matcher but for the Series API
// we allow this to select all series in the time range.
if req.Selector == "" {
from, through = util.RoundToMilliseconds(req.Start, req.End)
nameLabelMatcher, err := labels.NewMatcher(labels.MatchEqual, labels.MetricName, "logs")
if err != nil {
return nil, err
}
matchers = []*labels.Matcher{nameLabelMatcher}
Comment on lines +157 to +162
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One concern I have here is that this shortcut also skips the shard-aware logic in decodeReq. Currently the series API is not shard aware, so this is fine, but I'm not sure whether there is something else we could do here to protect our future selves from missing this. See #2167

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You would need to get the part that parses the string matcher out and pass the matchers as an array to decodeReq.

} else {
var err error
matchers, _, from, through, err = decodeReq(req)
if err != nil {
return nil, err
}
}

lazyChunks, err := s.lazyChunks(ctx, matchers, from, through)
Expand Down