Skip to content

Commit

Permalink
Cache: various index cache client improvements (thanos-io#6374)
Browse files Browse the repository at this point in the history
  • Loading branch information
yeya24 authored and HC Zhu committed Jun 27, 2023
1 parent 1bb12b7 commit bacfc5a
Show file tree
Hide file tree
Showing 5 changed files with 31 additions and 49 deletions.
2 changes: 1 addition & 1 deletion pkg/store/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -2345,7 +2345,7 @@ type postingPtr struct {
}

// fetchPostings fills postings requested by posting groups.
// It returns one postings for each key, in the same order.
// It returns one posting for each key, in the same order.
// If postings for a given key are not fetched, the entry at the corresponding index will be nil.
func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Label, bytesLimiter BytesLimiter) ([]index.Postings, []func(), error) {
var closeFns []func()
Expand Down
6 changes: 3 additions & 3 deletions pkg/store/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ type IndexCache interface {
}

type cacheKey struct {
block ulid.ULID
block string
key interface{}
}

Expand Down Expand Up @@ -79,9 +79,9 @@ func (c cacheKey) string() string {
// which would end up in wrong query results.
lbl := c.key.(cacheKeyPostings)
lblHash := blake2b.Sum256([]byte(lbl.Name + ":" + lbl.Value))
return "P:" + c.block.String() + ":" + base64.RawURLEncoding.EncodeToString(lblHash[0:])
return "P:" + c.block + ":" + base64.RawURLEncoding.EncodeToString(lblHash[0:])
case cacheKeySeries:
return "S:" + c.block.String() + ":" + strconv.FormatUint(uint64(c.key.(cacheKeySeries)), 10)
return "S:" + c.block + ":" + strconv.FormatUint(uint64(c.key.(cacheKeySeries)), 10)
default:
return ""
}
Expand Down
16 changes: 9 additions & 7 deletions pkg/store/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,14 @@ func TestCacheKey_string(t *testing.T) {
t.Parallel()

uid := ulid.MustNew(1, nil)
ulidString := uid.String()

tests := map[string]struct {
key cacheKey
expected string
}{
"should stringify postings cache key": {
key: cacheKey{uid, cacheKeyPostings(labels.Label{Name: "foo", Value: "bar"})},
key: cacheKey{ulidString, cacheKeyPostings(labels.Label{Name: "foo", Value: "bar"})},
expected: func() string {
hash := blake2b.Sum256([]byte("foo:bar"))
encodedHash := base64.RawURLEncoding.EncodeToString(hash[0:])
Expand All @@ -41,7 +42,7 @@ func TestCacheKey_string(t *testing.T) {
}(),
},
"should stringify series cache key": {
key: cacheKey{uid, cacheKeySeries(12345)},
key: cacheKey{ulidString, cacheKeySeries(12345)},
expected: fmt.Sprintf("S:%s:12345", uid.String()),
},
}
Expand All @@ -58,6 +59,7 @@ func TestCacheKey_string_ShouldGuaranteeReasonablyShortKeyLength(t *testing.T) {
t.Parallel()

uid := ulid.MustNew(1, nil)
ulidString := uid.String()

tests := map[string]struct {
keys []cacheKey
Expand All @@ -66,14 +68,14 @@ func TestCacheKey_string_ShouldGuaranteeReasonablyShortKeyLength(t *testing.T) {
"should guarantee reasonably short key length for postings": {
expectedLen: 72,
keys: []cacheKey{
{uid, cacheKeyPostings(labels.Label{Name: "a", Value: "b"})},
{uid, cacheKeyPostings(labels.Label{Name: strings.Repeat("a", 100), Value: strings.Repeat("a", 1000)})},
{ulidString, cacheKeyPostings(labels.Label{Name: "a", Value: "b"})},
{ulidString, cacheKeyPostings(labels.Label{Name: strings.Repeat("a", 100), Value: strings.Repeat("a", 1000)})},
},
},
"should guarantee reasonably short key length for series": {
expectedLen: 49,
keys: []cacheKey{
{uid, cacheKeySeries(math.MaxUint64)},
{ulidString, cacheKeySeries(math.MaxUint64)},
},
},
}
Expand All @@ -89,7 +91,7 @@ func TestCacheKey_string_ShouldGuaranteeReasonablyShortKeyLength(t *testing.T) {

func BenchmarkCacheKey_string_Postings(b *testing.B) {
uid := ulid.MustNew(1, nil)
key := cacheKey{uid, cacheKeyPostings(labels.Label{Name: strings.Repeat("a", 100), Value: strings.Repeat("a", 1000)})}
key := cacheKey{uid.String(), cacheKeyPostings(labels.Label{Name: strings.Repeat("a", 100), Value: strings.Repeat("a", 1000)})}

b.ResetTimer()
for i := 0; i < b.N; i++ {
Expand All @@ -99,7 +101,7 @@ func BenchmarkCacheKey_string_Postings(b *testing.B) {

func BenchmarkCacheKey_string_Series(b *testing.B) {
uid := ulid.MustNew(1, nil)
key := cacheKey{uid, cacheKeySeries(math.MaxUint64)}
key := cacheKey{uid.String(), cacheKeySeries(math.MaxUint64)}

b.ResetTimer()
for i := 0; i < b.N; i++ {
Expand Down
10 changes: 6 additions & 4 deletions pkg/store/cache/inmemory.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,16 +290,17 @@ func copyToKey(l labels.Label) cacheKeyPostings {
// StorePostings sets the postings identified by the ulid and label to the value v,
// if the postings already exist in the cache they are not mutated.
func (c *InMemoryIndexCache) StorePostings(blockID ulid.ULID, l labels.Label, v []byte) {
c.set(cacheTypePostings, cacheKey{block: blockID, key: copyToKey(l)}, v)
c.set(cacheTypePostings, cacheKey{block: blockID.String(), key: copyToKey(l)}, v)
}

// FetchMultiPostings fetches multiple postings - each identified by a label -
// and returns a map containing cache hits, along with a list of missing keys.
func (c *InMemoryIndexCache) FetchMultiPostings(_ context.Context, blockID ulid.ULID, keys []labels.Label) (hits map[labels.Label][]byte, misses []labels.Label) {
hits = map[labels.Label][]byte{}

blockIDKey := blockID.String()
for _, key := range keys {
if b, ok := c.get(cacheTypePostings, cacheKey{blockID, cacheKeyPostings(key)}); ok {
if b, ok := c.get(cacheTypePostings, cacheKey{blockIDKey, cacheKeyPostings(key)}); ok {
hits[key] = b
continue
}
Expand All @@ -313,16 +314,17 @@ func (c *InMemoryIndexCache) FetchMultiPostings(_ context.Context, blockID ulid.
// StoreSeries sets the series identified by the ulid and id to the value v,
// if the series already exists in the cache it is not mutated.
func (c *InMemoryIndexCache) StoreSeries(blockID ulid.ULID, id storage.SeriesRef, v []byte) {
c.set(cacheTypeSeries, cacheKey{blockID, cacheKeySeries(id)}, v)
c.set(cacheTypeSeries, cacheKey{blockID.String(), cacheKeySeries(id)}, v)
}

// FetchMultiSeries fetches multiple series - each identified by ID - from the cache
// and returns a map containing cache hits, along with a list of missing IDs.
func (c *InMemoryIndexCache) FetchMultiSeries(_ context.Context, blockID ulid.ULID, ids []storage.SeriesRef) (hits map[storage.SeriesRef][]byte, misses []storage.SeriesRef) {
hits = map[storage.SeriesRef][]byte{}

blockIDKey := blockID.String()
for _, id := range ids {
if b, ok := c.get(cacheTypeSeries, cacheKey{blockID, cacheKeySeries(id)}); ok {
if b, ok := c.get(cacheTypeSeries, cacheKey{blockIDKey, cacheKeySeries(id)}); ok {
hits[id] = b
continue
}
Expand Down
46 changes: 12 additions & 34 deletions pkg/store/cache/memcached.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func NewRemoteIndexCache(logger log.Logger, cacheClient cacheutil.RemoteCacheCli
// The function enqueues the request and returns immediately: the entry will be
// asynchronously stored in the cache.
func (c *RemoteIndexCache) StorePostings(blockID ulid.ULID, l labels.Label, v []byte) {
key := cacheKey{blockID, cacheKeyPostings(l)}.string()
key := cacheKey{blockID.String(), cacheKeyPostings(l)}.string()

if err := c.memcached.SetAsync(key, v, memcachedDefaultTTL); err != nil {
level.Error(c.logger).Log("msg", "failed to cache postings in memcached", "err", err)
Expand All @@ -75,16 +75,12 @@ func (c *RemoteIndexCache) StorePostings(blockID ulid.ULID, l labels.Label, v []
// and returns a map containing cache hits, along with a list of missing keys.
// In case of error, it logs and returns an empty cache hits map.
func (c *RemoteIndexCache) FetchMultiPostings(ctx context.Context, blockID ulid.ULID, lbls []labels.Label) (hits map[labels.Label][]byte, misses []labels.Label) {
// Build the cache keys, while keeping a map between input label and the cache key
// so that we can easily reverse it back after the GetMulti().
keys := make([]string, 0, len(lbls))
keysMapping := map[labels.Label]string{}

blockIDKey := blockID.String()
for _, lbl := range lbls {
key := cacheKey{blockID, cacheKeyPostings(lbl)}.string()

key := cacheKey{blockIDKey, cacheKeyPostings(lbl)}.string()
keys = append(keys, key)
keysMapping[lbl] = key
}

// Fetch the keys from memcached in a single request.
Expand All @@ -96,18 +92,11 @@ func (c *RemoteIndexCache) FetchMultiPostings(ctx context.Context, blockID ulid.

// Construct the resulting hits map and list of missing keys. We iterate on the input
// list of labels to be able to easily create the list of missing ones in a single iteration.
hits = map[labels.Label][]byte{}

for _, lbl := range lbls {
key, ok := keysMapping[lbl]
if !ok {
level.Error(c.logger).Log("msg", "keys mapping inconsistency found in memcached index cache client", "type", "postings", "label", lbl.Name+":"+lbl.Value)
continue
}

hits = make(map[labels.Label][]byte, len(results))
for i, lbl := range lbls {
// Check if the key has been found in memcached. If not, we add it to the list
// of missing keys.
value, ok := results[key]
value, ok := results[keys[i]]
if !ok {
misses = append(misses, lbl)
continue
Expand All @@ -124,7 +113,7 @@ func (c *RemoteIndexCache) FetchMultiPostings(ctx context.Context, blockID ulid.
// The function enqueues the request and returns immediately: the entry will be
// asynchronously stored in the cache.
func (c *RemoteIndexCache) StoreSeries(blockID ulid.ULID, id storage.SeriesRef, v []byte) {
key := cacheKey{blockID, cacheKeySeries(id)}.string()
key := cacheKey{blockID.String(), cacheKeySeries(id)}.string()

if err := c.memcached.SetAsync(key, v, memcachedDefaultTTL); err != nil {
level.Error(c.logger).Log("msg", "failed to cache series in memcached", "err", err)
Expand All @@ -135,16 +124,12 @@ func (c *RemoteIndexCache) StoreSeries(blockID ulid.ULID, id storage.SeriesRef,
// and returns a map containing cache hits, along with a list of missing IDs.
// In case of error, it logs and returns an empty cache hits map.
func (c *RemoteIndexCache) FetchMultiSeries(ctx context.Context, blockID ulid.ULID, ids []storage.SeriesRef) (hits map[storage.SeriesRef][]byte, misses []storage.SeriesRef) {
// Build the cache keys, while keeping a map between input id and the cache key
// so that we can easily reverse it back after the GetMulti().
keys := make([]string, 0, len(ids))
keysMapping := map[storage.SeriesRef]string{}

blockIDKey := blockID.String()
for _, id := range ids {
key := cacheKey{blockID, cacheKeySeries(id)}.string()

key := cacheKey{blockIDKey, cacheKeySeries(id)}.string()
keys = append(keys, key)
keysMapping[id] = key
}

// Fetch the keys from memcached in a single request.
Expand All @@ -156,18 +141,11 @@ func (c *RemoteIndexCache) FetchMultiSeries(ctx context.Context, blockID ulid.UL

// Construct the resulting hits map and list of missing keys. We iterate on the input
// list of ids to be able to easily create the list of missing ones in a single iteration.
hits = map[storage.SeriesRef][]byte{}

for _, id := range ids {
key, ok := keysMapping[id]
if !ok {
level.Error(c.logger).Log("msg", "keys mapping inconsistency found in memcached index cache client", "type", "series", "id", id)
continue
}

hits = make(map[storage.SeriesRef][]byte, len(results))
for i, id := range ids {
// Check if the key has been found in memcached. If not, we add it to the list
// of missing keys.
value, ok := results[key]
value, ok := results[keys[i]]
if !ok {
misses = append(misses, id)
continue
Expand Down

0 comments on commit bacfc5a

Please sign in to comment.