Limit of 0 disables max_bytes_per_tag_values_query #1447

Merged
merged 8 commits into from May 31, 2022

Changes from 7 commits
3 changes: 2 additions & 1 deletion CHANGELOG.md
@@ -1,9 +1,10 @@
## main / unreleased

* [CHANGE] metrics-generator: Changed added metric label `instance` to `__metrics_gen_instance` to reduce collisions with custom dimensions. [#1439](https://github.com/grafana/tempo/pull/1439) (@joe-elliott)
* [CHANGE] Don't enforce `max_bytes_per_tag_values_query` when set to 0. [#1447](https://github.com/grafana/tempo/pull/1447) (@joe-elliott)
* [FEATURE] metrics-generator: support per-tenant processor configuration [#1434](https://github.com/grafana/tempo/pull/1434) (@kvrhdn)
* [ENHANCEMENT] Added the ability to have a per tenant max search duration. [#1421](https://github.com/grafana/tempo/pull/1421) (@joe-elliott)
* [BUGFIX] Fix nil pointer panic when the trace by id path errors. [1441](https://github.com/grafana/tempo/pull/1441) (@joe-elliott)
* [BUGFIX] Fix nil pointer panic when the trace by id path errors. [#1441](https://github.com/grafana/tempo/pull/1441) (@joe-elliott)

## v1.4.1 / 2022-05-05

1 change: 1 addition & 0 deletions docs/tempo/website/configuration/_index.md
@@ -947,6 +947,7 @@ overrides:
# to populate the autocomplete dropdown. This limit protects the system from
# tags with high cardinality or large values such as HTTP URLs or SQL queries.
# This override limit is used by the ingester and the querier.
# A value of 0 disables the limit.
[max_bytes_per_tag_values_query: <int> | default = 5000000 (5MB) ]

# Metrics-generator configurations
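
Per the new docs line, a value of 0 disables the limit. A minimal overrides sketch (YAML, matching the configuration block documented above):

overrides:
  # 0 disables the tag-values size check entirely; any positive value is
  # enforced by both the ingester and the querier.
  max_bytes_per_tag_values_query: 0
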
22 changes: 8 additions & 14 deletions modules/ingester/instance_search.go
@@ -2,6 +2,7 @@ package ingester

import (
"context"
"fmt"
"sort"

"github.com/go-kit/log/level"
@@ -221,8 +222,6 @@ func (i *instance) SearchTagValues(ctx context.Context, tagName string) (*tempop
if err != nil {
return nil, err
}
// get limit from override
maxBytesPerTagValuesQuery := i.limiter.limits.MaxBytesPerTagValuesQuery(userID)

kv := &tempofb.KeyValues{}
tagNameBytes := []byte(tagName)
@@ -243,12 +242,10 @@
}

// check if size of values map is within limit after scanning live traces
if !util.MapSizeWithinLimit(values, maxBytesPerTagValuesQuery) {
level.Warn(log.Logger).Log("msg", "size of tag values from live traces exceeded limit, reduce cardinality or size of tags", "tag", tagName)
// return empty response to avoid querier OOMs
return &tempopb.SearchTagValuesResponse{
TagValues: []string{},
}, nil
maxBytesPerTagValuesQuery := i.limiter.limits.MaxBytesPerTagValuesQuery(userID)
if maxBytesPerTagValuesQuery > 0 && !util.MapSizeWithinLimit(values, maxBytesPerTagValuesQuery) {
level.Warn(log.Logger).Log("msg", "size of tag values from live traces exceeded limit, reduce cardinality or size of tags", "tag", tagName, "userID", userID)
return nil, fmt.Errorf("tag values exceeded allowed max bytes (%d)", maxBytesPerTagValuesQuery)
}

err = i.visitSearchableBlocks(ctx, func(block search.SearchableBlock) error {
Expand All @@ -259,12 +256,9 @@ func (i *instance) SearchTagValues(ctx context.Context, tagName string) (*tempop
}

// check if size of values map is within limit after scanning all blocks
if !util.MapSizeWithinLimit(values, maxBytesPerTagValuesQuery) {
level.Warn(log.Logger).Log("msg", "size of tag values in instance exceeded limit, reduce cardinality or size of tags", "tag", tagName)
// return empty response to avoid querier OOMs
return &tempopb.SearchTagValuesResponse{
TagValues: []string{},
}, nil
if maxBytesPerTagValuesQuery > 0 && !util.MapSizeWithinLimit(values, maxBytesPerTagValuesQuery) {
level.Warn(log.Logger).Log("msg", "size of tag values in instance exceeded limit, reduce cardinality or size of tags", "tag", tagName, "userID", userID)
return nil, fmt.Errorf("tag values exceeded allowed max bytes (%d)", maxBytesPerTagValuesQuery)
}

return &tempopb.SearchTagValuesResponse{
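
Two behaviors change in the hunks above: a configured limit of 0 now skips the size check entirely, and a breached limit surfaces as an error instead of a silently empty response (which previously hid the condition from callers). A minimal sketch of the guard, where mapSizeWithinLimit is a hypothetical stand-in for util.MapSizeWithinLimit (its implementation is not part of this diff):

package ingester

import "fmt"

// checkTagValuesSize mirrors the guard introduced above: 0 means "no limit",
// and a breach returns an error rather than an empty response.
func checkTagValuesSize(values map[string]struct{}, maxBytes int) error {
	if maxBytes > 0 && !mapSizeWithinLimit(values, maxBytes) {
		return fmt.Errorf("tag values exceeded allowed max bytes (%d)", maxBytes)
	}
	return nil
}

// mapSizeWithinLimit approximates the map's footprint as the sum of its key
// lengths (hypothetical; the real util.MapSizeWithinLimit may differ).
func mapSizeWithinLimit(m map[string]struct{}, limit int) bool {
	size := 0
	for k := range m {
		size += len(k)
	}
	return size <= limit
}
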
209 changes: 152 additions & 57 deletions modules/ingester/instance_search_test.go
@@ -5,6 +5,8 @@ import (
"context"
"fmt"
"math/rand"
"sort"
"strconv"
"testing"
"time"

Expand All @@ -24,21 +26,6 @@ import (
"github.com/grafana/tempo/tempodb/search"
)

func checkEqual(t *testing.T, ids [][]byte, sr *tempopb.SearchResponse) {
for _, meta := range sr.Traces {
parsedTraceID, err := util.HexStringToTraceID(meta.TraceID)
assert.NoError(t, err)

present := false
for _, id := range ids {
if bytes.Equal(parsedTraceID, id) {
present = true
}
}
assert.True(t, present)
}
}

func TestInstanceSearch(t *testing.T) {
limits, err := overrides.NewOverrides(overrides.Limits{})
assert.NoError(t, err, "unexpected error creating limits")
@@ -50,45 +37,9 @@ func TestInstanceSearch(t *testing.T) {
i, err := newInstance("fake", limiter, ingester.store, ingester.local)
assert.NoError(t, err, "unexpected error creating new instance")

// This matches the encoding for live traces, since
// we are pushing to the instance directly it must match.
dec := model.MustNewSegmentDecoder(model.CurrentEncoding)

numTraces := 500
searchAnnotatedFractionDenominator := 100
ids := [][]byte{}

// add dummy search data
var tagKey = "foo"
var tagValue = "bar"

for j := 0; j < numTraces; j++ {
id := make([]byte, 16)
rand.Read(id)

testTrace := test.MakeTrace(10, id)
trace.SortTrace(testTrace)
traceBytes, err := dec.PrepareForWrite(testTrace, 0, 0)
require.NoError(t, err)

// annotate just a fraction of traces with search data
var searchData []byte
if j%searchAnnotatedFractionDenominator == 0 {
data := &tempofb.SearchEntryMutable{}
data.TraceID = id
data.AddTag(tagKey, tagValue)
searchData = data.ToBytes()

// these are the only ids we want to test against
ids = append(ids, id)
}

// searchData will be nil if not
err = i.PushBytes(context.Background(), id, traceBytes, searchData)
require.NoError(t, err)

assert.Equal(t, int(i.traceCount.Load()), len(i.traces))
}
ids, _ := writeTracesWithSearchData(t, i, tagKey, tagValue, false)

var req = &tempopb.SearchRequest{
Tags: map[string]string{},
@@ -97,7 +48,7 @@

sr, err := i.Search(context.Background(), req)
assert.NoError(t, err)
assert.Len(t, sr.Traces, numTraces/searchAnnotatedFractionDenominator)
assert.Len(t, sr.Traces, len(ids))
// todo: test that returned results are in sorted time order, create order of id's beforehand
checkEqual(t, ids, sr)

@@ -108,7 +59,7 @@

sr, err = i.Search(context.Background(), req)
assert.NoError(t, err)
assert.Len(t, sr.Traces, numTraces/searchAnnotatedFractionDenominator)
assert.Len(t, sr.Traces, len(ids))
checkEqual(t, ids, sr)

// Test after cutting new headblock
@@ -118,7 +69,7 @@

sr, err = i.Search(context.Background(), req)
assert.NoError(t, err)
assert.Len(t, sr.Traces, numTraces/searchAnnotatedFractionDenominator)
assert.Len(t, sr.Traces, len(ids))
checkEqual(t, ids, sr)

// Test after completing a block
@@ -127,7 +78,7 @@

sr, err = i.Search(context.Background(), req)
assert.NoError(t, err)
assert.Len(t, sr.Traces, numTraces/searchAnnotatedFractionDenominator)
assert.Len(t, sr.Traces, len(ids))
checkEqual(t, ids, sr)

err = ingester.stopping(nil)
@@ -141,10 +92,154 @@

sr, err = i.Search(context.Background(), req)
assert.NoError(t, err)
assert.Len(t, sr.Traces, numTraces/searchAnnotatedFractionDenominator)
assert.Len(t, sr.Traces, len(ids))
checkEqual(t, ids, sr)
}

func checkEqual(t *testing.T, ids [][]byte, sr *tempopb.SearchResponse) {
for _, meta := range sr.Traces {
parsedTraceID, err := util.HexStringToTraceID(meta.TraceID)
assert.NoError(t, err)

present := false
for _, id := range ids {
if bytes.Equal(parsedTraceID, id) {
present = true
}
}
assert.True(t, present)
}
}

func TestInstanceSearchTags(t *testing.T) {
Contributor: Can we club this test with the above?

Member (Author): I moved the logic to write data to the instance to a single function shared by 3 tests. I do prefer the tests to remain independent.

(joe-elliott marked this conversation as resolved.)

limits, err := overrides.NewOverrides(overrides.Limits{})
assert.NoError(t, err, "unexpected error creating limits")
limiter := NewLimiter(limits, &ringCountMock{count: 1}, 1)

tempDir := t.TempDir()

ingester, _, _ := defaultIngester(t, tempDir)
i, err := newInstance("fake", limiter, ingester.store, ingester.local)
assert.NoError(t, err, "unexpected error creating new instance")

// add dummy search data
var tagKey = "foo"
var tagValue = "bar"

_, expectedTagValues := writeTracesWithSearchData(t, i, tagKey, tagValue, true)

userCtx := user.InjectOrgID(context.Background(), "fake")
testSearchTagsAndValues(t, userCtx, i, tagKey, expectedTagValues)

// Test after appending to WAL
err = i.CutCompleteTraces(0, true)
require.NoError(t, err)
assert.Equal(t, int(i.traceCount.Load()), len(i.traces))

testSearchTagsAndValues(t, userCtx, i, tagKey, expectedTagValues)

// Test after cutting new headblock
blockID, err := i.CutBlockIfReady(0, 0, true)
require.NoError(t, err)
assert.NotEqual(t, blockID, uuid.Nil)

testSearchTagsAndValues(t, userCtx, i, tagKey, expectedTagValues)

// Test after completing a block
err = i.CompleteBlock(blockID)
require.NoError(t, err)

testSearchTagsAndValues(t, userCtx, i, tagKey, expectedTagValues)
}

// nolint:revive,unparam
func testSearchTagsAndValues(t *testing.T, ctx context.Context, i *instance, tagName string, expectedTagValues []string) {
sr, err := i.SearchTags(ctx)
require.NoError(t, err)
srv, err := i.SearchTagValues(ctx, tagName)
require.NoError(t, err)

sort.Strings(srv.TagValues)
assert.Len(t, sr.TagNames, 1)
assert.Equal(t, tagName, sr.TagNames[0])
assert.Equal(t, expectedTagValues, srv.TagValues)
}

// TestInstanceSearchMaxBytesPerTagValuesQueryFails confirms that SearchTagValues returns
// an error if the total bytes of the found tag values exceed the MaxBytesPerTagValuesQuery limit
func TestInstanceSearchMaxBytesPerTagValuesQueryFails(t *testing.T) {
limits, err := overrides.NewOverrides(overrides.Limits{
MaxBytesPerTagValuesQuery: 10,
})
assert.NoError(t, err, "unexpected error creating limits")
limiter := NewLimiter(limits, &ringCountMock{count: 1}, 1)

tempDir := t.TempDir()

ingester, _, _ := defaultIngester(t, tempDir)
i, err := newInstance("fake", limiter, ingester.store, ingester.local)
assert.NoError(t, err, "unexpected error creating new instance")

var tagKey = "foo"
var tagValue = "bar"

_, _ = writeTracesWithSearchData(t, i, tagKey, tagValue, true)

userCtx := user.InjectOrgID(context.Background(), "fake")
srv, err := i.SearchTagValues(userCtx, tagKey)
assert.Error(t, err)
assert.Nil(t, srv)
}
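
A companion test for the new zero-value behavior might look like the following sketch (hypothetical, not part of this PR; it reuses the helpers defined in this file and asserts that a limit of 0 lets the same query succeed):

func TestInstanceSearchMaxBytesPerTagValuesQueryDisabled(t *testing.T) {
	// MaxBytesPerTagValuesQuery of 0 should disable the limit entirely.
	limits, err := overrides.NewOverrides(overrides.Limits{
		MaxBytesPerTagValuesQuery: 0,
	})
	assert.NoError(t, err, "unexpected error creating limits")
	limiter := NewLimiter(limits, &ringCountMock{count: 1}, 1)

	ingester, _, _ := defaultIngester(t, t.TempDir())
	i, err := newInstance("fake", limiter, ingester.store, ingester.local)
	assert.NoError(t, err, "unexpected error creating new instance")

	// Same data that trips the 10-byte limit in the test above.
	_, expectedTagValues := writeTracesWithSearchData(t, i, "foo", "bar", true)

	userCtx := user.InjectOrgID(context.Background(), "fake")
	srv, err := i.SearchTagValues(userCtx, "foo")
	assert.NoError(t, err)
	assert.Len(t, srv.TagValues, len(expectedTagValues))
}
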

// writes traces to the given instance along with search data. returns
// ids expected to be returned from a tag search and strings expected to
// be returned from a tag value search
func writeTracesWithSearchData(t *testing.T, i *instance, tagKey string, tagValue string, postFixValue bool) ([][]byte, []string) {
// This matches the encoding for live traces, since
// we are pushing to the instance directly it must match.
dec := model.MustNewSegmentDecoder(model.CurrentEncoding)

numTraces := 100
searchAnnotatedFractionDenominator := 10
ids := [][]byte{}
expectedTagValues := []string{}

for j := 0; j < numTraces; j++ {
id := make([]byte, 16)
rand.Read(id)

testTrace := test.MakeTrace(10, id)
trace.SortTrace(testTrace)
traceBytes, err := dec.PrepareForWrite(testTrace, 0, 0)
require.NoError(t, err)

// annotate just a fraction of traces with search data
var searchData []byte
if j%searchAnnotatedFractionDenominator == 0 {
tv := tagValue
if postFixValue {
tv = tv + strconv.Itoa(j)
}

data := &tempofb.SearchEntryMutable{}
data.TraceID = id
data.AddTag(tagKey, tv)
searchData = data.ToBytes()

expectedTagValues = append(expectedTagValues, tv)
ids = append(ids, data.TraceID)
}

// searchData will be nil if not
err = i.PushBytes(context.Background(), id, traceBytes, searchData)
require.NoError(t, err)

assert.Equal(t, int(i.traceCount.Load()), len(i.traces))
}

return ids, expectedTagValues
}
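
One subtlety worth noting about the helper above:

// With numTraces = 100 and searchAnnotatedFractionDenominator = 10, every
// tenth trace (j = 0, 10, ..., 90) is annotated, so callers get ten ids and,
// with postFixValue set, tag values "bar0", "bar10", ..., "bar90". That slice
// is appended in what happens to be lexical order, which is why
// testSearchTagsAndValues can sort.Strings(srv.TagValues) and compare it to
// expectedTagValues with assert.Equal directly.
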

func TestInstanceSearchNoData(t *testing.T) {
limits, err := overrides.NewOverrides(overrides.Limits{})
assert.NoError(t, err, "unexpected error creating limits")
4 changes: 2 additions & 2 deletions modules/overrides/limits.go
@@ -71,14 +71,14 @@ type Limits struct {
// Compactor enforced limits.
BlockRetention model.Duration `yaml:"block_retention" json:"block_retention"`

// Querier enforced limits.
// Querier and Ingester enforced limits.
MaxBytesPerTagValuesQuery int `yaml:"max_bytes_per_tag_values_query" json:"max_bytes_per_tag_values_query"`

// QueryFrontend enforced limits
MaxSearchDuration model.Duration `yaml:"max_search_duration" json:"max_search_duration"`

// MaxBytesPerTrace is enforced in the Ingester, Compactor, Querier (Search) and Serverless (Search). It
// it not enforce currently when doing a trace by id lookup.
// is not used when doing a trace by id lookup.
MaxBytesPerTrace int `yaml:"max_bytes_per_trace" json:"max_bytes_per_trace"`

// Configuration for overrides, convenient if it goes here.
12 changes: 5 additions & 7 deletions modules/querier/querier.go
@@ -346,9 +346,6 @@ func (q *Querier) SearchTagValues(ctx context.Context, req *tempopb.SearchTagVal
return nil, errors.Wrap(err, "error extracting org id in Querier.SearchTagValues")
}

// fetch response size limit for tag-values query
tagValuesLimitBytes := q.limits.MaxBytesPerTagValuesQuery(userID)

replicationSet, err := q.ring.GetReplicationSetForOperation(ring.Read)
if err != nil {
return nil, errors.Wrap(err, "error finding ingesters in Querier.SearchTagValues")
@@ -375,10 +372,11 @@ }
uniqueMap[v] = struct{}{}
}

if !util.MapSizeWithinLimit(uniqueMap, tagValuesLimitBytes) {
return &tempopb.SearchTagValuesResponse{
TagValues: []string{},
}, nil
// fetch response size limit for tag-values query
tagValuesLimitBytes := q.limits.MaxBytesPerTagValuesQuery(userID)
if tagValuesLimitBytes > 0 && !util.MapSizeWithinLimit(uniqueMap, tagValuesLimitBytes) {
level.Warn(log.Logger).Log("msg", "size of tag values in instance exceeded limit, reduce cardinality or size of tags", "tag", req.TagName, "userID", userID)
return nil, fmt.Errorf("tag values exceeded allowed max bytes (%d)", tagValuesLimitBytes)
}

// Final response (sorted)
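
The querier-side guard above is now identical to the ingester's, duplicated at both call sites. Should a third call site appear, a shared helper could consolidate the pattern; a sketch, assuming util.MapSizeWithinLimit accepts a map[string]struct{} as both call sites suggest:

// validateTagValuesSize consolidates the guard duplicated in the ingester
// and querier (hypothetical; not part of this PR).
func validateTagValuesSize(values map[string]struct{}, limitBytes int) error {
	// 0 disables the limit, matching the behavior introduced in this PR.
	if limitBytes == 0 {
		return nil
	}
	if !util.MapSizeWithinLimit(values, limitBytes) {
		return fmt.Errorf("tag values exceeded allowed max bytes (%d)", limitBytes)
	}
	return nil
}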