Skip to content

Commit

Permalink
debug log number of index entries cached and retrieved from the cache per index query (#5642)
Browse files Browse the repository at this point in the history
  • Loading branch information
sandeepsukhani authored Mar 16, 2022
1 parent 86b4869 commit 8c06c54
Showing 1 changed file with 16 additions and 5 deletions.
21 changes: 16 additions & 5 deletions pkg/storage/chunk/storage/caching_index_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/grafana/loki/pkg/storage/chunk/cache"
chunk_util "github.com/grafana/loki/pkg/storage/chunk/util"
"github.com/grafana/loki/pkg/tenant"
util_log "github.com/grafana/loki/pkg/util/log"
"github.com/grafana/loki/pkg/util/spanlogger"
)

Expand Down Expand Up @@ -274,13 +275,17 @@ func isChunksQuery(q chunk.IndexQuery) bool {
}

func (s *cachingIndexClient) cacheStore(ctx context.Context, keys []string, batches []ReadBatch) error {
logger := util_log.WithContext(ctx, s.logger)
cachePuts.Add(float64(len(keys)))

// We're doing the hashing to handle unicode and key len properly.
// Memcache fails for unicode keys and keys longer than 250 Bytes.
hashed := make([]string, 0, len(keys))
bufs := make([][]byte, 0, len(batches))
for i := range keys {
if len(batches[i].Entries) != 0 {
level.Debug(logger).Log("msg", "caching index entries", "key", keys[i], "count", len(batches[i].Entries))
}
hashed = append(hashed, cache.HashKey(keys[i]))
out, err := proto.Marshal(&batches[i])
if err != nil {
Expand All @@ -295,8 +300,9 @@ func (s *cachingIndexClient) cacheStore(ctx context.Context, keys []string, batc
}

func (s *cachingIndexClient) cacheFetch(ctx context.Context, keys []string) (batches []ReadBatch, missed []string) {
log := spanlogger.FromContext(ctx)
level.Debug(log).Log("requested", len(keys))
spanLogger := spanlogger.FromContext(ctx)
logger := util_log.WithContext(ctx, s.logger)
level.Debug(spanLogger).Log("requested", len(keys))

cacheGets.Add(float64(len(keys)))

Expand Down Expand Up @@ -326,22 +332,27 @@ func (s *cachingIndexClient) cacheFetch(ctx context.Context, keys []string) (bat
var readBatch ReadBatch

if err := proto.Unmarshal(bufs[j], &readBatch); err != nil {
level.Warn(log).Log("msg", "error unmarshalling index entry from cache", "err", err)
level.Warn(spanLogger).Log("msg", "error unmarshalling index entry from cache", "err", err)
cacheCorruptErrs.Inc()
continue
}

// Make sure the hash(key) is not a collision in the cache by looking at the
// key in the value.
if key != readBatch.Key {
level.Debug(log).Log("msg", "dropping index cache entry due to key collision", "key", key, "readBatch.Key", readBatch.Key, "expiry")
level.Debug(spanLogger).Log("msg", "dropping index cache entry due to key collision", "key", key, "readBatch.Key", readBatch.Key, "expiry")
continue
}

if readBatch.Expiry != 0 && time.Now().After(time.Unix(0, readBatch.Expiry)) {
continue
}

if len(readBatch.Entries) != 0 {
// not using spanLogger to avoid over-inflating traces since the query count can go much higher
level.Debug(logger).Log("msg", "found index cache entries", "key", key, "count", len(readBatch.Entries))
}

cacheHits.Inc()
batches = append(batches, readBatch)
}
Expand All @@ -359,6 +370,6 @@ func (s *cachingIndexClient) cacheFetch(ctx context.Context, keys []string) (bat
missed = append(missed, miss)
}

level.Debug(log).Log("hits", len(batches), "misses", len(misses))
level.Debug(spanLogger).Log("hits", len(batches), "misses", len(misses))
return batches, missed
}

0 comments on commit 8c06c54

Please sign in to comment.