address comments
Signed-off-by: Ben Ye <benye@amazon.com>
yeya24 committed May 17, 2024
1 parent 168c081 commit 153296a
Showing 4 changed files with 46 additions and 47 deletions.
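
The diff below moves logging from per-call parameters to construction-time injection: newBucketIndexReader and newBucketChunkReader now take a log.Logger and keep it on the reader, so ExpandedPostings, fetchExpandedPostingsFromCache, fetchPostings, PreloadSeries, loadSeries, load, and loadChunks drop their trailing logger argument and log through r.logger instead. A minimal, self-contained sketch of that pattern, assuming the go-kit log package used by Thanos (indexReader, newIndexReader, and expandedPostings are placeholder names for illustration, not the actual Thanos types):

package main

import (
	"errors"
	"os"

	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
)

// indexReader stands in for bucketIndexReader: the logger is injected once by the
// constructor and stored as a field instead of being threaded through every method.
type indexReader struct {
	block  string
	logger log.Logger
}

func newIndexReader(block string, logger log.Logger) *indexReader {
	return &indexReader{block: block, logger: logger}
}

// expandedPostings no longer needs a logger parameter; error paths use r.logger.
func (r *indexReader) expandedPostings() {
	// Simulate the cached-postings decode failure that falls back to refetching.
	err := errors.New("snappy: corrupt input")
	level.Error(r.logger).Log("msg", "failed to decode cached expanded postings, refetch postings", "id", r.block, "err", err)
}

func main() {
	base := log.NewLogfmtLogger(os.Stderr)
	// Decorate the logger once per block, then hand it to the constructor.
	r := newIndexReader("01EXAMPLEULID", log.With(base, "block", "01EXAMPLEULID"))
	r.expandedPostings()
}

Call sites decorate the logger once up front — in LabelNames and LabelValues this is the blockLogger built with log.With(logger, "block", b.meta.ULID) — and every downstream error path inherits it without further plumbing.
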
62 changes: 32 additions & 30 deletions pkg/store/bucket.go
@@ -1053,7 +1053,7 @@ func newBlockSeriesClient(
) *blockSeriesClient {
var chunkr *bucketChunkReader
if !req.SkipChunks {
chunkr = b.chunkReader()
chunkr = b.chunkReader(logger)
}

extLset := b.extLset
@@ -1069,7 +1069,7 @@

mint: req.MinTime,
maxt: req.MaxTime,
indexr: b.indexReader(),
indexr: b.indexReader(logger),
chunkr: chunkr,
seriesLimiter: seriesLimiter,
chunksLimiter: chunksLimiter,
@@ -1132,7 +1132,7 @@ func (b *blockSeriesClient) ExpandPostings(
matchers sortedMatchers,
seriesLimiter SeriesLimiter,
) error {
ps, err := b.indexr.ExpandedPostings(b.ctx, matchers, b.bytesLimiter, b.lazyExpandedPostingEnabled, b.lazyExpandedPostingSizeBytes, b.tenant, b.logger)
ps, err := b.indexr.ExpandedPostings(b.ctx, matchers, b.bytesLimiter, b.lazyExpandedPostingEnabled, b.lazyExpandedPostingSizeBytes, b.tenant)
if err != nil {
return errors.Wrap(err, "expanded matching posting")
}
@@ -1314,7 +1314,7 @@ OUTER:
}

if !b.skipChunks {
if err := b.chunkr.load(b.ctx, b.entries, b.loadAggregates, b.calculateChunkHash, b.bytesLimiter, b.tenant, b.logger); err != nil {
if err := b.chunkr.load(b.ctx, b.entries, b.loadAggregates, b.calculateChunkHash, b.bytesLimiter, b.tenant); err != nil {
return errors.Wrap(err, "load chunks")
}
}
@@ -1804,9 +1804,8 @@ func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReq

resHints.AddQueriedBlock(b.meta.ULID)

indexr := b.indexReader()

blockLogger := log.With(logger, "block", b.meta.ULID)
indexr := b.indexReader(blockLogger)

g.Go(func() error {
span, newCtx := tracing.StartSpan(gctx, "bucket_store_block_label_names", tracing.Tags{
@@ -2026,9 +2025,8 @@ func (s *BucketStore) LabelValues(ctx context.Context, req *storepb.LabelValuesR

resHints.AddQueriedBlock(b.meta.ULID)

indexr := b.indexReader()

blockLogger := log.With(logger, "block", b.meta.ULID)
indexr := b.indexReader(blockLogger)

g.Go(func() error {
span, newCtx := tracing.StartSpan(gctx, "bucket_store_block_label_values", tracing.Tags{
@@ -2432,14 +2430,14 @@ func (b *bucketBlock) chunkRangeReader(ctx context.Context, seq int, off, length
return b.bkt.GetRange(ctx, b.chunkObjs[seq], off, length)
}

func (b *bucketBlock) indexReader() *bucketIndexReader {
func (b *bucketBlock) indexReader(logger log.Logger) *bucketIndexReader {
b.pendingReaders.Add(1)
return newBucketIndexReader(b)
return newBucketIndexReader(b, logger)
}

func (b *bucketBlock) chunkReader() *bucketChunkReader {
func (b *bucketBlock) chunkReader(logger log.Logger) *bucketChunkReader {
b.pendingReaders.Add(1)
return newBucketChunkReader(b)
return newBucketChunkReader(b, logger)
}

// matchRelabelLabels verifies whether the block matches the given matchers.
@@ -2476,16 +2474,18 @@ type bucketIndexReader struct {
loadedSeries map[storage.SeriesRef][]byte

indexVersion int
logger log.Logger
}

func newBucketIndexReader(block *bucketBlock) *bucketIndexReader {
func newBucketIndexReader(block *bucketBlock, logger log.Logger) *bucketIndexReader {
r := &bucketIndexReader{
block: block,
dec: &index.Decoder{
LookupSymbol: block.indexHeaderReader.LookupSymbol,
},
stats: &queryStats{},
loadedSeries: map[storage.SeriesRef][]byte{},
logger: logger,
}
return r
}
@@ -2516,14 +2516,14 @@ func (r *bucketIndexReader) reset(size int) {
// Reminder: A posting is a reference (represented as a uint64) to a series reference, which in turn points to the first
// chunk where the series contains the matching label-value pair for a given block of data. Postings can be fetched by
// single label name=value.
func (r *bucketIndexReader) ExpandedPostings(ctx context.Context, ms sortedMatchers, bytesLimiter BytesLimiter, lazyExpandedPostingEnabled bool, lazyExpandedPostingSizeBytes prometheus.Counter, tenant string, logger log.Logger) (*lazyExpandedPostings, error) {
func (r *bucketIndexReader) ExpandedPostings(ctx context.Context, ms sortedMatchers, bytesLimiter BytesLimiter, lazyExpandedPostingEnabled bool, lazyExpandedPostingSizeBytes prometheus.Counter, tenant string) (*lazyExpandedPostings, error) {
// Shortcut the case of `len(postingGroups) == 0`. It will only happen when no
// matchers specified, and we don't need to fetch expanded postings from cache.
if len(ms) == 0 {
return nil, nil
}

hit, postings, err := r.fetchExpandedPostingsFromCache(ctx, ms, bytesLimiter, tenant, logger)
hit, postings, err := r.fetchExpandedPostingsFromCache(ctx, ms, bytesLimiter, tenant)
if err != nil {
return nil, err
}
@@ -2568,7 +2568,7 @@ func (r *bucketIndexReader) ExpandedPostings(ctx context.Context, ms sortedMatch
postingGroups = append(postingGroups, newPostingGroup(true, name, []string{value}, nil))
}

ps, err := fetchLazyExpandedPostings(ctx, postingGroups, r, bytesLimiter, addAllPostings, lazyExpandedPostingEnabled, lazyExpandedPostingSizeBytes, tenant, logger)
ps, err := fetchLazyExpandedPostings(ctx, postingGroups, r, bytesLimiter, addAllPostings, lazyExpandedPostingEnabled, lazyExpandedPostingSizeBytes, tenant)
if err != nil {
return nil, errors.Wrap(err, "fetch and expand postings")
}
@@ -2866,7 +2866,7 @@ type postingPtr struct {
ptr index.Range
}

func (r *bucketIndexReader) fetchExpandedPostingsFromCache(ctx context.Context, ms []*labels.Matcher, bytesLimiter BytesLimiter, tenant string, logger log.Logger) (bool, []storage.SeriesRef, error) {
func (r *bucketIndexReader) fetchExpandedPostingsFromCache(ctx context.Context, ms []*labels.Matcher, bytesLimiter BytesLimiter, tenant string) (bool, []storage.SeriesRef, error) {
dataFromCache, hit := r.block.indexCache.FetchExpandedPostings(ctx, r.block.meta.ULID, ms, tenant)
if !hit {
return false, nil, nil
@@ -2885,13 +2885,13 @@ func (r *bucketIndexReader) fetchExpandedPostingsFromCache(ctx context.Context,
}()
// If failed to decode or expand cached postings, return and expand postings again.
if err != nil {
level.Error(logger).Log("msg", "failed to decode cached expanded postings, refetch postings", "id", r.block.meta.ULID.String(), "err", err)
level.Error(r.logger).Log("msg", "failed to decode cached expanded postings, refetch postings", "id", r.block.meta.ULID.String(), "err", err)
return false, nil, nil
}

ps, err := ExpandPostingsWithContext(ctx, p)
if err != nil {
level.Error(logger).Log("msg", "failed to expand cached expanded postings, refetch postings", "id", r.block.meta.ULID.String(), "err", err)
level.Error(r.logger).Log("msg", "failed to expand cached expanded postings, refetch postings", "id", r.block.meta.ULID.String(), "err", err)
return false, nil, nil
}

@@ -2932,7 +2932,7 @@ var bufioReaderPool = sync.Pool{
// fetchPostings fill postings requested by posting groups.
// It returns one posting for each key, in the same order.
// If postings for given key is not fetched, entry at given index will be nil.
func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Label, bytesLimiter BytesLimiter, tenant string, logger log.Logger) ([]index.Postings, []func(), error) {
func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Label, bytesLimiter BytesLimiter, tenant string) ([]index.Postings, []func(), error) {
var closeFns []func()

timer := prometheus.NewTimer(r.block.metrics.postingsFetchDuration.WithLabelValues(tenant))
@@ -3031,7 +3031,7 @@ func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Lab
if err != nil {
return errors.Wrap(err, "read postings range")
}
defer runutil.CloseWithLogOnErr(logger, partReader, "readIndexRange close range reader")
defer runutil.CloseWithLogOnErr(r.logger, partReader, "readIndexRange close range reader")
brdr.Reset(partReader)

rdr := newPostingsReaderBuilder(ctx, brdr, ptrs[i:j], start, length)
@@ -3168,7 +3168,7 @@ func (it *bigEndianPostings) length() int {
return len(it.list) / 4
}

func (r *bucketIndexReader) PreloadSeries(ctx context.Context, ids []storage.SeriesRef, bytesLimiter BytesLimiter, tenant string, logger log.Logger) error {
func (r *bucketIndexReader) PreloadSeries(ctx context.Context, ids []storage.SeriesRef, bytesLimiter BytesLimiter, tenant string) error {
timer := prometheus.NewTimer(r.block.metrics.seriesFetchDuration.WithLabelValues(tenant))
defer func() {
d := timer.ObserveDuration()
@@ -3196,13 +3196,13 @@ func (r *bucketIndexReader) PreloadSeries(ctx context.Context, ids []storage.Ser
i, j := p.ElemRng[0], p.ElemRng[1]

g.Go(func() error {
return r.loadSeries(ctx, ids[i:j], false, s, e, bytesLimiter, tenant, logger)
return r.loadSeries(ctx, ids[i:j], false, s, e, bytesLimiter, tenant)
})
}
return g.Wait()
}

func (r *bucketIndexReader) loadSeries(ctx context.Context, ids []storage.SeriesRef, refetch bool, start, end uint64, bytesLimiter BytesLimiter, tenant string, logger log.Logger) error {
func (r *bucketIndexReader) loadSeries(ctx context.Context, ids []storage.SeriesRef, refetch bool, start, end uint64, bytesLimiter BytesLimiter, tenant string) error {
begin := time.Now()
stats := new(queryStats)
defer func() {
@@ -3216,7 +3216,7 @@ func (r *bucketIndexReader) loadSeries(ctx context.Context, ids []storage.Series
stats.DataDownloadedSizeSum += units.Base2Bytes(end - start)
}

b, err := r.block.readIndexRange(ctx, int64(start), int64(end-start), logger)
b, err := r.block.readIndexRange(ctx, int64(start), int64(end-start), r.logger)
if err != nil {
return errors.Wrap(err, "read series range")
}
@@ -3430,17 +3430,19 @@ type bucketChunkReader struct {
chunkBytesMtx sync.Mutex
stats *queryStats
chunkBytes []*[]byte // Byte slice to return to the chunk pool on close.
logger log.Logger

loadingChunksMtx sync.Mutex
loadingChunks bool
finishLoadingChks chan struct{}
}

func newBucketChunkReader(block *bucketBlock) *bucketChunkReader {
func newBucketChunkReader(block *bucketBlock, logger log.Logger) *bucketChunkReader {
return &bucketChunkReader{
block: block,
stats: &queryStats{},
toLoad: make([][]loadIdx, len(block.chunkObjs)),
logger: logger,
}
}

@@ -3487,7 +3489,7 @@ func (r *bucketChunkReader) addLoad(id chunks.ChunkRef, seriesEntry, chunk int)
}

// load loads all added chunks and saves resulting aggrs to refs.
func (r *bucketChunkReader) load(ctx context.Context, res []seriesEntry, aggrs []storepb.Aggr, calculateChunkChecksum bool, bytesLimiter BytesLimiter, tenant string, logger log.Logger) error {
func (r *bucketChunkReader) load(ctx context.Context, res []seriesEntry, aggrs []storepb.Aggr, calculateChunkChecksum bool, bytesLimiter BytesLimiter, tenant string) error {
r.loadingChunksMtx.Lock()
r.loadingChunks = true
r.loadingChunksMtx.Unlock()
@@ -3525,7 +3527,7 @@ func (r *bucketChunkReader) load(ctx context.Context, res []seriesEntry, aggrs [
p := p
indices := pIdxs[p.ElemRng[0]:p.ElemRng[1]]
g.Go(func() error {
return r.loadChunks(ctx, res, aggrs, seq, p, indices, calculateChunkChecksum, bytesLimiter, tenant, logger)
return r.loadChunks(ctx, res, aggrs, seq, p, indices, calculateChunkChecksum, bytesLimiter, tenant)
})
}
}
@@ -3534,7 +3536,7 @@ func (r *bucketChunkReader) load(ctx context.Context, res []seriesEntry, aggrs [

// loadChunks will read range [start, end] from the segment file with sequence number seq.
// This data range covers chunks starting at supplied offsets.
func (r *bucketChunkReader) loadChunks(ctx context.Context, res []seriesEntry, aggrs []storepb.Aggr, seq int, part Part, pIdxs []loadIdx, calculateChunkChecksum bool, bytesLimiter BytesLimiter, tenant string, logger log.Logger) error {
func (r *bucketChunkReader) loadChunks(ctx context.Context, res []seriesEntry, aggrs []storepb.Aggr, seq int, part Part, pIdxs []loadIdx, calculateChunkChecksum bool, bytesLimiter BytesLimiter, tenant string) error {
fetchBegin := time.Now()
stats := new(queryStats)
defer func() {
@@ -3547,7 +3549,7 @@ func (r *bucketChunkReader) loadChunks(ctx context.Context, res []seriesEntry, a
if err != nil {
return errors.Wrap(err, "get range reader")
}
defer runutil.CloseWithLogOnErr(logger, reader, "readChunkRange close range reader")
defer runutil.CloseWithLogOnErr(r.logger, reader, "readChunkRange close range reader")
bufReader := bufio.NewReaderSize(reader, r.block.estimatedMaxChunkSize)

stats.chunksFetchCount++
14 changes: 7 additions & 7 deletions pkg/store/bucket_test.go
@@ -1318,11 +1318,11 @@ func benchmarkExpandedPostings(
partitioner: NewGapBasedPartitioner(PartitionerMaxGapSize),
}

indexr := newBucketIndexReader(b)
indexr := newBucketIndexReader(b, logger)

t.ResetTimer()
for i := 0; i < t.N(); i++ {
p, err := indexr.ExpandedPostings(context.Background(), newSortedMatchers(c.matchers), NewBytesLimiterFactory(0)(nil), false, dummyCounter, tenancy.DefaultTenant, logger)
p, err := indexr.ExpandedPostings(context.Background(), newSortedMatchers(c.matchers), NewBytesLimiterFactory(0)(nil), false, dummyCounter, tenancy.DefaultTenant)
testutil.Ok(t, err)
testutil.Equals(t, c.expectedLen, len(p.postings))
}
@@ -1351,13 +1351,13 @@ func TestExpandedPostingsEmptyPostings(t *testing.T) {
}

logger := log.NewNopLogger()
indexr := newBucketIndexReader(b)
indexr := newBucketIndexReader(b, logger)
matcher1 := labels.MustNewMatcher(labels.MatchEqual, "j", "foo")
// Match nothing.
matcher2 := labels.MustNewMatcher(labels.MatchRegexp, "i", "500.*")
ctx := context.Background()
dummyCounter := promauto.With(prometheus.NewRegistry()).NewCounter(prometheus.CounterOpts{Name: "test"})
ps, err := indexr.ExpandedPostings(ctx, newSortedMatchers([]*labels.Matcher{matcher1, matcher2}), NewBytesLimiterFactory(0)(nil), false, dummyCounter, tenancy.DefaultTenant, logger)
ps, err := indexr.ExpandedPostings(ctx, newSortedMatchers([]*labels.Matcher{matcher1, matcher2}), NewBytesLimiterFactory(0)(nil), false, dummyCounter, tenancy.DefaultTenant)
testutil.Ok(t, err)
testutil.Equals(t, ps, (*lazyExpandedPostings)(nil))
// Make sure even if a matcher doesn't match any postings, we still cache empty expanded postings.
@@ -1386,14 +1386,14 @@ func TestLazyExpandedPostingsEmptyPostings(t *testing.T) {
}

logger := log.NewNopLogger()
indexr := newBucketIndexReader(b)
indexr := newBucketIndexReader(b, logger)
// matcher1 and matcher2 will match nothing after intersection.
matcher1 := labels.MustNewMatcher(labels.MatchEqual, "j", "foo")
matcher2 := labels.MustNewMatcher(labels.MatchRegexp, "n", "1_.*")
matcher3 := labels.MustNewMatcher(labels.MatchRegexp, "i", ".+")
ctx := context.Background()
dummyCounter := promauto.With(prometheus.NewRegistry()).NewCounter(prometheus.CounterOpts{Name: "test"})
ps, err := indexr.ExpandedPostings(ctx, newSortedMatchers([]*labels.Matcher{matcher1, matcher2, matcher3}), NewBytesLimiterFactory(0)(nil), true, dummyCounter, tenancy.DefaultTenant, logger)
ps, err := indexr.ExpandedPostings(ctx, newSortedMatchers([]*labels.Matcher{matcher1, matcher2, matcher3}), NewBytesLimiterFactory(0)(nil), true, dummyCounter, tenancy.DefaultTenant)
testutil.Ok(t, err)
// We expect emptyLazyPostings rather than lazy postings with 0 length but with matchers.
testutil.Equals(t, ps, emptyLazyPostings)
@@ -3542,7 +3542,7 @@ func TestExpandedPostingsRace(t *testing.T) {
i := i
bb := bb
go func(i int, bb *bucketBlock) {
refs, err := bb.indexReader().ExpandedPostings(context.Background(), m, NewBytesLimiterFactory(0)(nil), false, dummyCounter, tenancy.DefaultTenant, logger)
refs, err := bb.indexReader(logger).ExpandedPostings(context.Background(), m, NewBytesLimiterFactory(0)(nil), false, dummyCounter, tenancy.DefaultTenant)
testutil.Ok(t, err)
defer wg.Done()

13 changes: 5 additions & 8 deletions pkg/store/lazy_postings.go
@@ -8,7 +8,6 @@ import (
"math"
"strings"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
@@ -40,7 +39,7 @@ func (p *lazyExpandedPostings) lazyExpanded() bool {
return p != nil && len(p.matchers) > 0
}

func optimizePostingsFetchByDownloadedBytes(r *bucketIndexReader, postingGroups []*postingGroup, seriesMaxSize int64, seriesMatchRatio float64, lazyExpandedPostingSizeBytes prometheus.Counter, logger log.Logger) ([]*postingGroup, bool, error) {
func optimizePostingsFetchByDownloadedBytes(r *bucketIndexReader, postingGroups []*postingGroup, seriesMaxSize int64, seriesMatchRatio float64, lazyExpandedPostingSizeBytes prometheus.Counter) ([]*postingGroup, bool, error) {
if len(postingGroups) <= 1 {
return postingGroups, false, nil
}
@@ -61,7 +60,7 @@ func optimizePostingsFetchByDownloadedBytes(r *bucketIndexReader, postingGroups
continue
}
if rng.End <= rng.Start {
level.Error(logger).Log("msg", "invalid index range, fallback to non lazy posting optimization")
level.Error(r.logger).Log("msg", "invalid index range, fallback to non lazy posting optimization")
return postingGroups, false, nil
}
// Each range starts from the #entries field which is 4 bytes.
@@ -193,7 +192,6 @@ func fetchLazyExpandedPostings(
lazyExpandedPostingEnabled bool,
lazyExpandedPostingSizeBytes prometheus.Counter,
tenant string,
logger log.Logger,
) (*lazyExpandedPostings, error) {
var (
err error
@@ -215,7 +213,6 @@
int64(r.block.estimatedMaxSeriesSize),
0.5, // TODO(yeya24): Expose this as a flag.
lazyExpandedPostingSizeBytes,
logger,
)
if err != nil {
return nil, err
@@ -225,7 +222,7 @@
}
}

ps, matchers, err := fetchAndExpandPostingGroups(ctx, r, postingGroups, bytesLimiter, tenant, logger)
ps, matchers, err := fetchAndExpandPostingGroups(ctx, r, postingGroups, bytesLimiter, tenant)
if err != nil {
return nil, err
}
@@ -270,9 +267,9 @@ func keysToFetchFromPostingGroups(postingGroups []*postingGroup) ([]labels.Label
return keys, lazyMatchers
}

func fetchAndExpandPostingGroups(ctx context.Context, r *bucketIndexReader, postingGroups []*postingGroup, bytesLimiter BytesLimiter, tenant string, logger log.Logger) ([]storage.SeriesRef, []*labels.Matcher, error) {
func fetchAndExpandPostingGroups(ctx context.Context, r *bucketIndexReader, postingGroups []*postingGroup, bytesLimiter BytesLimiter, tenant string) ([]storage.SeriesRef, []*labels.Matcher, error) {
keys, lazyMatchers := keysToFetchFromPostingGroups(postingGroups)
fetchedPostings, closeFns, err := r.fetchPostings(ctx, keys, bytesLimiter, tenant, logger)
fetchedPostings, closeFns, err := r.fetchPostings(ctx, keys, bytesLimiter, tenant)
defer func() {
for _, closeFn := range closeFns {
closeFn()
4 changes: 2 additions & 2 deletions pkg/store/lazy_postings_test.go
@@ -557,9 +557,9 @@ func TestOptimizePostingsFetchByDownloadedBytes(t *testing.T) {
registry := prometheus.NewRegistry()
block, err := newBucketBlock(ctx, newBucketStoreMetrics(registry), meta, bkt, path.Join(dir, blockID.String()), nil, nil, headerReader, nil, nil, nil)
testutil.Ok(t, err)
ir := newBucketIndexReader(block)
ir := newBucketIndexReader(block, logger)
dummyCounter := promauto.With(registry).NewCounter(prometheus.CounterOpts{Name: "test"})
pgs, emptyPosting, err := optimizePostingsFetchByDownloadedBytes(ir, tc.postingGroups, tc.seriesMaxSize, tc.seriesMatchRatio, dummyCounter, logger)
pgs, emptyPosting, err := optimizePostingsFetchByDownloadedBytes(ir, tc.postingGroups, tc.seriesMaxSize, tc.seriesMatchRatio, dummyCounter)
if err != nil {
testutil.Equals(t, tc.expectedError, err.Error())
return