fix: apply index v2 16-byte padding to expanded postings after caching, and fetch expanded postings with the correct cache type
Signed-off-by: Ben Ye <benye@amazon.com>
yeya24 committed Jun 6, 2023
1 parent 3a69571 commit 1170c9c
Showing 3 changed files with 34 additions and 15 deletions.
44 changes: 31 additions & 13 deletions pkg/store/bucket.go
@@ -2185,6 +2185,20 @@ func (r *bucketIndexReader) ExpandedPostings(ctx context.Context, ms []*labels.M
 		if err != nil {
 			return nil, errors.Wrap(err, "expand")
 		}
+
+		if len(ps) > 0 {
+			// As of version two all series entries are 16 byte padded. All references
+			// we get have to account for that to get the correct offset.
+			version, err := r.block.indexHeaderReader.IndexVersion()
+			if err != nil {
+				return nil, errors.Wrap(err, "get index version")
+			}
+			if version >= 2 {
+				for i, id := range ps {
+					ps[i] = id * 16
+				}
+			}
+		}
 		return ps, nil
 	}
 	// If failed to decode cached postings, try to expand postings again.
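For context on why the multiplication is needed: since version 2 of the Prometheus TSDB index format, each series entry starts at a 16-byte aligned position, and series references are stored as the byte offset divided by 16. Readers therefore have to scale references back up to byte offsets before seeking into the index. A minimal sketch of that mapping (the helper names are illustrative, not Thanos APIs):

```go
package main

import "fmt"

const seriesAlignment = 16 // index format v2 pads series entries to 16 bytes.

// refToOffset converts a version-2 series reference into the byte offset of
// the series entry inside the index file. Illustrative helper.
func refToOffset(ref uint64) uint64 {
	return ref * seriesAlignment
}

// offsetToRef is the inverse mapping, applied when the index is written.
func offsetToRef(offset uint64) uint64 {
	return offset / seriesAlignment
}

func main() {
	ref := uint64(42)
	fmt.Println(refToOffset(ref))              // 672: where the entry actually lives.
	fmt.Println(offsetToRef(refToOffset(ref))) // 42: round-trips back to the reference.
}
```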
@@ -2284,25 +2298,29 @@ func (r *bucketIndexReader) ExpandedPostings(ctx context.Context, ms []*labels.M
 		return nil, errors.Wrap(err, "expand")
 	}
 
-	// As of version two all series entries are 16 byte padded. All references
-	// we get have to account for that to get the correct offset.
-	version, err := r.block.indexHeaderReader.IndexVersion()
-	if err != nil {
-		return nil, errors.Wrap(err, "get index version")
-	}
-	if version >= 2 {
-		for i, id := range ps {
-			ps[i] = id * 16
-		}
-	}
-	// Encode postings to cache.
+	// Encode postings to cache. We compress and cache postings before adding
+	// 16 bytes padding in order to make compressed size smaller.
 	dataToCache, compressionDuration, compressionErrors, compressedSize := r.encodePostingsToCache(index.NewListPostings(ps), len(ps))
 	r.stats.cachedPostingsCompressions++
 	r.stats.cachedPostingsCompressionErrors += compressionErrors
 	r.stats.CachedPostingsCompressionTimeSum += compressionDuration
 	r.stats.CachedPostingsCompressedSizeSum += units.Base2Bytes(compressedSize)
 	r.stats.CachedPostingsOriginalSizeSum += units.Base2Bytes(len(ps) * 4) // Estimate the posting list size.
 	r.block.indexCache.StoreExpandedPostings(r.block.meta.ULID, ms, dataToCache)
+
+	if len(ps) > 0 {
+		// As of version two all series entries are 16 byte padded. All references
+		// we get have to account for that to get the correct offset.
+		version, err := r.block.indexHeaderReader.IndexVersion()
+		if err != nil {
+			return nil, errors.Wrap(err, "get index version")
+		}
+		if version >= 2 {
+			for i, id := range ps {
+				ps[i] = id * 16
+			}
+		}
+	}
 	return ps, nil
 }

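Caching the unpadded references matters because expanded postings are stored delta-encoded (each value as the gap from its predecessor) and then compressed. Multiplying every reference by 16 multiplies every gap by 16, which pushes many gaps past a varint size boundary and inflates both the raw and the compressed payload. A rough illustration of the effect, assuming only delta+varint encoding and nothing about the exact Thanos codec:

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// varintSize returns how many bytes a sorted posting list occupies when each
// gap between consecutive references is written as a uvarint.
func varintSize(refs []uint64) int {
	var buf [binary.MaxVarintLen64]byte
	total, prev := 0, uint64(0)
	for _, r := range refs {
		total += binary.PutUvarint(buf[:], r-prev) // encode the gap, not the value.
		prev = r
	}
	return total
}

func main() {
	// Unpadded references with small gaps, as produced before the *16 step.
	refs := make([]uint64, 1000)
	for i := range refs {
		refs[i] = uint64(i) * 20 // gaps of 20: one varint byte each.
	}
	padded := make([]uint64, len(refs))
	for i, r := range refs {
		padded[i] = r * 16 // gaps of 320: two varint bytes each.
	}
	fmt.Println(varintSize(refs), varintSize(padded)) // prints "1000 1999".
}
```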
@@ -2452,7 +2470,7 @@ func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Lab
 
 		l, closer, err := r.decodeCachedPostings(b)
 		if err != nil {
-
+			return nil, closeFns, errors.Wrap(err, "decode postings")
 		}
 		output[ix] = l
 		closeFns = append(closeFns, closer...)
3 changes: 2 additions & 1 deletion pkg/store/cache/cache.go
@@ -6,11 +6,12 @@ package storecache
 import (
 	"context"
 	"encoding/base64"
+	"strconv"
 
 	"github.com/oklog/ulid"
 	"github.com/prometheus/prometheus/model/labels"
 	"github.com/prometheus/prometheus/storage"
 	"golang.org/x/crypto/blake2b"
-	"strconv"
 )
 
 const (
2 changes: 1 addition & 1 deletion pkg/store/cache/inmemory.go
@@ -326,7 +326,7 @@ func (c *InMemoryIndexCache) StoreExpandedPostings(blockID ulid.ULID, matchers [
 
 // FetchExpandedPostings fetches expanded postings.
 func (c *InMemoryIndexCache) FetchExpandedPostings(_ context.Context, blockID ulid.ULID, matchers []*labels.Matcher) ([]byte, bool) {
-	if b, ok := c.get(cacheTypePostings, cacheKey{blockID.String(), cacheKeyExpandedPostings(labelMatchersToString(matchers))}); ok {
+	if b, ok := c.get(cacheTypeExpandedPostings, cacheKey{blockID.String(), cacheKeyExpandedPostings(labelMatchersToString(matchers))}); ok {
 		return b, true
 	}
 	return nil, false
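This one-line change matters because the in-memory cache namespaces entries by type: StoreExpandedPostings writes entries under cacheTypeExpandedPostings, so a fetch issued under cacheTypePostings could never find them and every lookup was a guaranteed miss. A toy model of that keying, with hypothetical names standing in for the unexported Thanos internals:

```go
package main

import "fmt"

// cacheType mirrors the idea of Thanos' unexported cache type tags.
type cacheType string

const (
	cacheTypePostings         cacheType = "Postings"
	cacheTypeExpandedPostings cacheType = "ExpandedPostings"
)

// cacheKey namespaces an entry by type, block, and item key.
type cacheKey struct {
	typ   cacheType
	block string
	key   string
}

func main() {
	cache := map[cacheKey][]byte{}

	// Store side: expanded postings go in under their own type.
	cache[cacheKey{cacheTypeExpandedPostings, "blockULID", "matchers"}] = []byte{1, 2, 3}

	// Buggy fetch: wrong type in the key, so the entry is never found.
	_, ok := cache[cacheKey{cacheTypePostings, "blockULID", "matchers"}]
	fmt.Println(ok) // false: guaranteed cache miss.

	// Fixed fetch: same type as the store, so the lookup succeeds.
	_, ok = cache[cacheKey{cacheTypeExpandedPostings, "blockULID", "matchers"}]
	fmt.Println(ok) // true
}
```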
