-
Notifications
You must be signed in to change notification settings - Fork 2.1k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Signed-off-by: Ben Ye <benye@amazon.com>
- Loading branch information
Showing
3 changed files
with
229 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,75 @@ | ||
// Copyright (c) The Thanos Authors. | ||
// Licensed under the Apache License 2.0. | ||
|
||
package storecache | ||
|
||
import ( | ||
"context" | ||
|
||
"github.com/oklog/ulid" | ||
"github.com/prometheus/prometheus/model/labels" | ||
"github.com/prometheus/prometheus/storage" | ||
"golang.org/x/exp/slices" | ||
) | ||
|
||
type FilteredIndexCache struct { | ||
cache IndexCache | ||
enabledItems []string | ||
} | ||
|
||
// NewFilteredIndexCache creates a filtered index cache based on enabled items. | ||
func NewFilteredIndexCache(cache IndexCache, enabledItems []string) *FilteredIndexCache { | ||
return &FilteredIndexCache{ | ||
cache: cache, | ||
enabledItems: enabledItems, | ||
} | ||
} | ||
|
||
// StorePostings sets the postings identified by the ulid and label to the value v, | ||
// if the postings already exists in the cache it is not mutated. | ||
func (c *FilteredIndexCache) StorePostings(blockID ulid.ULID, l labels.Label, v []byte) { | ||
if len(c.enabledItems) == 0 || slices.Contains(c.enabledItems, cacheTypePostings) { | ||
c.cache.StorePostings(blockID, l, v) | ||
} | ||
} | ||
|
||
// FetchMultiPostings fetches multiple postings - each identified by a label - | ||
// and returns a map containing cache hits, along with a list of missing keys. | ||
func (c *FilteredIndexCache) FetchMultiPostings(ctx context.Context, blockID ulid.ULID, keys []labels.Label) (hits map[labels.Label][]byte, misses []labels.Label) { | ||
if len(c.enabledItems) == 0 || slices.Contains(c.enabledItems, cacheTypePostings) { | ||
return c.cache.FetchMultiPostings(ctx, blockID, keys) | ||
} | ||
return nil, keys | ||
} | ||
|
||
// StoreExpandedPostings stores expanded postings for a set of label matchers. | ||
func (c *FilteredIndexCache) StoreExpandedPostings(blockID ulid.ULID, matchers []*labels.Matcher, v []byte) { | ||
if len(c.enabledItems) == 0 || slices.Contains(c.enabledItems, cacheTypeExpandedPostings) { | ||
c.cache.StoreExpandedPostings(blockID, matchers, v) | ||
} | ||
} | ||
|
||
// FetchExpandedPostings fetches expanded postings and returns cached data and a boolean value representing whether it is a cache hit or not. | ||
func (c *FilteredIndexCache) FetchExpandedPostings(ctx context.Context, blockID ulid.ULID, matchers []*labels.Matcher) ([]byte, bool) { | ||
if len(c.enabledItems) == 0 || slices.Contains(c.enabledItems, cacheTypeExpandedPostings) { | ||
return c.cache.FetchExpandedPostings(ctx, blockID, matchers) | ||
} | ||
return nil, false | ||
} | ||
|
||
// StoreSeries sets the series identified by the ulid and id to the value v, | ||
// if the series already exists in the cache it is not mutated. | ||
func (c *FilteredIndexCache) StoreSeries(blockID ulid.ULID, id storage.SeriesRef, v []byte) { | ||
if len(c.enabledItems) == 0 || slices.Contains(c.enabledItems, cacheTypeSeries) { | ||
c.cache.StoreSeries(blockID, id, v) | ||
} | ||
} | ||
|
||
// FetchMultiSeries fetches multiple series - each identified by ID - from the cache | ||
// and returns a map containing cache hits, along with a list of missing IDs. | ||
func (c *FilteredIndexCache) FetchMultiSeries(ctx context.Context, blockID ulid.ULID, ids []storage.SeriesRef) (hits map[storage.SeriesRef][]byte, misses []storage.SeriesRef) { | ||
if len(c.enabledItems) == 0 || slices.Contains(c.enabledItems, cacheTypeSeries) { | ||
return c.cache.FetchMultiSeries(ctx, blockID, ids) | ||
} | ||
return nil, ids | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,145 @@ | ||
// Copyright (c) The Thanos Authors. | ||
// Licensed under the Apache License 2.0. | ||
|
||
package storecache | ||
|
||
import ( | ||
"context" | ||
"testing" | ||
|
||
"github.com/efficientgo/core/testutil" | ||
"github.com/go-kit/log" | ||
"github.com/oklog/ulid" | ||
"github.com/prometheus/client_golang/prometheus" | ||
"github.com/prometheus/prometheus/model/labels" | ||
"github.com/prometheus/prometheus/storage" | ||
) | ||
|
||
func TestFilterCache(t *testing.T) { | ||
blockID := ulid.MustNew(ulid.Now(), nil) | ||
postingKeys := []labels.Label{ | ||
{Name: "foo", Value: "bar"}, | ||
} | ||
expandedPostingsMatchers := []*labels.Matcher{ | ||
labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"), | ||
} | ||
testPostingData := []byte("postings") | ||
testExpandedPostingsData := []byte("expandedPostings") | ||
testSeriesData := []byte("series") | ||
ctx := context.TODO() | ||
for _, tc := range []struct { | ||
name string | ||
enabledItems []string | ||
verifyFunc func(t *testing.T, c IndexCache) | ||
}{ | ||
{ | ||
name: "empty enabled items", | ||
verifyFunc: func(t *testing.T, c IndexCache) { | ||
c.StorePostings(blockID, postingKeys[0], testPostingData) | ||
c.StoreExpandedPostings(blockID, expandedPostingsMatchers, testExpandedPostingsData) | ||
c.StoreSeries(blockID, 1, testSeriesData) | ||
|
||
hits, missed := c.FetchMultiPostings(ctx, blockID, postingKeys) | ||
testutil.Equals(t, 0, len(missed)) | ||
testutil.Equals(t, testPostingData, hits[postingKeys[0]]) | ||
|
||
ep, hit := c.FetchExpandedPostings(ctx, blockID, expandedPostingsMatchers) | ||
testutil.Equals(t, true, hit) | ||
testutil.Equals(t, testExpandedPostingsData, ep) | ||
|
||
seriesHit, misses := c.FetchMultiSeries(ctx, blockID, []storage.SeriesRef{1}) | ||
testutil.Equals(t, 0, len(misses)) | ||
testutil.Equals(t, testSeriesData, seriesHit[1]) | ||
}, | ||
}, | ||
{ | ||
name: "all enabled items", | ||
enabledItems: []string{cacheTypeSeries, cacheTypePostings, cacheTypeExpandedPostings}, | ||
verifyFunc: func(t *testing.T, c IndexCache) { | ||
c.StorePostings(blockID, postingKeys[0], testPostingData) | ||
c.StoreExpandedPostings(blockID, expandedPostingsMatchers, testExpandedPostingsData) | ||
c.StoreSeries(blockID, 1, testSeriesData) | ||
|
||
hits, missed := c.FetchMultiPostings(ctx, blockID, postingKeys) | ||
testutil.Equals(t, 0, len(missed)) | ||
testutil.Equals(t, testPostingData, hits[postingKeys[0]]) | ||
|
||
ep, hit := c.FetchExpandedPostings(ctx, blockID, expandedPostingsMatchers) | ||
testutil.Assert(t, true, hit) | ||
testutil.Equals(t, testExpandedPostingsData, ep) | ||
|
||
seriesHit, misses := c.FetchMultiSeries(ctx, blockID, []storage.SeriesRef{1}) | ||
testutil.Equals(t, 0, len(misses)) | ||
testutil.Equals(t, testSeriesData, seriesHit[1]) | ||
}, | ||
}, | ||
{ | ||
name: "only enable postings", | ||
enabledItems: []string{cacheTypePostings}, | ||
verifyFunc: func(t *testing.T, c IndexCache) { | ||
c.StorePostings(blockID, postingKeys[0], testPostingData) | ||
c.StoreExpandedPostings(blockID, expandedPostingsMatchers, testExpandedPostingsData) | ||
c.StoreSeries(blockID, 1, testSeriesData) | ||
|
||
hits, missed := c.FetchMultiPostings(ctx, blockID, postingKeys) | ||
testutil.Equals(t, 0, len(missed)) | ||
testutil.Equals(t, testPostingData, hits[postingKeys[0]]) | ||
|
||
_, hit := c.FetchExpandedPostings(ctx, blockID, expandedPostingsMatchers) | ||
testutil.Equals(t, false, hit) | ||
|
||
seriesHit, misses := c.FetchMultiSeries(ctx, blockID, []storage.SeriesRef{1}) | ||
testutil.Equals(t, 1, len(misses)) | ||
testutil.Equals(t, 0, len(seriesHit)) | ||
}, | ||
}, | ||
{ | ||
name: "only enable expanded postings", | ||
enabledItems: []string{cacheTypeExpandedPostings}, | ||
verifyFunc: func(t *testing.T, c IndexCache) { | ||
c.StorePostings(blockID, postingKeys[0], testPostingData) | ||
c.StoreExpandedPostings(blockID, expandedPostingsMatchers, testExpandedPostingsData) | ||
c.StoreSeries(blockID, 1, testSeriesData) | ||
|
||
hits, missed := c.FetchMultiPostings(ctx, blockID, postingKeys) | ||
testutil.Equals(t, 1, len(missed)) | ||
testutil.Equals(t, 0, len(hits)) | ||
|
||
ep, hit := c.FetchExpandedPostings(ctx, blockID, expandedPostingsMatchers) | ||
testutil.Equals(t, true, hit) | ||
testutil.Equals(t, testExpandedPostingsData, ep) | ||
|
||
seriesHit, misses := c.FetchMultiSeries(ctx, blockID, []storage.SeriesRef{1}) | ||
testutil.Equals(t, 1, len(misses)) | ||
testutil.Equals(t, 0, len(seriesHit)) | ||
}, | ||
}, | ||
{ | ||
name: "only enable series", | ||
enabledItems: []string{cacheTypeSeries}, | ||
verifyFunc: func(t *testing.T, c IndexCache) { | ||
c.StorePostings(blockID, postingKeys[0], testPostingData) | ||
c.StoreExpandedPostings(blockID, expandedPostingsMatchers, testExpandedPostingsData) | ||
c.StoreSeries(blockID, 1, testSeriesData) | ||
|
||
hits, missed := c.FetchMultiPostings(ctx, blockID, postingKeys) | ||
testutil.Equals(t, 1, len(missed)) | ||
testutil.Equals(t, 0, len(hits)) | ||
|
||
_, hit := c.FetchExpandedPostings(ctx, blockID, expandedPostingsMatchers) | ||
testutil.Equals(t, false, hit) | ||
|
||
seriesHit, misses := c.FetchMultiSeries(ctx, blockID, []storage.SeriesRef{1}) | ||
testutil.Equals(t, 0, len(misses)) | ||
testutil.Equals(t, testSeriesData, seriesHit[1]) | ||
}, | ||
}, | ||
} { | ||
t.Run(tc.name, func(t *testing.T) { | ||
inMemoryCache, err := NewInMemoryIndexCacheWithConfig(log.NewNopLogger(), nil, prometheus.NewRegistry(), DefaultInMemoryIndexCacheConfig) | ||
testutil.Ok(t, err) | ||
c := NewFilteredIndexCache(inMemoryCache, tc.enabledItems) | ||
tc.verifyFunc(t, c) | ||
}) | ||
} | ||
} |