Skip to content

Commit

Permalink
check context cancel in inmemory cache (#6788)
Browse files Browse the repository at this point in the history
Signed-off-by: Ben Ye <benye@amazon.com>
  • Loading branch information
yeya24 authored Oct 10, 2023
1 parent 6aeca7e commit b2b80b3
Showing 1 changed file with 12 additions and 3 deletions.
15 changes: 12 additions & 3 deletions pkg/store/cache/inmemory.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,14 +302,17 @@ func (c *InMemoryIndexCache) StorePostings(blockID ulid.ULID, l labels.Label, v

// FetchMultiPostings fetches multiple postings - each identified by a label -
// and returns a map containing cache hits, along with a list of missing keys.
func (c *InMemoryIndexCache) FetchMultiPostings(_ context.Context, blockID ulid.ULID, keys []labels.Label, tenant string) (hits map[labels.Label][]byte, misses []labels.Label) {
func (c *InMemoryIndexCache) FetchMultiPostings(ctx context.Context, blockID ulid.ULID, keys []labels.Label, tenant string) (hits map[labels.Label][]byte, misses []labels.Label) {
timer := prometheus.NewTimer(c.commonMetrics.fetchLatency.WithLabelValues(cacheTypePostings, tenant))
defer timer.ObserveDuration()

hits = map[labels.Label][]byte{}

blockIDKey := blockID.String()
for _, key := range keys {
if ctx.Err() != nil {
return hits, misses
}
if b, ok := c.get(cacheTypePostings, cacheKey{blockIDKey, cacheKeyPostings(key), ""}, tenant); ok {
hits[key] = b
continue
Expand All @@ -328,10 +331,13 @@ func (c *InMemoryIndexCache) StoreExpandedPostings(blockID ulid.ULID, matchers [
}

// FetchExpandedPostings fetches expanded postings and returns cached data and a boolean value representing whether it is a cache hit or not.
func (c *InMemoryIndexCache) FetchExpandedPostings(_ context.Context, blockID ulid.ULID, matchers []*labels.Matcher, tenant string) ([]byte, bool) {
func (c *InMemoryIndexCache) FetchExpandedPostings(ctx context.Context, blockID ulid.ULID, matchers []*labels.Matcher, tenant string) ([]byte, bool) {
timer := prometheus.NewTimer(c.commonMetrics.fetchLatency.WithLabelValues(cacheTypeExpandedPostings, tenant))
defer timer.ObserveDuration()

if ctx.Err() != nil {
return nil, false
}
if b, ok := c.get(cacheTypeExpandedPostings, cacheKey{blockID.String(), cacheKeyExpandedPostings(labelMatchersToString(matchers)), ""}, tenant); ok {
return b, true
}
Expand All @@ -347,14 +353,17 @@ func (c *InMemoryIndexCache) StoreSeries(blockID ulid.ULID, id storage.SeriesRef

// FetchMultiSeries fetches multiple series - each identified by ID - from the cache
// and returns a map containing cache hits, along with a list of missing IDs.
func (c *InMemoryIndexCache) FetchMultiSeries(_ context.Context, blockID ulid.ULID, ids []storage.SeriesRef, tenant string) (hits map[storage.SeriesRef][]byte, misses []storage.SeriesRef) {
func (c *InMemoryIndexCache) FetchMultiSeries(ctx context.Context, blockID ulid.ULID, ids []storage.SeriesRef, tenant string) (hits map[storage.SeriesRef][]byte, misses []storage.SeriesRef) {
timer := prometheus.NewTimer(c.commonMetrics.fetchLatency.WithLabelValues(cacheTypeSeries, tenant))
defer timer.ObserveDuration()

hits = map[storage.SeriesRef][]byte{}

blockIDKey := blockID.String()
for _, id := range ids {
if ctx.Err() != nil {
return hits, misses
}
if b, ok := c.get(cacheTypeSeries, cacheKey{blockIDKey, cacheKeySeries(id), ""}, tenant); ok {
hits[id] = b
continue
Expand Down

0 comments on commit b2b80b3

Please sign in to comment.