Skip to content

Commit

Permalink
Move regex opt to after lookup (grafana#2973)
Browse files Browse the repository at this point in the history
* Move regex opt to after lookup

fixes grafana#2906

When we use the caching client (which is what is used in most cases), we
load the entire row (tableName+HashKey) irrespective of what the
rangeKey parameters are. Which means with the optimisation, we are
loading the same row multiple times and then operating on the same data.
This PR moves the optimisation to after the data is loaded which should
be faster.

Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com>

* Add benchmark

Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com>

* Add changelog entry

Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com>

* Address feedback

Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com>

Co-authored-by: Peter Štibraný <peter.stibrany@grafana.com>
  • Loading branch information
gouthamve and pstibrany authored Aug 25, 2020
1 parent 1aa34aa commit a98cfa4
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 191 deletions.
31 changes: 21 additions & 10 deletions chunk_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -460,16 +460,6 @@ func (c *baseStore) lookupIdsByMetricNameMatcher(ctx context.Context, from, thro
} else if matcher.Type == labels.MatchEqual {
labelName = matcher.Name
queries, err = c.schema.GetReadQueriesForMetricLabelValue(from, through, userID, metricName, matcher.Name, matcher.Value)
} else if matcher.Type == labels.MatchRegexp && len(FindSetMatches(matcher.Value)) > 0 {
set := FindSetMatches(matcher.Value)
for _, v := range set {
var qs []IndexQuery
qs, err = c.schema.GetReadQueriesForMetricLabelValue(from, through, userID, metricName, matcher.Name, v)
if err != nil {
break
}
queries = append(queries, qs...)
}
} else {
labelName = matcher.Name
queries, err = c.schema.GetReadQueriesForMetricLabel(from, through, userID, metricName, matcher.Name)
Expand Down Expand Up @@ -550,13 +540,34 @@ func (c *baseStore) parseIndexEntries(_ context.Context, entries []IndexEntry, m
return nil, nil
}

matchSet := map[string]struct{}{}
if matcher != nil && matcher.Type == labels.MatchRegexp {
set := FindSetMatches(matcher.Value)
for _, v := range set {
matchSet[v] = struct{}{}
}
}

result := make([]string, 0, len(entries))
for _, entry := range entries {
chunkKey, labelValue, _, err := parseChunkTimeRangeValue(entry.RangeValue, entry.Value)
if err != nil {
return nil, err
}

// If the matcher is like a set (=~"a|b|c|d|...") and
// the label value is not in that set move on.
if len(matchSet) > 0 {
if _, ok := matchSet[string(labelValue)]; !ok {
continue
}

// If its in the set, then add it to set, we don't need to run
// matcher on it again.
result = append(result, chunkKey)
continue
}

if matcher != nil && !matcher.Matches(string(labelValue)) {
continue
}
Expand Down
205 changes: 24 additions & 181 deletions chunk_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ import (
"fmt"
"math/rand"
"reflect"
"sort"
"sync"
"testing"
"time"

Expand All @@ -23,7 +21,6 @@ import (

"github.com/cortexproject/cortex/pkg/chunk/cache"
"github.com/cortexproject/cortex/pkg/chunk/encoding"
"github.com/cortexproject/cortex/pkg/querier/astmapper"
"github.com/cortexproject/cortex/pkg/util/flagext"
"github.com/cortexproject/cortex/pkg/util/validation"
)
Expand Down Expand Up @@ -507,6 +504,10 @@ func TestChunkStore_getMetricNameChunks(t *testing.T) {
`foo{bar=~"beep|baz"}`,
[]Chunk{chunk1, chunk2},
},
{
`foo{bar=~"beeping|baz"}`,
[]Chunk{chunk1},
},
{
`foo{toms="code", bar=~"beep|baz"}`,
[]Chunk{chunk1, chunk2},
Expand Down Expand Up @@ -546,177 +547,6 @@ func TestChunkStore_getMetricNameChunks(t *testing.T) {
}
}

// TestChunkStore_verifyRegexSetOptimizations tests if chunks are fetched correctly when we have the metric name
func TestChunkStore_verifyRegexSetOptimizations(t *testing.T) {
ctx := context.Background()
now := model.Now()

testCases := []struct {
query string
expect []string
}{
{
`foo`,
[]string{"foo"},
},
{
`foo{bar="baz"}`,
[]string{"foo{bar=\"baz\"}"},
},
{
`foo{bar!="baz"}`,
[]string{"foo"},
},
{
`foo{toms="code", bar="beep"}`,
[]string{"foo{bar=\"beep\"}", "foo{toms=\"code\"}"},
},
{
`foo{bar=~"beep"}`,
[]string{"foo{bar=\"beep\"}"},
},
{
`foo{bar=~"beep|baz"}`,
[]string{"foo{bar=\"baz\"}", "foo{bar=\"beep\"}"},
},
{
`foo{toms="code", bar=~"beep|baz"}`,
[]string{"foo{bar=\"baz\"}", "foo{bar=\"beep\"}", "foo{toms=\"code\"}"},
},
{
`foo{bar=~".+"}`,
[]string{"foo{bar}"},
},
}

for _, schema := range schemas {
var storeCfg StoreConfig
flagext.DefaultValues(&storeCfg)

schemaCfg := DefaultSchemaConfig("", schema, 0)
schemaObj, err := schemaCfg.Configs[0].CreateSchema()
require.NoError(t, err)

var mockSchema = &mockBaseSchema{schema: schemaObj}

switch s := schemaObj.(type) {
case StoreSchema:
schemaObj = mockStoreSchema{mockBaseSchema: mockSchema, schema: s}
case SeriesStoreSchema:
schemaObj = mockSeriesStoreSchema{mockBaseSchema: mockSchema, schema: s}
}

store := newTestChunkStoreConfigWithMockStorage(t, schemaCfg, schemaObj, storeCfg)
defer store.Stop()

from := now.Add(-time.Hour)
through := now

for _, tc := range testCases {
t.Run(fmt.Sprintf("%s / %s", tc.query, schema), func(t *testing.T) {
// reset queries for test
mockSchema.resetQueries()

t.Log("========= Running query", tc.query, "with schema", schema)
matchers, err := parser.ParseMetricSelector(tc.query)
if err != nil {
t.Fatal(err)
}

_, err = store.Get(ctx, userID, from, through, matchers...)
require.NoError(t, err)

qs := mockSchema.getQueries()
sort.Strings(qs)

if !reflect.DeepEqual(tc.expect, qs) {
t.Fatalf("%s: wrong queries - %s", tc.query, test.Diff(tc.expect, qs))
}
})
}
}
}

type mockBaseSchema struct {
schema BaseSchema

mu sync.Mutex
queries []string
}

func (m *mockBaseSchema) getQueries() []string {
m.mu.Lock()
defer m.mu.Unlock()
return m.queries
}

func (m *mockBaseSchema) resetQueries() {
m.mu.Lock()
defer m.mu.Unlock()
m.queries = nil
}

func (m *mockBaseSchema) GetReadQueriesForMetric(from, through model.Time, userID string, metricName string) ([]IndexQuery, error) {
m.mu.Lock()
m.queries = append(m.queries, metricName)
m.mu.Unlock()

return m.schema.GetReadQueriesForMetric(from, through, userID, metricName)
}

func (m *mockBaseSchema) GetReadQueriesForMetricLabel(from, through model.Time, userID string, metricName string, labelName string) ([]IndexQuery, error) {
m.mu.Lock()
m.queries = append(m.queries, fmt.Sprintf("%s{%s}", metricName, labelName))
m.mu.Unlock()

return m.schema.GetReadQueriesForMetricLabel(from, through, userID, metricName, labelName)
}

func (m *mockBaseSchema) GetReadQueriesForMetricLabelValue(from, through model.Time, userID string, metricName string, labelName string, labelValue string) ([]IndexQuery, error) {
m.mu.Lock()
m.queries = append(m.queries, fmt.Sprintf("%s{%s=%q}", metricName, labelName, labelValue))
m.mu.Unlock()
return m.schema.GetReadQueriesForMetricLabelValue(from, through, userID, metricName, labelName, labelValue)
}

func (m *mockBaseSchema) FilterReadQueries(queries []IndexQuery, shard *astmapper.ShardAnnotation) []IndexQuery {
return m.schema.FilterReadQueries(queries, shard)
}

type mockStoreSchema struct {
*mockBaseSchema
schema StoreSchema
}

func (m mockStoreSchema) GetWriteEntries(from, through model.Time, userID string, metricName string, labels labels.Labels, chunkID string) ([]IndexEntry, error) {
return m.schema.GetWriteEntries(from, through, userID, metricName, labels, chunkID)
}

type mockSeriesStoreSchema struct {
*mockBaseSchema
schema SeriesStoreSchema
}

func (m mockSeriesStoreSchema) GetCacheKeysAndLabelWriteEntries(from, through model.Time, userID string, metricName string, labels labels.Labels, chunkID string) ([]string, [][]IndexEntry, error) {
return m.schema.GetCacheKeysAndLabelWriteEntries(from, through, userID, metricName, labels, chunkID)
}

func (m mockSeriesStoreSchema) GetChunkWriteEntries(from, through model.Time, userID string, metricName string, labels labels.Labels, chunkID string) ([]IndexEntry, error) {
return m.schema.GetChunkWriteEntries(from, through, userID, metricName, labels, chunkID)
}

func (m mockSeriesStoreSchema) GetChunksForSeries(from, through model.Time, userID string, seriesID []byte) ([]IndexQuery, error) {
return m.schema.GetChunksForSeries(from, through, userID, seriesID)
}

func (m mockSeriesStoreSchema) GetLabelNamesForSeries(from, through model.Time, userID string, seriesID []byte) ([]IndexQuery, error) {
return m.schema.GetLabelNamesForSeries(from, through, userID, seriesID)
}

func (m mockSeriesStoreSchema) GetSeriesDeleteEntries(from, through model.Time, userID string, metric labels.Labels, hasChunksForIntervalFunc hasChunksForIntervalFunc) ([]IndexEntry, error) {
return m.schema.GetSeriesDeleteEntries(from, through, userID, metric, hasChunksForIntervalFunc)
}

func mustNewLabelMatcher(matchType labels.MatchType, name string, value string) *labels.Matcher {
return labels.MustNewMatcher(matchType, name, value)
}
Expand Down Expand Up @@ -1006,13 +836,13 @@ func TestStoreMaxLookBack(t *testing.T) {
require.Equal(t, now, chunks[0].Through)
}

func benchmarkParseIndexEntries(i int64, b *testing.B) {
func benchmarkParseIndexEntries(i int64, regex string, b *testing.B) {
b.ReportAllocs()
b.StopTimer()
store := &store{}
ctx := context.Background()
entries := generateIndexEntries(i)
matcher, err := labels.NewMatcher(labels.MatchRegexp, "", ".*")
matcher, err := labels.NewMatcher(labels.MatchRegexp, "", regex)
if err != nil {
b.Fatal(err)
}
Expand All @@ -1022,16 +852,29 @@ func benchmarkParseIndexEntries(i int64, b *testing.B) {
if err != nil {
b.Fatal(err)
}
if len(keys) != len(entries)/2 {
if regex == ".*" && len(keys) != len(entries)/2 {
b.Fatalf("expected keys:%d got:%d", len(entries)/2, len(keys))
}
}
}

func BenchmarkParseIndexEntries500(b *testing.B) { benchmarkParseIndexEntries(500, b) }
func BenchmarkParseIndexEntries2500(b *testing.B) { benchmarkParseIndexEntries(2500, b) }
func BenchmarkParseIndexEntries10000(b *testing.B) { benchmarkParseIndexEntries(10000, b) }
func BenchmarkParseIndexEntries50000(b *testing.B) { benchmarkParseIndexEntries(50000, b) }
func BenchmarkParseIndexEntries500(b *testing.B) { benchmarkParseIndexEntries(500, ".*", b) }
func BenchmarkParseIndexEntries2500(b *testing.B) { benchmarkParseIndexEntries(2500, ".*", b) }
func BenchmarkParseIndexEntries10000(b *testing.B) { benchmarkParseIndexEntries(10000, ".*", b) }
func BenchmarkParseIndexEntries50000(b *testing.B) { benchmarkParseIndexEntries(50000, ".*", b) }

func BenchmarkParseIndexEntriesRegexSet500(b *testing.B) {
benchmarkParseIndexEntries(500, "labelvalue0|labelvalue1|labelvalue2|labelvalue3|labelvalue600", b)
}
func BenchmarkParseIndexEntriesRegexSet2500(b *testing.B) {
benchmarkParseIndexEntries(2500, "labelvalue0|labelvalue1|labelvalue2|labelvalue3|labelvalue600", b)
}
func BenchmarkParseIndexEntriesRegexSet10000(b *testing.B) {
benchmarkParseIndexEntries(10000, "labelvalue0|labelvalue1|labelvalue2|labelvalue3|labelvalue600", b)
}
func BenchmarkParseIndexEntriesRegexSet50000(b *testing.B) {
benchmarkParseIndexEntries(50000, "labelvalue0|labelvalue1|labelvalue2|labelvalue3|labelvalue600", b)
}

func generateIndexEntries(n int64) []IndexEntry {
res := make([]IndexEntry, 0, n)
Expand Down

0 comments on commit a98cfa4

Please sign in to comment.