Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: Improve Detected labels API #12816

Merged
merged 16 commits into from
May 15, 2024
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -1397,7 +1397,7 @@ func (i *Ingester) GetDetectedLabels(ctx context.Context, req *logproto.Detected
}
}

labelMap, err := instance.LabelsWithValues(ctx, *req.Start, matchers...)
labelMap, err := instance.LabelsWithValues(ctx, req.Start, matchers...)

if err != nil {
return nil, err
Expand Down
8 changes: 4 additions & 4 deletions pkg/ingester/ingester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -833,8 +833,8 @@ func TestIngester_GetDetectedLabels(t *testing.T) {
require.NoError(t, err)

res, err := i.GetDetectedLabels(ctx, &logproto.DetectedLabelsRequest{
Start: &[]time.Time{time.Now().Add(11 * time.Nanosecond)}[0],
End: nil,
Start: []time.Time{time.Now().Add(11 * time.Nanosecond)}[0],
End: []time.Time{time.Now().Add(11 * time.Nanosecond)}[0],
Query: "",
})

Expand Down Expand Up @@ -893,8 +893,8 @@ func TestIngester_GetDetectedLabelsWithQuery(t *testing.T) {
require.NoError(t, err)

res, err := i.GetDetectedLabels(ctx, &logproto.DetectedLabelsRequest{
Start: &[]time.Time{time.Now().Add(11 * time.Nanosecond)}[0],
End: nil,
Start: []time.Time{time.Now().Add(11 * time.Nanosecond)}[0],
End: []time.Time{time.Now().Add(11 * time.Nanosecond)}[0],
Query: `{foo="bar"}`,
})

Expand Down
11 changes: 9 additions & 2 deletions pkg/loghttp/labels.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package loghttp

import (
"errors"
"net/http"
"sort"
"strconv"
Expand Down Expand Up @@ -88,14 +89,20 @@ func ParseLabelQuery(r *http.Request) (*logproto.LabelRequest, error) {
}

func ParseDetectedLabelsQuery(r *http.Request) (*logproto.DetectedLabelsRequest, error) {
var err error

start, end, err := bounds(r)
if err != nil {
return nil, err
}

if end.Before(start) {
return nil, errors.New("end timestamp must not be before or equal to start time")
}

return &logproto.DetectedLabelsRequest{
Start: &start,
End: &end,
Start: start,
End: end,
Query: query(r),
}, nil
}
480 changes: 258 additions & 222 deletions pkg/logproto/logproto.pb.go

Large diffs are not rendered by default.

5 changes: 3 additions & 2 deletions pkg/logproto/logproto.proto
Original file line number Diff line number Diff line change
Expand Up @@ -478,11 +478,11 @@ message DetectedField {
message DetectedLabelsRequest {
google.protobuf.Timestamp start = 1 [
(gogoproto.stdtime) = true,
(gogoproto.nullable) = true
(gogoproto.nullable) = false
];
google.protobuf.Timestamp end = 2 [
(gogoproto.stdtime) = true,
(gogoproto.nullable) = true
(gogoproto.nullable) = false
];
string query = 3;
}
Expand All @@ -494,4 +494,5 @@ message DetectedLabelsResponse {
message DetectedLabel {
string label = 1;
uint64 cardinality = 2;
bytes sketch = 3 [(gogoproto.jsontag) = "sketch,omitempty"];
}
113 changes: 62 additions & 51 deletions pkg/querier/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"flag"
"fmt"
"net/http"
"regexp"
"sort"
"strconv"
"time"
Expand All @@ -14,6 +13,7 @@ import (
"github.com/dustin/go-humanize"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/google/uuid"
"github.com/grafana/dskit/httpgrpc"
"github.com/grafana/dskit/tenant"
"github.com/opentracing/opentracing-go"
Expand Down Expand Up @@ -915,7 +915,6 @@ func (q *SingleTenantQuerier) DetectedLabels(ctx context.Context, req *logproto.
if err != nil {
return nil, err
}
var detectedLabels []*logproto.DetectedLabel
staticLabels := map[string]struct{}{"cluster": {}, "namespace": {}, "instance": {}, "pod": {}}

// Enforce the query timeout while querying backends
Expand All @@ -924,24 +923,26 @@ func (q *SingleTenantQuerier) DetectedLabels(ctx context.Context, req *logproto.
defer cancel()
g, ctx := errgroup.WithContext(ctx)

if *req.Start, *req.End, err = validateQueryTimeRangeLimits(ctx, userID, q.limits, *req.Start, *req.End); err != nil {
if req.Start, req.End, err = validateQueryTimeRangeLimits(ctx, userID, q.limits, req.Start, req.End); err != nil {
return nil, err
}
ingesterQueryInterval, storeQueryInterval := q.buildQueryIntervals(*req.Start, *req.End)
ingesterQueryInterval, storeQueryInterval := q.buildQueryIntervals(req.Start, req.End)

// Fetch labels from ingesters
var ingesterLabels *logproto.LabelToValuesResponse
if !q.cfg.QueryStoreOnly && ingesterQueryInterval != nil {
g.Go(func() error {
var err error
splitReq := *req
splitReq.Start = &ingesterQueryInterval.start
splitReq.End = &ingesterQueryInterval.end
splitReq.Start = ingesterQueryInterval.start
splitReq.End = ingesterQueryInterval.end

ingesterLabels, err = q.ingesterQuerier.DetectedLabel(ctx, &splitReq)
return err
})
}

// Fetch labels from the store
storeLabelsMap := make(map[string][]string)
if !q.cfg.QueryIngesterOnly && storeQueryInterval != nil {
var matchers []*labels.Matcher
Expand All @@ -961,9 +962,7 @@ func (q *SingleTenantQuerier) DetectedLabels(ctx context.Context, req *logproto.
if err != nil {
return err
}
if q.isLabelRelevant(label, values, staticLabels) {
storeLabelsMap[label] = values
}
storeLabelsMap[label] = values
}
return err
})
Expand All @@ -979,40 +978,58 @@ func (q *SingleTenantQuerier) DetectedLabels(ctx context.Context, req *logproto.
}, nil
}

// append static labels before so they are in sorted order
for l := range staticLabels {
if values, present := ingesterLabels.Labels[l]; present {
detectedLabels = append(detectedLabels, &logproto.DetectedLabel{Label: l, Cardinality: uint64(len(values.Values))})
}
}
return &logproto.DetectedLabelsResponse{
DetectedLabels: countLabelsAndCardinality(storeLabelsMap, ingesterLabels, staticLabels),
}, nil
}

func countLabelsAndCardinality(storeLabelsMap map[string][]string, ingesterLabels *logproto.LabelToValuesResponse, staticLabels map[string]struct{}) []*logproto.DetectedLabel {
dlMap := make(map[string]*parsedFields)

if ingesterLabels != nil {
for label, values := range ingesterLabels.Labels {
if q.isLabelRelevant(label, values.Values, staticLabels) {
combinedValues := values.Values
storeValues, storeHasLabel := storeLabelsMap[label]
if storeHasLabel {
combinedValues = append(combinedValues, storeValues...)
for label, val := range ingesterLabels.Labels {
if _, isStatic := staticLabels[label]; isStatic || !containsAllIDTypes(val.Values) {
_, ok := dlMap[label]
if !ok {
dlMap[label] = newParsedLabels()
}

slices.Sort(combinedValues)
uniqueValues := slices.Compact(combinedValues)
// TODO(shantanu): There's a bug here. Unique values can go above 50. Will need a bit of refactoring
detectedLabels = append(detectedLabels, &logproto.DetectedLabel{Label: label, Cardinality: uint64(len(uniqueValues))})
delete(storeLabelsMap, label)
parsedFields := dlMap[label]
for _, v := range val.Values {
parsedFields.Insert(v)
}
}
}
}

for label, values := range storeLabelsMap {
slices.Sort(values)
uniqueValues := slices.Compact(values)
detectedLabels = append(detectedLabels, &logproto.DetectedLabel{Label: label, Cardinality: uint64(len(uniqueValues))})
if _, isStatic := staticLabels[label]; isStatic || !containsAllIDTypes(values) {
_, ok := dlMap[label]
if !ok {
dlMap[label] = newParsedLabels()
}

parsedFields := dlMap[label]
for _, v := range values {
parsedFields.Insert(v)
}
}
}

return &logproto.DetectedLabelsResponse{
DetectedLabels: detectedLabels,
}, nil
var detectedLabels []*logproto.DetectedLabel
for k, v := range dlMap {
sketch, err := v.sketch.MarshalBinary()
if err != nil {
// TODO: add log here
continue
}
detectedLabels = append(detectedLabels, &logproto.DetectedLabel{
Label: k,
Cardinality: v.Estimate(),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, good call here, we hopefully don't care about this but if there's only 1 split we'll need the estimate here

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sorry didn't get the issue.. even if there's a 1 split, the estimate will be populated here right?

Sketch: sketch,
})
}
return detectedLabels
}

type PatterQuerier interface {
Expand All @@ -1031,28 +1048,15 @@ func (q *SingleTenantQuerier) Patterns(ctx context.Context, req *logproto.QueryP
return res, err
}

// isLabelRelevant returns if the label is relevant for logs app. A label is relevant if it is not of any numeric, UUID or GUID type
// It is also not relevant to return if the values are less than 1 or beyond 50.
func (q *SingleTenantQuerier) isLabelRelevant(label string, values []string, staticLabels map[string]struct{}) bool {
cardinality := len(values)
_, isStaticLabel := staticLabels[label]
if isStaticLabel || (cardinality < 2 || cardinality > 50) ||
containsAllIDTypes(values) {
return false
}

return true
}

// containsAllIDTypes filters out all UUID, GUID and numeric types. Returns false if even one value is not of the type
func containsAllIDTypes(values []string) bool {
pattern := `^(?:(?:[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12})|(?:(?:\{)?[0-9a-fA-F]{8}(?:-?[0-9a-fA-F]{4}){3}-?[0-9a-fA-F]{12}(?:\})?)|(\d+(?:\.\d+)?))$`

re := regexp.MustCompile(pattern)

for _, v := range values {
if !re.MatchString(v) {
return false
_, err := strconv.ParseFloat(v, 64)
if err != nil {
_, err = uuid.Parse(v)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

much cleaner than the regex pattern

if err != nil {
return false
}
}
}

Expand Down Expand Up @@ -1139,6 +1143,13 @@ func newParsedFields(parser *string) *parsedFields {
}
}

func newParsedLabels() *parsedFields {
return &parsedFields{
sketch: hyperloglog.New(),
fieldType: logproto.DetectedFieldString,
}
}

func (p *parsedFields) Insert(value string) {
p.sketch.Insert([]byte(value))
}
Expand Down
Loading
Loading