Skip to content

Commit

Permalink
address review comments
Browse files Browse the repository at this point in the history
Signed-off-by: Ben Ye <benye@amazon.com>
  • Loading branch information
yeya24 committed Jun 9, 2023
1 parent c1349ae commit 29661d0
Showing 1 changed file with 50 additions and 35 deletions.
85 changes: 50 additions & 35 deletions pkg/store/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -2166,42 +2166,12 @@ func (r *bucketIndexReader) ExpandedPostings(ctx context.Context, ms []*labels.M
}
return ms[i].Type < ms[j].Type
})
dataFromCache, hit := r.block.indexCache.FetchExpandedPostings(ctx, r.block.meta.ULID, ms)
hit, postings, err := r.fetchExpandedPostingsFromCache(ctx, ms, bytesLimiter)
if err != nil {
return nil, err
}
if hit {
if err := bytesLimiter.Reserve(uint64(len(dataFromCache))); err != nil {
return nil, httpgrpc.Errorf(int(codes.ResourceExhausted), "exceeded bytes limit while loading expanded postings from index cache: %s", err)
}
r.stats.DataDownloadedSizeSum += units.Base2Bytes(len(dataFromCache))
r.stats.postingsTouched++
r.stats.PostingsTouchedSizeSum += units.Base2Bytes(len(dataFromCache))
p, closeFns, err := r.decodeCachedPostings(dataFromCache)
defer func() {
for _, closeFn := range closeFns {
closeFn()
}
}()
if err == nil {
ps, err := index.ExpandPostings(p)
if err != nil {
return nil, errors.Wrap(err, "expand")
}

if len(ps) > 0 {
// As of version two all series entries are 16 byte padded. All references
// we get have to account for that to get the correct offset.
version, err := r.block.indexHeaderReader.IndexVersion()
if err != nil {
return nil, errors.Wrap(err, "get index version")
}
if version >= 2 {
for i, id := range ps {
ps[i] = id * 16
}
}
}
return ps, nil
}
// If failed to decode cached postings, try to expand postings again.
return postings, nil
}
var (
postingGroups []*postingGroup
Expand Down Expand Up @@ -2437,6 +2407,51 @@ type postingPtr struct {
ptr index.Range
}

func (r *bucketIndexReader) fetchExpandedPostingsFromCache(ctx context.Context, ms []*labels.Matcher, bytesLimiter BytesLimiter) (bool, []storage.SeriesRef, error) {
dataFromCache, hit := r.block.indexCache.FetchExpandedPostings(ctx, r.block.meta.ULID, ms)
if !hit {
return false, nil, nil
}
if err := bytesLimiter.Reserve(uint64(len(dataFromCache))); err != nil {
return false, nil, httpgrpc.Errorf(int(codes.ResourceExhausted), "exceeded bytes limit while loading expanded postings from index cache: %s", err)
}
r.stats.DataDownloadedSizeSum += units.Base2Bytes(len(dataFromCache))
r.stats.postingsTouched++
r.stats.PostingsTouchedSizeSum += units.Base2Bytes(len(dataFromCache))
p, closeFns, err := r.decodeCachedPostings(dataFromCache)
defer func() {
for _, closeFn := range closeFns {
closeFn()
}
}()
// If failed to decode or expand cached postings, return and expand postings again.
if err != nil {
level.Error(r.block.logger).Log("msg", "failed to decode cached expanded postings, refetch postings", "id", r.block.meta.ULID.String())
return false, nil, nil
}

ps, err := index.ExpandPostings(p)
if err != nil {
level.Error(r.block.logger).Log("msg", "failed to expand cached expanded postings, refetch postings", "id", r.block.meta.ULID.String())
return false, nil, nil
}

if len(ps) > 0 {
// As of version two all series entries are 16 byte padded. All references
// we get have to account for that to get the correct offset.
version, err := r.block.indexHeaderReader.IndexVersion()
if err != nil {
return false, nil, errors.Wrap(err, "get index version")
}
if version >= 2 {
for i, id := range ps {
ps[i] = id * 16
}
}
}
return true, ps, nil
}

// fetchPostings fill postings requested by posting groups.
// It returns one posting for each key, in the same order.
// If postings for given key is not fetched, entry at given index will be nil.
Expand Down

0 comments on commit 29661d0

Please sign in to comment.