Skip to content

Commit

Permalink
Merge pull request grafana#1328 from grafana/1325-cardinality-limit
Browse files Browse the repository at this point in the history
Include metric name, label name, number of entries and limit in cardinality errors.
  • Loading branch information
gouthamve authored Apr 29, 2019
2 parents 7d8510e + 85b1e78 commit dca32de
Show file tree
Hide file tree
Showing 5 changed files with 70 additions and 22 deletions.
14 changes: 3 additions & 11 deletions chunk_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,35 +47,27 @@ var stores = []struct {
{
name: "store",
configFn: func() StoreConfig {
var (
storeCfg StoreConfig
)
var storeCfg StoreConfig
flagext.DefaultValues(&storeCfg)
return storeCfg
},
},
{
name: "cached_store",
configFn: func() StoreConfig {
var (
storeCfg StoreConfig
)
var storeCfg StoreConfig
flagext.DefaultValues(&storeCfg)

storeCfg.WriteDedupeCacheConfig.Cache = cache.NewFifoCache("test", cache.FifoCacheConfig{
Size: 500,
})

return storeCfg
},
},
}

// newTestStore creates a new Store for testing.
func newTestChunkStore(t *testing.T, schemaName string) Store {
var (
storeCfg StoreConfig
)
var storeCfg StoreConfig
flagext.DefaultValues(&storeCfg)
return newTestChunkStoreConfig(t, schemaName, storeCfg)
}
Expand Down
32 changes: 24 additions & 8 deletions series_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"net/http"

"github.com/go-kit/kit/log/level"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/common/model"
Expand All @@ -22,11 +21,19 @@ import (
"github.com/cortexproject/cortex/pkg/util/validation"
)

var (
// ErrCardinalityExceeded is returned when the user reads a row that
// is too large.
ErrCardinalityExceeded = errors.New("cardinality limit exceeded")
// CardinalityExceededError is returned when the user reads a row that
// is too large.
type CardinalityExceededError struct {
MetricName, LabelName string
Size, Limit int32
}

func (e CardinalityExceededError) Error() string {
return fmt.Sprintf("cardinality limit exceeded for %s{%s}; %d entries, more than limit of %d",
e.MetricName, e.LabelName, e.Size, e.Limit)
}

var (
indexLookupsPerQuery = promauto.NewHistogram(prometheus.HistogramOpts{
Namespace: "cortex",
Name: "chunk_store_index_lookups_per_query",
Expand Down Expand Up @@ -196,6 +203,7 @@ func (c *seriesStore) lookupSeriesByMetricNameMatchers(ctx context.Context, from
var preIntersectionCount int
var lastErr error
var cardinalityExceededErrors int
var cardinalityExceededError CardinalityExceededError
for i := 0; i < len(matchers); i++ {
select {
case incoming := <-incomingIDs:
Expand All @@ -210,8 +218,9 @@ func (c *seriesStore) lookupSeriesByMetricNameMatchers(ctx context.Context, from
// series and the other returns only 10 (a few), we don't lookup the first one at all.
// We just manually filter through the 10 series again using "filterChunksByMatchers",
// saving us from looking up and intersecting a lot of series.
if err == ErrCardinalityExceeded {
if e, ok := err.(CardinalityExceededError); ok {
cardinalityExceededErrors++
cardinalityExceededError = e
} else {
lastErr = err
}
Expand All @@ -220,7 +229,7 @@ func (c *seriesStore) lookupSeriesByMetricNameMatchers(ctx context.Context, from

// But if every single matcher returns a lot of series, then it makes sense to abort the query.
if cardinalityExceededErrors == len(matchers) {
return nil, ErrCardinalityExceeded
return nil, cardinalityExceededError
} else if lastErr != nil {
return nil, lastErr
}
Expand All @@ -241,11 +250,14 @@ func (c *seriesStore) lookupSeriesByMetricNameMatcher(ctx context.Context, from,
}

var queries []IndexQuery
var labelName string
if matcher == nil {
queries, err = c.schema.GetReadQueriesForMetric(from, through, userID, model.LabelValue(metricName))
} else if matcher.Type != labels.MatchEqual {
labelName = matcher.Name
queries, err = c.schema.GetReadQueriesForMetricLabel(from, through, userID, model.LabelValue(metricName), model.LabelName(matcher.Name))
} else {
labelName = matcher.Name
queries, err = c.schema.GetReadQueriesForMetricLabelValue(from, through, userID, model.LabelValue(metricName), model.LabelName(matcher.Name), model.LabelValue(matcher.Value))
}
if err != nil {
Expand All @@ -254,7 +266,11 @@ func (c *seriesStore) lookupSeriesByMetricNameMatcher(ctx context.Context, from,
level.Debug(log).Log("queries", len(queries))

entries, err := c.lookupEntriesByQueries(ctx, queries)
if err != nil {
if e, ok := err.(CardinalityExceededError); ok {
e.MetricName = metricName
e.LabelName = labelName
return nil, e
} else if err != nil {
return nil, err
}
level.Debug(log).Log("entries", len(entries))
Expand Down
1 change: 1 addition & 0 deletions storage/caching_fixtures.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,5 +40,6 @@ var Fixtures = []testutils.Fixture{
func defaultLimits() (*validation.Overrides, error) {
var defaults validation.Limits
flagext.DefaultValues(&defaults)
defaults.CardinalityLimit = 5
return validation.NewOverrides(defaults)
}
10 changes: 8 additions & 2 deletions storage/caching_index_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,10 @@ func (s *cachingIndexClient) QueryPages(ctx context.Context, queries []chunk.Ind
batches, misses := s.cacheFetch(ctx, keys)
for _, batch := range batches {
if cardinalityLimit > 0 && batch.Cardinality > cardinalityLimit {
return chunk.ErrCardinalityExceeded
return chunk.CardinalityExceededError{
Size: batch.Cardinality,
Limit: cardinalityLimit,
}
}

queries := queriesByKey[batch.Key]
Expand Down Expand Up @@ -156,7 +159,10 @@ func (s *cachingIndexClient) QueryPages(ctx context.Context, queries []chunk.Ind
if cardinalityLimit > 0 && cardinality > cardinalityLimit {
batch.Cardinality = cardinality
batch.Entries = nil
cardinalityErr = chunk.ErrCardinalityExceeded
cardinalityErr = chunk.CardinalityExceededError{
Size: cardinality,
Limit: cardinalityLimit,
}
}

keys = append(keys, key)
Expand Down
35 changes: 34 additions & 1 deletion storage/index_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,14 @@ package storage

import (
"fmt"
"strconv"
"testing"
"time"

"github.com/cortexproject/cortex/pkg/chunk"
"github.com/stretchr/testify/require"

"github.com/cortexproject/cortex/pkg/chunk"
"github.com/cortexproject/cortex/pkg/chunk/cache"
)

func TestIndexBasic(t *testing.T) {
Expand Down Expand Up @@ -194,3 +198,32 @@ func TestQueryPages(t *testing.T) {
}
})
}

func TestCardinalityLimit(t *testing.T) {
forAllFixtures(t, func(t *testing.T, client chunk.IndexClient, _ chunk.ObjectClient) {
limits, err := defaultLimits()
require.NoError(t, err)

client = newCachingIndexClient(client, cache.NewMockCache(), time.Minute, limits)
batch := client.NewWriteBatch()
for i := 0; i < 10; i++ {
batch.Add(tableName, "bar", []byte(strconv.Itoa(i)), []byte(strconv.Itoa(i)))
}
err = client.BatchWrite(ctx, batch)
require.NoError(t, err)

var have int
err = client.QueryPages(ctx, []chunk.IndexQuery{{
TableName: tableName,
HashValue: "bar",
}}, func(_ chunk.IndexQuery, read chunk.ReadBatch) bool {
iter := read.Iterator()
for iter.Next() {
have++
}
return true
})
require.Error(t, err, "cardinality limit exceeded for {}; 10 entries, more than limit of 5")
require.Equal(t, 0, have)
})
}

0 comments on commit dca32de

Please sign in to comment.