Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

store/cache: drop updates to existing items and dedupe get/set methods #1142

Merged
merged 1 commit into from
May 16, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
122 changes: 50 additions & 72 deletions pkg/store/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,48 @@ func (c *IndexCache) onEvict(key, val interface{}) {
c.curSize -= entrySize
}

// get counts a request for the given cache type and looks up key in the LRU.
// The hit counter is only incremented when the key is present.
func (c *IndexCache) get(typ string, key cacheKey) ([]byte, bool) {
	c.requests.WithLabelValues(typ).Inc()

	c.mtx.Lock()
	defer c.mtx.Unlock()

	if item, found := c.lru.Get(key); found {
		c.hits.WithLabelValues(typ).Inc()
		return item.([]byte), true
	}
	return nil, false
}

// set stores a copy of val under key and accounts for its size in the
// metrics for the given cache type. If the key is already cached the call is
// a no-op (the existing value is kept); if the value cannot be made to fit it
// is counted as overflow and dropped.
func (c *IndexCache) set(typ string, key cacheKey, val []byte) {
	size := sliceHeaderSize + uint64(len(val))

	c.mtx.Lock()
	defer c.mtx.Unlock()

	// Updates to existing items are dropped: keep whatever is cached.
	if _, found := c.lru.Get(key); found {
		return
	}

	if !c.ensureFits(size, typ) {
		c.overflow.WithLabelValues(typ).Inc()
		return
	}

	// The caller may be passing in a sub-slice of a huge array. Copy the data
	// to ensure we don't waste huge amounts of space for something small.
	stored := append([]byte(nil), val...)
	c.lru.Add(key, stored)

	c.added.WithLabelValues(typ).Inc()
	c.currentSize.WithLabelValues(typ).Add(float64(size))
	c.totalCurrentSize.WithLabelValues(typ).Add(float64(size + key.size()))
	c.current.WithLabelValues(typ).Inc()
	c.curSize += size
}

// ensureFits tries to make sure that the passed slice will fit into the LRU cache.
// Returns true if it will fit.
func (c *IndexCache) ensureFits(size uint64, typ string) bool {
Expand Down Expand Up @@ -237,86 +279,22 @@ func (c *IndexCache) ensureFits(size uint64, typ string) bool {
return true
}

// SetPostings sets the postings identified by the ulid and label to the value v;
// if the postings already exist in the cache they are not mutated.
func (c *IndexCache) SetPostings(b ulid.ULID, l labels.Label, v []byte) {
var (
entrySize = sliceHeaderSize + uint64(len(v))
cacheType = cacheTypePostings
)

c.mtx.Lock()
defer c.mtx.Unlock()

if !c.ensureFits(entrySize, cacheType) {
c.overflow.WithLabelValues(cacheType).Inc()
return
}

// The caller may be passing in a sub-slice of a huge array. Copy the data
// to ensure we don't waste huge amounts of space for something small.
cv := make([]byte, len(v))
copy(cv, v)
key := cacheKey{b, cacheKeyPostings(l)}
c.lru.Add(key, cv)

c.added.WithLabelValues(cacheType).Inc()
c.currentSize.WithLabelValues(cacheType).Add(float64(entrySize))
c.totalCurrentSize.WithLabelValues(cacheType).Add(float64(entrySize + key.size()))
c.current.WithLabelValues(cacheType).Inc()
c.curSize += entrySize
c.set(cacheTypePostings, cacheKey{b, cacheKeyPostings(l)}, v)
}

func (c *IndexCache) Postings(b ulid.ULID, l labels.Label) ([]byte, bool) {
c.requests.WithLabelValues(cacheTypePostings).Inc()

c.mtx.Lock()
defer c.mtx.Unlock()

v, ok := c.lru.Get(cacheKey{b, cacheKeyPostings(l)})
if !ok {
return nil, false
}
c.hits.WithLabelValues(cacheTypePostings).Inc()
return v.([]byte), true
return c.get(cacheTypePostings, cacheKey{b, cacheKeyPostings(l)})
}

// SetSeries sets the series identified by the ulid and id to the value v;
// if the series already exists in the cache it is not mutated.
func (c *IndexCache) SetSeries(b ulid.ULID, id uint64, v []byte) {
var (
entrySize = 16 + uint64(len(v)) // Slice header + bytes.
cacheType = cacheTypeSeries
)

c.mtx.Lock()
defer c.mtx.Unlock()

if !c.ensureFits(entrySize, cacheType) {
c.overflow.WithLabelValues(cacheType).Inc()
return
}

// The caller may be passing in a sub-slice of a huge array. Copy the data
// to ensure we don't waste huge amounts of space for something small.
cv := make([]byte, len(v))
copy(cv, v)
key := cacheKey{b, cacheKeySeries(id)}
c.lru.Add(key, cv)

c.added.WithLabelValues(cacheType).Inc()
c.currentSize.WithLabelValues(cacheType).Add(float64(entrySize))
c.totalCurrentSize.WithLabelValues(cacheType).Add(float64(entrySize + key.size()))
c.current.WithLabelValues(cacheType).Inc()
c.curSize += entrySize
c.set(cacheTypeSeries, cacheKey{b, cacheKeySeries(id)}, v)
}

func (c *IndexCache) Series(b ulid.ULID, id uint64) ([]byte, bool) {
c.requests.WithLabelValues(cacheTypeSeries).Inc()

c.mtx.Lock()
defer c.mtx.Unlock()

v, ok := c.lru.Get(cacheKey{b, cacheKeySeries(id)})
if !ok {
return nil, false
}
c.hits.WithLabelValues(cacheTypeSeries).Inc()
return v.([]byte), true
return c.get(cacheTypeSeries, cacheKey{b, cacheKeySeries(id)})
}
109 changes: 101 additions & 8 deletions pkg/store/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
package storecache

import (
"bytes"
"fmt"
"math"
"testing"
"time"
Expand Down Expand Up @@ -48,6 +50,99 @@ func TestIndexCache_AvoidsDeadlock(t *testing.T) {
testutil.Equals(t, float64(0), promtest.ToFloat64(cache.overflow.WithLabelValues(cacheTypeSeries)))
}

// TestIndexCache_UpdateItem checks that re-setting an existing key is a no-op:
// the stored value and the size metrics stay unchanged, and no error-level
// log line is emitted along the way.
func TestIndexCache_UpdateItem(t *testing.T) {
	defer leaktest.CheckTimeout(t, 10*time.Second)()

	// Room for exactly two one-byte entries (plus slice headers).
	const maxSize = 2 * (sliceHeaderSize + 1)

	// Capture only "level=error" log lines so the test can assert that the
	// cache never logs an error.
	var errorLogs []string
	errorLogger := log.LoggerFunc(func(kvs ...interface{}) error {
		var lvl string
		for i := 0; i < len(kvs); i += 2 {
			if kvs[i] == "level" {
				lvl = fmt.Sprint(kvs[i+1])
				break
			}
		}
		if lvl != "error" {
			return nil
		}
		var buf bytes.Buffer
		defer func() { errorLogs = append(errorLogs, buf.String()) }()
		return log.NewLogfmtLogger(&buf).Log(kvs...)
	})

	metrics := prometheus.NewRegistry()
	cache, err := NewIndexCache(log.NewSyncLogger(errorLogger), metrics, Opts{
		MaxItemSizeBytes: maxSize,
		MaxSizeBytes:     maxSize,
	})
	testutil.Ok(t, err)

	uid := func(id uint64) ulid.ULID { return ulid.MustNew(id, nil) }
	lbl := labels.Label{Name: "foo", Value: "bar"}

	// Run the same scenario against both the postings and the series cache
	// through a uniform set/get adapter.
	for _, tt := range []struct {
		typ string
		set func(uint64, []byte)
		get func(uint64) ([]byte, bool)
	}{
		{
			typ: cacheTypePostings,
			set: func(id uint64, b []byte) { cache.SetPostings(uid(id), lbl, b) },
			get: func(id uint64) ([]byte, bool) { return cache.Postings(uid(id), lbl) },
		},
		{
			typ: cacheTypeSeries,
			set: func(id uint64, b []byte) { cache.SetSeries(uid(id), id, b) },
			get: func(id uint64) ([]byte, bool) { return cache.Series(uid(id), id) },
		},
	} {
		t.Run(tt.typ, func(t *testing.T) {
			defer func() { errorLogs = nil }()

			// Set value.
			tt.set(0, []byte{0})
			buf, ok := tt.get(0)
			testutil.Equals(t, true, ok)
			testutil.Equals(t, []byte{0}, buf)
			testutil.Equals(t, float64(sliceHeaderSize+1), promtest.ToFloat64(cache.currentSize.WithLabelValues(tt.typ)))
			testutil.Equals(t, float64(1), promtest.ToFloat64(cache.current.WithLabelValues(tt.typ)))
			testutil.Equals(t, []string(nil), errorLogs)

			// Set the same value again.
			// NB: This used to over-count the value.
			tt.set(0, []byte{0})
			buf, ok = tt.get(0)
			testutil.Equals(t, true, ok)
			testutil.Equals(t, []byte{0}, buf)
			testutil.Equals(t, float64(sliceHeaderSize+1), promtest.ToFloat64(cache.currentSize.WithLabelValues(tt.typ)))
			testutil.Equals(t, float64(1), promtest.ToFloat64(cache.current.WithLabelValues(tt.typ)))
			testutil.Equals(t, []string(nil), errorLogs)

			// Set a larger value.
			// NB: This used to deadlock when enough values were over-counted and it
			// couldn't clear enough space -- repeatedly removing oldest after empty.
			tt.set(1, []byte{0, 1})
			buf, ok = tt.get(1)
			testutil.Equals(t, true, ok)
			testutil.Equals(t, []byte{0, 1}, buf)
			testutil.Equals(t, float64(sliceHeaderSize+2), promtest.ToFloat64(cache.currentSize.WithLabelValues(tt.typ)))
			testutil.Equals(t, float64(1), promtest.ToFloat64(cache.current.WithLabelValues(tt.typ)))
			testutil.Equals(t, []string(nil), errorLogs)

			// Mutations to existing values will be ignored.
			tt.set(1, []byte{1, 2})
			buf, ok = tt.get(1)
			testutil.Equals(t, true, ok)
			testutil.Equals(t, []byte{0, 1}, buf)
			testutil.Equals(t, float64(sliceHeaderSize+2), promtest.ToFloat64(cache.currentSize.WithLabelValues(tt.typ)))
			testutil.Equals(t, float64(1), promtest.ToFloat64(cache.current.WithLabelValues(tt.typ)))
			testutil.Equals(t, []string(nil), errorLogs)
		})
	}
}

// This should not happen as we hardcode math.MaxInt, but we still add test to check this out.
func TestIndexCache_MaxNumberOfItemsHit(t *testing.T) {
defer leaktest.CheckTimeout(t, 10*time.Second)()
Expand Down Expand Up @@ -171,8 +266,6 @@ func TestIndexCache_Eviction_WithMetrics(t *testing.T) {
testutil.Equals(t, v, p)

// Add same item again.
// NOTE: In our caller code, we always check first hit, then we claim miss and set posting so this should not happen.
// That's why this case is not optimized and we evict + re add the item.
cache.SetPostings(id, lbls2, v)

testutil.Equals(t, uint64(2*sliceHeaderSize+5), cache.curSize)
Expand All @@ -184,7 +277,7 @@ func TestIndexCache_Eviction_WithMetrics(t *testing.T) {
testutil.Equals(t, float64(0), promtest.ToFloat64(cache.totalCurrentSize.WithLabelValues(cacheTypeSeries)))
testutil.Equals(t, float64(0), promtest.ToFloat64(cache.overflow.WithLabelValues(cacheTypePostings)))
testutil.Equals(t, float64(0), promtest.ToFloat64(cache.overflow.WithLabelValues(cacheTypeSeries)))
testutil.Equals(t, float64(2), promtest.ToFloat64(cache.evicted.WithLabelValues(cacheTypePostings))) // Eviction.
testutil.Equals(t, float64(1), promtest.ToFloat64(cache.evicted.WithLabelValues(cacheTypePostings)))
testutil.Equals(t, float64(1), promtest.ToFloat64(cache.evicted.WithLabelValues(cacheTypeSeries)))

p, ok = cache.Postings(id, lbls2)
Expand All @@ -202,7 +295,7 @@ func TestIndexCache_Eviction_WithMetrics(t *testing.T) {
testutil.Equals(t, float64(0), promtest.ToFloat64(cache.totalCurrentSize.WithLabelValues(cacheTypeSeries)))
testutil.Equals(t, float64(1), promtest.ToFloat64(cache.overflow.WithLabelValues(cacheTypePostings))) // Overflow.
testutil.Equals(t, float64(0), promtest.ToFloat64(cache.overflow.WithLabelValues(cacheTypeSeries)))
testutil.Equals(t, float64(2), promtest.ToFloat64(cache.evicted.WithLabelValues(cacheTypePostings)))
testutil.Equals(t, float64(1), promtest.ToFloat64(cache.evicted.WithLabelValues(cacheTypePostings)))
testutil.Equals(t, float64(1), promtest.ToFloat64(cache.evicted.WithLabelValues(cacheTypeSeries)))

_, _, ok = cache.lru.RemoveOldest()
Expand All @@ -217,7 +310,7 @@ func TestIndexCache_Eviction_WithMetrics(t *testing.T) {
testutil.Equals(t, float64(0), promtest.ToFloat64(cache.totalCurrentSize.WithLabelValues(cacheTypeSeries)))
testutil.Equals(t, float64(1), promtest.ToFloat64(cache.overflow.WithLabelValues(cacheTypePostings)))
testutil.Equals(t, float64(0), promtest.ToFloat64(cache.overflow.WithLabelValues(cacheTypeSeries)))
testutil.Equals(t, float64(3), promtest.ToFloat64(cache.evicted.WithLabelValues(cacheTypePostings)))
testutil.Equals(t, float64(2), promtest.ToFloat64(cache.evicted.WithLabelValues(cacheTypePostings)))
testutil.Equals(t, float64(1), promtest.ToFloat64(cache.evicted.WithLabelValues(cacheTypeSeries)))

_, _, ok = cache.lru.RemoveOldest()
Expand All @@ -236,7 +329,7 @@ func TestIndexCache_Eviction_WithMetrics(t *testing.T) {
testutil.Equals(t, float64(0), promtest.ToFloat64(cache.totalCurrentSize.WithLabelValues(cacheTypeSeries)))
testutil.Equals(t, float64(1), promtest.ToFloat64(cache.overflow.WithLabelValues(cacheTypePostings)))
testutil.Equals(t, float64(0), promtest.ToFloat64(cache.overflow.WithLabelValues(cacheTypeSeries)))
testutil.Equals(t, float64(3), promtest.ToFloat64(cache.evicted.WithLabelValues(cacheTypePostings)))
testutil.Equals(t, float64(2), promtest.ToFloat64(cache.evicted.WithLabelValues(cacheTypePostings)))
testutil.Equals(t, float64(1), promtest.ToFloat64(cache.evicted.WithLabelValues(cacheTypeSeries)))

p, ok = cache.Postings(id, lbls3)
Expand All @@ -256,15 +349,15 @@ func TestIndexCache_Eviction_WithMetrics(t *testing.T) {
testutil.Equals(t, float64(0), promtest.ToFloat64(cache.totalCurrentSize.WithLabelValues(cacheTypeSeries)))
testutil.Equals(t, float64(1), promtest.ToFloat64(cache.overflow.WithLabelValues(cacheTypePostings)))
testutil.Equals(t, float64(0), promtest.ToFloat64(cache.overflow.WithLabelValues(cacheTypeSeries)))
testutil.Equals(t, float64(3), promtest.ToFloat64(cache.evicted.WithLabelValues(cacheTypePostings)))
testutil.Equals(t, float64(2), promtest.ToFloat64(cache.evicted.WithLabelValues(cacheTypePostings)))
testutil.Equals(t, float64(1), promtest.ToFloat64(cache.evicted.WithLabelValues(cacheTypeSeries)))

p, ok = cache.Postings(id, lbls4)
testutil.Assert(t, ok, "key exists")
testutil.Equals(t, []byte{}, p)

// Other metrics.
testutil.Equals(t, float64(5), promtest.ToFloat64(cache.added.WithLabelValues(cacheTypePostings)))
testutil.Equals(t, float64(4), promtest.ToFloat64(cache.added.WithLabelValues(cacheTypePostings)))
testutil.Equals(t, float64(1), promtest.ToFloat64(cache.added.WithLabelValues(cacheTypeSeries)))
testutil.Equals(t, float64(9), promtest.ToFloat64(cache.requests.WithLabelValues(cacheTypePostings)))
testutil.Equals(t, float64(2), promtest.ToFloat64(cache.requests.WithLabelValues(cacheTypeSeries)))
Expand Down