forked from thanos-io/thanos
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Support filtered index cache (thanos-io#6765)
* support filtered index cache Signed-off-by: Ben Ye <benye@amazon.com> * changelog Signed-off-by: Ben Ye <benye@amazon.com> * fix doc Signed-off-by: Ben Ye <benye@amazon.com> * fix unit test failure Signed-off-by: Ben Ye <benye@amazon.com> * add item type validation Signed-off-by: Ben Ye <benye@amazon.com> * lint Signed-off-by: Ben Ye <benye@amazon.com> * change enabled_items to []string type Signed-off-by: Ben Ye <benye@amazon.com> * generate docs Signed-off-by: Ben Ye <benye@amazon.com> * separate validation code Signed-off-by: Ben Ye <benye@amazon.com> * fix lint Signed-off-by: Ben Ye <benye@amazon.com> * update doc Signed-off-by: Ben Ye <benye@amazon.com> * fix interface Signed-off-by: Ben Ye <benye@amazon.com> --------- Signed-off-by: Ben Ye <benye@amazon.com>
- Loading branch information
1 parent
06d4407
commit 3e05544
Showing
5 changed files
with
271 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,88 @@ | ||
// Copyright (c) The Thanos Authors. | ||
// Licensed under the Apache License 2.0. | ||
|
||
package storecache | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
|
||
"github.com/oklog/ulid" | ||
"github.com/prometheus/prometheus/model/labels" | ||
"github.com/prometheus/prometheus/storage" | ||
"golang.org/x/exp/slices" | ||
) | ||
|
||
type FilteredIndexCache struct { | ||
cache IndexCache | ||
enabledItems []string | ||
} | ||
|
||
// NewFilteredIndexCache creates a filtered index cache based on enabled items. | ||
func NewFilteredIndexCache(cache IndexCache, enabledItems []string) *FilteredIndexCache { | ||
return &FilteredIndexCache{ | ||
cache: cache, | ||
enabledItems: enabledItems, | ||
} | ||
} | ||
|
||
// StorePostings sets the postings identified by the ulid and label to the value v, | ||
// if the postings already exists in the cache it is not mutated. | ||
func (c *FilteredIndexCache) StorePostings(blockID ulid.ULID, l labels.Label, v []byte, tenant string) { | ||
if len(c.enabledItems) == 0 || slices.Contains(c.enabledItems, cacheTypePostings) { | ||
c.cache.StorePostings(blockID, l, v, tenant) | ||
} | ||
} | ||
|
||
// FetchMultiPostings fetches multiple postings - each identified by a label - | ||
// and returns a map containing cache hits, along with a list of missing keys. | ||
func (c *FilteredIndexCache) FetchMultiPostings(ctx context.Context, blockID ulid.ULID, keys []labels.Label, tenant string) (hits map[labels.Label][]byte, misses []labels.Label) { | ||
if len(c.enabledItems) == 0 || slices.Contains(c.enabledItems, cacheTypePostings) { | ||
return c.cache.FetchMultiPostings(ctx, blockID, keys, tenant) | ||
} | ||
return nil, keys | ||
} | ||
|
||
// StoreExpandedPostings stores expanded postings for a set of label matchers. | ||
func (c *FilteredIndexCache) StoreExpandedPostings(blockID ulid.ULID, matchers []*labels.Matcher, v []byte, tenant string) { | ||
if len(c.enabledItems) == 0 || slices.Contains(c.enabledItems, cacheTypeExpandedPostings) { | ||
c.cache.StoreExpandedPostings(blockID, matchers, v, tenant) | ||
} | ||
} | ||
|
||
// FetchExpandedPostings fetches expanded postings and returns cached data and a boolean value representing whether it is a cache hit or not. | ||
func (c *FilteredIndexCache) FetchExpandedPostings(ctx context.Context, blockID ulid.ULID, matchers []*labels.Matcher, tenant string) ([]byte, bool) { | ||
if len(c.enabledItems) == 0 || slices.Contains(c.enabledItems, cacheTypeExpandedPostings) { | ||
return c.cache.FetchExpandedPostings(ctx, blockID, matchers, tenant) | ||
} | ||
return nil, false | ||
} | ||
|
||
// StoreSeries sets the series identified by the ulid and id to the value v, | ||
// if the series already exists in the cache it is not mutated. | ||
func (c *FilteredIndexCache) StoreSeries(blockID ulid.ULID, id storage.SeriesRef, v []byte, tenant string) { | ||
if len(c.enabledItems) == 0 || slices.Contains(c.enabledItems, cacheTypeSeries) { | ||
c.cache.StoreSeries(blockID, id, v, tenant) | ||
} | ||
} | ||
|
||
// FetchMultiSeries fetches multiple series - each identified by ID - from the cache | ||
// and returns a map containing cache hits, along with a list of missing IDs. | ||
func (c *FilteredIndexCache) FetchMultiSeries(ctx context.Context, blockID ulid.ULID, ids []storage.SeriesRef, tenant string) (hits map[storage.SeriesRef][]byte, misses []storage.SeriesRef) { | ||
if len(c.enabledItems) == 0 || slices.Contains(c.enabledItems, cacheTypeSeries) { | ||
return c.cache.FetchMultiSeries(ctx, blockID, ids, tenant) | ||
} | ||
return nil, ids | ||
} | ||
|
||
func ValidateEnabledItems(enabledItems []string) error { | ||
for _, item := range enabledItems { | ||
switch item { | ||
// valid | ||
case cacheTypePostings, cacheTypeExpandedPostings, cacheTypeSeries: | ||
default: | ||
return fmt.Errorf("unsupported item type %s", item) | ||
} | ||
} | ||
return nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,164 @@ | ||
// Copyright (c) The Thanos Authors. | ||
// Licensed under the Apache License 2.0. | ||
|
||
package storecache | ||
|
||
import ( | ||
"context" | ||
"testing" | ||
|
||
"github.com/efficientgo/core/testutil" | ||
"github.com/go-kit/log" | ||
"github.com/oklog/ulid" | ||
"github.com/prometheus/client_golang/prometheus" | ||
"github.com/prometheus/prometheus/model/labels" | ||
"github.com/prometheus/prometheus/storage" | ||
|
||
"github.com/thanos-io/thanos/pkg/tenancy" | ||
) | ||
|
||
func TestFilterCache(t *testing.T) { | ||
blockID := ulid.MustNew(ulid.Now(), nil) | ||
postingKeys := []labels.Label{ | ||
{Name: "foo", Value: "bar"}, | ||
} | ||
expandedPostingsMatchers := []*labels.Matcher{ | ||
labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"), | ||
} | ||
testPostingData := []byte("postings") | ||
testExpandedPostingsData := []byte("expandedPostings") | ||
testSeriesData := []byte("series") | ||
ctx := context.TODO() | ||
for _, tc := range []struct { | ||
name string | ||
enabledItems []string | ||
expectedError string | ||
verifyFunc func(t *testing.T, c IndexCache) | ||
}{ | ||
{ | ||
name: "invalid item type", | ||
expectedError: "unsupported item type foo", | ||
enabledItems: []string{"foo"}, | ||
}, | ||
{ | ||
name: "invalid item type with 1 valid cache type", | ||
expectedError: "unsupported item type foo", | ||
enabledItems: []string{cacheTypeExpandedPostings, "foo"}, | ||
}, | ||
{ | ||
name: "empty enabled items", | ||
verifyFunc: func(t *testing.T, c IndexCache) { | ||
c.StorePostings(blockID, postingKeys[0], testPostingData, tenancy.DefaultTenant) | ||
c.StoreExpandedPostings(blockID, expandedPostingsMatchers, testExpandedPostingsData, tenancy.DefaultTenant) | ||
c.StoreSeries(blockID, 1, testSeriesData, tenancy.DefaultTenant) | ||
|
||
hits, missed := c.FetchMultiPostings(ctx, blockID, postingKeys, tenancy.DefaultTenant) | ||
testutil.Equals(t, 0, len(missed)) | ||
testutil.Equals(t, testPostingData, hits[postingKeys[0]]) | ||
|
||
ep, hit := c.FetchExpandedPostings(ctx, blockID, expandedPostingsMatchers, tenancy.DefaultTenant) | ||
testutil.Equals(t, true, hit) | ||
testutil.Equals(t, testExpandedPostingsData, ep) | ||
|
||
seriesHit, misses := c.FetchMultiSeries(ctx, blockID, []storage.SeriesRef{1}, tenancy.DefaultTenant) | ||
testutil.Equals(t, 0, len(misses)) | ||
testutil.Equals(t, testSeriesData, seriesHit[1]) | ||
}, | ||
}, | ||
{ | ||
name: "all enabled items", | ||
enabledItems: []string{cacheTypeSeries, cacheTypePostings, cacheTypeExpandedPostings}, | ||
verifyFunc: func(t *testing.T, c IndexCache) { | ||
c.StorePostings(blockID, postingKeys[0], testPostingData, tenancy.DefaultTenant) | ||
c.StoreExpandedPostings(blockID, expandedPostingsMatchers, testExpandedPostingsData, tenancy.DefaultTenant) | ||
c.StoreSeries(blockID, 1, testSeriesData, tenancy.DefaultTenant) | ||
|
||
hits, missed := c.FetchMultiPostings(ctx, blockID, postingKeys, tenancy.DefaultTenant) | ||
testutil.Equals(t, 0, len(missed)) | ||
testutil.Equals(t, testPostingData, hits[postingKeys[0]]) | ||
|
||
ep, hit := c.FetchExpandedPostings(ctx, blockID, expandedPostingsMatchers, tenancy.DefaultTenant) | ||
testutil.Assert(t, true, hit) | ||
testutil.Equals(t, testExpandedPostingsData, ep) | ||
|
||
seriesHit, misses := c.FetchMultiSeries(ctx, blockID, []storage.SeriesRef{1}, tenancy.DefaultTenant) | ||
testutil.Equals(t, 0, len(misses)) | ||
testutil.Equals(t, testSeriesData, seriesHit[1]) | ||
}, | ||
}, | ||
{ | ||
name: "only enable postings", | ||
enabledItems: []string{cacheTypePostings}, | ||
verifyFunc: func(t *testing.T, c IndexCache) { | ||
c.StorePostings(blockID, postingKeys[0], testPostingData, tenancy.DefaultTenant) | ||
c.StoreExpandedPostings(blockID, expandedPostingsMatchers, testExpandedPostingsData, tenancy.DefaultTenant) | ||
c.StoreSeries(blockID, 1, testSeriesData, tenancy.DefaultTenant) | ||
|
||
hits, missed := c.FetchMultiPostings(ctx, blockID, postingKeys, tenancy.DefaultTenant) | ||
testutil.Equals(t, 0, len(missed)) | ||
testutil.Equals(t, testPostingData, hits[postingKeys[0]]) | ||
|
||
_, hit := c.FetchExpandedPostings(ctx, blockID, expandedPostingsMatchers, tenancy.DefaultTenant) | ||
testutil.Equals(t, false, hit) | ||
|
||
seriesHit, misses := c.FetchMultiSeries(ctx, blockID, []storage.SeriesRef{1}, tenancy.DefaultTenant) | ||
testutil.Equals(t, 1, len(misses)) | ||
testutil.Equals(t, 0, len(seriesHit)) | ||
}, | ||
}, | ||
{ | ||
name: "only enable expanded postings", | ||
enabledItems: []string{cacheTypeExpandedPostings}, | ||
verifyFunc: func(t *testing.T, c IndexCache) { | ||
c.StorePostings(blockID, postingKeys[0], testPostingData, tenancy.DefaultTenant) | ||
c.StoreExpandedPostings(blockID, expandedPostingsMatchers, testExpandedPostingsData, tenancy.DefaultTenant) | ||
c.StoreSeries(blockID, 1, testSeriesData, tenancy.DefaultTenant) | ||
|
||
hits, missed := c.FetchMultiPostings(ctx, blockID, postingKeys, tenancy.DefaultTenant) | ||
testutil.Equals(t, 1, len(missed)) | ||
testutil.Equals(t, 0, len(hits)) | ||
|
||
ep, hit := c.FetchExpandedPostings(ctx, blockID, expandedPostingsMatchers, tenancy.DefaultTenant) | ||
testutil.Equals(t, true, hit) | ||
testutil.Equals(t, testExpandedPostingsData, ep) | ||
|
||
seriesHit, misses := c.FetchMultiSeries(ctx, blockID, []storage.SeriesRef{1}, tenancy.DefaultTenant) | ||
testutil.Equals(t, 1, len(misses)) | ||
testutil.Equals(t, 0, len(seriesHit)) | ||
}, | ||
}, | ||
{ | ||
name: "only enable series", | ||
enabledItems: []string{cacheTypeSeries}, | ||
verifyFunc: func(t *testing.T, c IndexCache) { | ||
c.StorePostings(blockID, postingKeys[0], testPostingData, tenancy.DefaultTenant) | ||
c.StoreExpandedPostings(blockID, expandedPostingsMatchers, testExpandedPostingsData, tenancy.DefaultTenant) | ||
c.StoreSeries(blockID, 1, testSeriesData, tenancy.DefaultTenant) | ||
|
||
hits, missed := c.FetchMultiPostings(ctx, blockID, postingKeys, tenancy.DefaultTenant) | ||
testutil.Equals(t, 1, len(missed)) | ||
testutil.Equals(t, 0, len(hits)) | ||
|
||
_, hit := c.FetchExpandedPostings(ctx, blockID, expandedPostingsMatchers, tenancy.DefaultTenant) | ||
testutil.Equals(t, false, hit) | ||
|
||
seriesHit, misses := c.FetchMultiSeries(ctx, blockID, []storage.SeriesRef{1}, tenancy.DefaultTenant) | ||
testutil.Equals(t, 0, len(misses)) | ||
testutil.Equals(t, testSeriesData, seriesHit[1]) | ||
}, | ||
}, | ||
} { | ||
t.Run(tc.name, func(t *testing.T) { | ||
inMemoryCache, err := NewInMemoryIndexCacheWithConfig(log.NewNopLogger(), nil, prometheus.NewRegistry(), DefaultInMemoryIndexCacheConfig) | ||
testutil.Ok(t, err) | ||
err = ValidateEnabledItems(tc.enabledItems) | ||
if tc.expectedError != "" { | ||
testutil.Equals(t, tc.expectedError, err.Error()) | ||
} else { | ||
testutil.Ok(t, err) | ||
c := NewFilteredIndexCache(inMemoryCache, tc.enabledItems) | ||
tc.verifyFunc(t, c) | ||
} | ||
}) | ||
} | ||
} |