diff --git a/pkg/store/cache/cache.go b/pkg/store/cache/cache.go
index cd75332b87..740354c9c2 100644
--- a/pkg/store/cache/cache.go
+++ b/pkg/store/cache/cache.go
@@ -190,6 +190,48 @@ func (c *IndexCache) onEvict(key, val interface{}) {
 	c.curSize -= entrySize
 }
 
+func (c *IndexCache) get(typ string, key cacheKey) ([]byte, bool) {
+	c.requests.WithLabelValues(typ).Inc()
+
+	c.mtx.Lock()
+	defer c.mtx.Unlock()
+
+	v, ok := c.lru.Get(key)
+	if !ok {
+		return nil, false
+	}
+	c.hits.WithLabelValues(typ).Inc()
+	return v.([]byte), true
+}
+
+func (c *IndexCache) set(typ string, key cacheKey, val []byte) {
+	var size = sliceHeaderSize + uint64(len(val))
+
+	c.mtx.Lock()
+	defer c.mtx.Unlock()
+
+	if _, ok := c.lru.Get(key); ok {
+		return
+	}
+
+	if !c.ensureFits(size, typ) {
+		c.overflow.WithLabelValues(typ).Inc()
+		return
+	}
+
+	// The caller may be passing in a sub-slice of a huge array. Copy the data
+	// to ensure we don't waste huge amounts of space for something small.
+	v := make([]byte, len(val))
+	copy(v, val)
+	c.lru.Add(key, v)
+
+	c.added.WithLabelValues(typ).Inc()
+	c.currentSize.WithLabelValues(typ).Add(float64(size))
+	c.totalCurrentSize.WithLabelValues(typ).Add(float64(size + key.size()))
+	c.current.WithLabelValues(typ).Inc()
+	c.curSize += size
+}
+
 // ensureFits tries to make sure that the passed slice will fit into the LRU cache.
 // Returns true if it will fit.
 func (c *IndexCache) ensureFits(size uint64, typ string) bool {
@@ -237,86 +279,22 @@ func (c *IndexCache) ensureFits(size uint64, typ string) bool {
 	return true
 }
 
+// SetPostings sets the postings identified by the ulid and label to the value v.
+// The value is not mutated if the postings already exist in the cache.
 func (c *IndexCache) SetPostings(b ulid.ULID, l labels.Label, v []byte) {
-	var (
-		entrySize = sliceHeaderSize + uint64(len(v))
-		cacheType = cacheTypePostings
-	)
-
-	c.mtx.Lock()
-	defer c.mtx.Unlock()
-
-	if !c.ensureFits(entrySize, cacheType) {
-		c.overflow.WithLabelValues(cacheType).Inc()
-		return
-	}
-
-	// The caller may be passing in a sub-slice of a huge array. Copy the data
-	// to ensure we don't waste huge amounts of space for something small.
-	cv := make([]byte, len(v))
-	copy(cv, v)
-	key := cacheKey{b, cacheKeyPostings(l)}
-	c.lru.Add(key, cv)
-
-	c.added.WithLabelValues(cacheType).Inc()
-	c.currentSize.WithLabelValues(cacheType).Add(float64(entrySize))
-	c.totalCurrentSize.WithLabelValues(cacheType).Add(float64(entrySize + key.size()))
-	c.current.WithLabelValues(cacheType).Inc()
-	c.curSize += entrySize
+	c.set(cacheTypePostings, cacheKey{b, cacheKeyPostings(l)}, v)
 }
 
 func (c *IndexCache) Postings(b ulid.ULID, l labels.Label) ([]byte, bool) {
-	c.requests.WithLabelValues(cacheTypePostings).Inc()
-
-	c.mtx.Lock()
-	defer c.mtx.Unlock()
-
-	v, ok := c.lru.Get(cacheKey{b, cacheKeyPostings(l)})
-	if !ok {
-		return nil, false
-	}
-	c.hits.WithLabelValues(cacheTypePostings).Inc()
-	return v.([]byte), true
+	return c.get(cacheTypePostings, cacheKey{b, cacheKeyPostings(l)})
 }
 
+// SetSeries sets the series identified by the ulid and id to the value v.
+// The value is not mutated if the series already exists in the cache.
 func (c *IndexCache) SetSeries(b ulid.ULID, id uint64, v []byte) {
-	var (
-		entrySize = 16 + uint64(len(v)) // Slice header + bytes.
-		cacheType = cacheTypeSeries
-	)
-
-	c.mtx.Lock()
-	defer c.mtx.Unlock()
-
-	if !c.ensureFits(entrySize, cacheType) {
-		c.overflow.WithLabelValues(cacheType).Inc()
-		return
-	}
-
-	// The caller may be passing in a sub-slice of a huge array. Copy the data
-	// to ensure we don't waste huge amounts of space for something small.
-	cv := make([]byte, len(v))
-	copy(cv, v)
-	key := cacheKey{b, cacheKeySeries(id)}
-	c.lru.Add(key, cv)
-
-	c.added.WithLabelValues(cacheType).Inc()
-	c.currentSize.WithLabelValues(cacheType).Add(float64(entrySize))
-	c.totalCurrentSize.WithLabelValues(cacheType).Add(float64(entrySize + key.size()))
-	c.current.WithLabelValues(cacheType).Inc()
-	c.curSize += entrySize
+	c.set(cacheTypeSeries, cacheKey{b, cacheKeySeries(id)}, v)
 }
 
 func (c *IndexCache) Series(b ulid.ULID, id uint64) ([]byte, bool) {
-	c.requests.WithLabelValues(cacheTypeSeries).Inc()
-
-	c.mtx.Lock()
-	defer c.mtx.Unlock()
-
-	v, ok := c.lru.Get(cacheKey{b, cacheKeySeries(id)})
-	if !ok {
-		return nil, false
-	}
-	c.hits.WithLabelValues(cacheTypeSeries).Inc()
-	return v.([]byte), true
+	return c.get(cacheTypeSeries, cacheKey{b, cacheKeySeries(id)})
 }
diff --git a/pkg/store/cache/cache_test.go b/pkg/store/cache/cache_test.go
index e5fe113319..997c3cc7c2 100644
--- a/pkg/store/cache/cache_test.go
+++ b/pkg/store/cache/cache_test.go
@@ -2,6 +2,8 @@ package storecache
 
 import (
+	"bytes"
+	"fmt"
 	"math"
 	"testing"
 	"time"
 
@@ -48,6 +50,99 @@ func TestIndexCache_AvoidsDeadlock(t *testing.T) {
 	testutil.Equals(t, float64(0), promtest.ToFloat64(cache.overflow.WithLabelValues(cacheTypeSeries)))
 }
 
+func TestIndexCache_UpdateItem(t *testing.T) {
+	defer leaktest.CheckTimeout(t, 10*time.Second)()
+
+	const maxSize = 2 * (sliceHeaderSize + 1)
+
+	var errorLogs []string
+	errorLogger := log.LoggerFunc(func(kvs ...interface{}) error {
+		var lvl string
+		for i := 0; i < len(kvs); i += 2 {
+			if kvs[i] == "level" {
+				lvl = fmt.Sprint(kvs[i+1])
+				break
+			}
+		}
+		if lvl != "error" {
+			return nil
+		}
+		var buf bytes.Buffer
+		defer func() { errorLogs = append(errorLogs, buf.String()) }()
+		return log.NewLogfmtLogger(&buf).Log(kvs...)
+	})
+
+	metrics := prometheus.NewRegistry()
+	cache, err := NewIndexCache(log.NewSyncLogger(errorLogger), metrics, Opts{
+		MaxItemSizeBytes: maxSize,
+		MaxSizeBytes:     maxSize,
+	})
+	testutil.Ok(t, err)
+
+	uid := func(id uint64) ulid.ULID { return ulid.MustNew(id, nil) }
+	lbl := labels.Label{Name: "foo", Value: "bar"}
+
+	for _, tt := range []struct {
+		typ string
+		set func(uint64, []byte)
+		get func(uint64) ([]byte, bool)
+	}{
+		{
+			typ: cacheTypePostings,
+			set: func(id uint64, b []byte) { cache.SetPostings(uid(id), lbl, b) },
+			get: func(id uint64) ([]byte, bool) { return cache.Postings(uid(id), lbl) },
+		},
+		{
+			typ: cacheTypeSeries,
+			set: func(id uint64, b []byte) { cache.SetSeries(uid(id), id, b) },
+			get: func(id uint64) ([]byte, bool) { return cache.Series(uid(id), id) },
+		},
+	} {
+		t.Run(tt.typ, func(t *testing.T) {
+			defer func() { errorLogs = nil }()
+
+			// Set value.
+			tt.set(0, []byte{0})
+			buf, ok := tt.get(0)
+			testutil.Equals(t, true, ok)
+			testutil.Equals(t, []byte{0}, buf)
+			testutil.Equals(t, float64(sliceHeaderSize+1), promtest.ToFloat64(cache.currentSize.WithLabelValues(tt.typ)))
+			testutil.Equals(t, float64(1), promtest.ToFloat64(cache.current.WithLabelValues(tt.typ)))
+			testutil.Equals(t, []string(nil), errorLogs)
+
+			// Set the same value again.
+			// NB: This used to over-count the value.
+			tt.set(0, []byte{0})
+			buf, ok = tt.get(0)
+			testutil.Equals(t, true, ok)
+			testutil.Equals(t, []byte{0}, buf)
+			testutil.Equals(t, float64(sliceHeaderSize+1), promtest.ToFloat64(cache.currentSize.WithLabelValues(tt.typ)))
+			testutil.Equals(t, float64(1), promtest.ToFloat64(cache.current.WithLabelValues(tt.typ)))
+			testutil.Equals(t, []string(nil), errorLogs)
+
+			// Set a larger value.
+			// NB: This used to deadlock when enough values were over-counted and it
+			// couldn't clear enough space -- repeatedly removing oldest after empty.
+			tt.set(1, []byte{0, 1})
+			buf, ok = tt.get(1)
+			testutil.Equals(t, true, ok)
+			testutil.Equals(t, []byte{0, 1}, buf)
+			testutil.Equals(t, float64(sliceHeaderSize+2), promtest.ToFloat64(cache.currentSize.WithLabelValues(tt.typ)))
+			testutil.Equals(t, float64(1), promtest.ToFloat64(cache.current.WithLabelValues(tt.typ)))
+			testutil.Equals(t, []string(nil), errorLogs)
+
+			// Mutations to existing values will be ignored.
+			tt.set(1, []byte{1, 2})
+			buf, ok = tt.get(1)
+			testutil.Equals(t, true, ok)
+			testutil.Equals(t, []byte{0, 1}, buf)
+			testutil.Equals(t, float64(sliceHeaderSize+2), promtest.ToFloat64(cache.currentSize.WithLabelValues(tt.typ)))
+			testutil.Equals(t, float64(1), promtest.ToFloat64(cache.current.WithLabelValues(tt.typ)))
+			testutil.Equals(t, []string(nil), errorLogs)
+		})
+	}
+}
+
 // This should not happen as we hardcode math.MaxInt, but we still add test to check this out.
 func TestIndexCache_MaxNumberOfItemsHit(t *testing.T) {
 	defer leaktest.CheckTimeout(t, 10*time.Second)()
@@ -171,8 +266,6 @@ func TestIndexCache_Eviction_WithMetrics(t *testing.T) {
 	testutil.Equals(t, v, p)
 
 	// Add same item again.
-	// NOTE: In our caller code, we always check first hit, then we claim miss and set posting so this should not happen.
-	// That's why this case is not optimized and we evict + re add the item.
 	cache.SetPostings(id, lbls2, v)
 
 	testutil.Equals(t, uint64(2*sliceHeaderSize+5), cache.curSize)
@@ -184,7 +277,7 @@ func TestIndexCache_Eviction_WithMetrics(t *testing.T) {
 	testutil.Equals(t, float64(0), promtest.ToFloat64(cache.totalCurrentSize.WithLabelValues(cacheTypeSeries)))
 	testutil.Equals(t, float64(0), promtest.ToFloat64(cache.overflow.WithLabelValues(cacheTypePostings)))
 	testutil.Equals(t, float64(0), promtest.ToFloat64(cache.overflow.WithLabelValues(cacheTypeSeries)))
-	testutil.Equals(t, float64(2), promtest.ToFloat64(cache.evicted.WithLabelValues(cacheTypePostings))) // Eviction.
+	testutil.Equals(t, float64(1), promtest.ToFloat64(cache.evicted.WithLabelValues(cacheTypePostings)))
 	testutil.Equals(t, float64(1), promtest.ToFloat64(cache.evicted.WithLabelValues(cacheTypeSeries)))
 
 	p, ok = cache.Postings(id, lbls2)
@@ -202,7 +295,7 @@ func TestIndexCache_Eviction_WithMetrics(t *testing.T) {
 	testutil.Equals(t, float64(0), promtest.ToFloat64(cache.totalCurrentSize.WithLabelValues(cacheTypeSeries)))
 	testutil.Equals(t, float64(1), promtest.ToFloat64(cache.overflow.WithLabelValues(cacheTypePostings))) // Overflow.
 	testutil.Equals(t, float64(0), promtest.ToFloat64(cache.overflow.WithLabelValues(cacheTypeSeries)))
-	testutil.Equals(t, float64(2), promtest.ToFloat64(cache.evicted.WithLabelValues(cacheTypePostings)))
+	testutil.Equals(t, float64(1), promtest.ToFloat64(cache.evicted.WithLabelValues(cacheTypePostings)))
 	testutil.Equals(t, float64(1), promtest.ToFloat64(cache.evicted.WithLabelValues(cacheTypeSeries)))
 
 	_, _, ok = cache.lru.RemoveOldest()
@@ -217,7 +310,7 @@ func TestIndexCache_Eviction_WithMetrics(t *testing.T) {
 	testutil.Equals(t, float64(0), promtest.ToFloat64(cache.totalCurrentSize.WithLabelValues(cacheTypeSeries)))
 	testutil.Equals(t, float64(1), promtest.ToFloat64(cache.overflow.WithLabelValues(cacheTypePostings)))
 	testutil.Equals(t, float64(0), promtest.ToFloat64(cache.overflow.WithLabelValues(cacheTypeSeries)))
-	testutil.Equals(t, float64(3), promtest.ToFloat64(cache.evicted.WithLabelValues(cacheTypePostings)))
+	testutil.Equals(t, float64(2), promtest.ToFloat64(cache.evicted.WithLabelValues(cacheTypePostings)))
 	testutil.Equals(t, float64(1), promtest.ToFloat64(cache.evicted.WithLabelValues(cacheTypeSeries)))
 
 	_, _, ok = cache.lru.RemoveOldest()
@@ -236,7 +329,7 @@ func TestIndexCache_Eviction_WithMetrics(t *testing.T) {
 	testutil.Equals(t, float64(0), promtest.ToFloat64(cache.totalCurrentSize.WithLabelValues(cacheTypeSeries)))
 	testutil.Equals(t, float64(1), promtest.ToFloat64(cache.overflow.WithLabelValues(cacheTypePostings)))
 	testutil.Equals(t, float64(0), promtest.ToFloat64(cache.overflow.WithLabelValues(cacheTypeSeries)))
-	testutil.Equals(t, float64(3), promtest.ToFloat64(cache.evicted.WithLabelValues(cacheTypePostings)))
+	testutil.Equals(t, float64(2), promtest.ToFloat64(cache.evicted.WithLabelValues(cacheTypePostings)))
 	testutil.Equals(t, float64(1), promtest.ToFloat64(cache.evicted.WithLabelValues(cacheTypeSeries)))
 
 	p, ok = cache.Postings(id, lbls3)
@@ -256,7 +349,7 @@ func TestIndexCache_Eviction_WithMetrics(t *testing.T) {
 	testutil.Equals(t, float64(0), promtest.ToFloat64(cache.totalCurrentSize.WithLabelValues(cacheTypeSeries)))
 	testutil.Equals(t, float64(1), promtest.ToFloat64(cache.overflow.WithLabelValues(cacheTypePostings)))
 	testutil.Equals(t, float64(0), promtest.ToFloat64(cache.overflow.WithLabelValues(cacheTypeSeries)))
-	testutil.Equals(t, float64(3), promtest.ToFloat64(cache.evicted.WithLabelValues(cacheTypePostings)))
+	testutil.Equals(t, float64(2), promtest.ToFloat64(cache.evicted.WithLabelValues(cacheTypePostings)))
 	testutil.Equals(t, float64(1), promtest.ToFloat64(cache.evicted.WithLabelValues(cacheTypeSeries)))
 
 	p, ok = cache.Postings(id, lbls4)
@@ -264,7 +357,7 @@ func TestIndexCache_Eviction_WithMetrics(t *testing.T) {
 	testutil.Equals(t, []byte{}, p)
 
 	// Other metrics.
-	testutil.Equals(t, float64(5), promtest.ToFloat64(cache.added.WithLabelValues(cacheTypePostings)))
+	testutil.Equals(t, float64(4), promtest.ToFloat64(cache.added.WithLabelValues(cacheTypePostings)))
 	testutil.Equals(t, float64(1), promtest.ToFloat64(cache.added.WithLabelValues(cacheTypeSeries)))
 	testutil.Equals(t, float64(9), promtest.ToFloat64(cache.requests.WithLabelValues(cacheTypePostings)))
 	testutil.Equals(t, float64(2), promtest.ToFloat64(cache.requests.WithLabelValues(cacheTypeSeries)))