Skip to content

Commit

Permalink
Allow configurable request logger in Store Gateway (#7367)
Browse files Browse the repository at this point in the history
* allow configurable request logger for Store Gateway

Signed-off-by: Ben Ye <benye@amazon.com>

* lint

Signed-off-by: Ben Ye <benye@amazon.com>

* lint

Signed-off-by: Ben Ye <benye@amazon.com>

* fix tests

Signed-off-by: Ben Ye <benye@amazon.com>

* fix test

Signed-off-by: Ben Ye <benye@amazon.com>

* address comments

Signed-off-by: Ben Ye <benye@amazon.com>

* fix tests

Signed-off-by: Ben Ye <benye@amazon.com>

* changelog

Signed-off-by: Ben Ye <benye@amazon.com>

---------

Signed-off-by: Ben Ye <benye@amazon.com>
  • Loading branch information
yeya24 authored May 17, 2024
1 parent e752424 commit 9e6cbd9
Show file tree
Hide file tree
Showing 6 changed files with 85 additions and 52 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
### Added

- [#7317](https://github.com/thanos-io/thanos/pull/7317) Tracing: allow specifying resource attributes for the OTLP configuration.
- [#7367](https://github.com/thanos-io/thanos/pull/7367) Store Gateway: log request ID in request logs.

### Changed

Expand Down
8 changes: 8 additions & 0 deletions cmd/thanos/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import (
"github.com/thanos-io/thanos/pkg/runutil"
grpcserver "github.com/thanos-io/thanos/pkg/server/grpc"
httpserver "github.com/thanos-io/thanos/pkg/server/http"
"github.com/thanos-io/thanos/pkg/server/http/middleware"
"github.com/thanos-io/thanos/pkg/store"
storecache "github.com/thanos-io/thanos/pkg/store/cache"
"github.com/thanos-io/thanos/pkg/store/labelpb"
Expand Down Expand Up @@ -394,6 +395,13 @@ func runStore(

options := []store.BucketStoreOption{
store.WithLogger(logger),
store.WithRequestLoggerFunc(func(ctx context.Context, logger log.Logger) log.Logger {
reqID, ok := middleware.RequestIDFromContext(ctx)
if ok {
return log.With(logger, "request-id", reqID)
}
return logger
}),
store.WithRegistry(reg),
store.WithIndexCache(indexCache),
store.WithQueryGate(queriesGate),
Expand Down
88 changes: 56 additions & 32 deletions pkg/store/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,8 @@ type BucketStore struct {
blockEstimatedMaxChunkFunc BlockEstimator

indexHeaderLazyDownloadStrategy indexheader.LazyDownloadIndexHeaderFunc

requestLoggerFunc RequestLoggerFunc
}

func (s *BucketStore) validate() error {
Expand Down Expand Up @@ -449,6 +451,20 @@ func WithLogger(logger log.Logger) BucketStoreOption {
}
}

type RequestLoggerFunc func(ctx context.Context, log log.Logger) log.Logger

func NoopRequestLoggerFunc(_ context.Context, logger log.Logger) log.Logger {
return logger
}

// WithRequestLoggerFunc sets the BucketStore to use the passed RequestLoggerFunc
// to initialize logger during query time.
func WithRequestLoggerFunc(loggerFunc RequestLoggerFunc) BucketStoreOption {
return func(s *BucketStore) {
s.requestLoggerFunc = loggerFunc
}
}

// WithRegistry sets a registry that BucketStore uses to register metrics with.
func WithRegistry(reg prometheus.Registerer) BucketStoreOption {
return func(s *BucketStore) {
Expand Down Expand Up @@ -583,6 +599,7 @@ func NewBucketStore(
seriesBatchSize: SeriesBatchSize,
sortingStrategy: sortingStrategyStore,
indexHeaderLazyDownloadStrategy: indexheader.AlwaysEagerDownloadIndexHeader,
requestLoggerFunc: NoopRequestLoggerFunc,
}

for _, option := range options {
Expand Down Expand Up @@ -779,7 +796,6 @@ func (s *BucketStore) addBlock(ctx context.Context, meta *metadata.Meta) (err er

b, err := newBucketBlock(
ctx,
log.With(s.logger, "block", meta.ULID),
s.metrics,
meta,
s.bkt,
Expand Down Expand Up @@ -1037,7 +1053,7 @@ func newBlockSeriesClient(
) *blockSeriesClient {
var chunkr *bucketChunkReader
if !req.SkipChunks {
chunkr = b.chunkReader()
chunkr = b.chunkReader(logger)
}

extLset := b.extLset
Expand All @@ -1053,7 +1069,7 @@ func newBlockSeriesClient(

mint: req.MinTime,
maxt: req.MaxTime,
indexr: b.indexReader(),
indexr: b.indexReader(logger),
chunkr: chunkr,
seriesLimiter: seriesLimiter,
chunksLimiter: chunksLimiter,
Expand Down Expand Up @@ -1469,6 +1485,8 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store
seriesLimiter = s.seriesLimiterFactory(s.metrics.queriesDropped.WithLabelValues("series", tenant))

queryStatsEnabled = false

logger = s.requestLoggerFunc(ctx, s.logger)
)

if req.Hints != nil {
Expand Down Expand Up @@ -1505,7 +1523,7 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store
blocks := bs.getFor(req.MinTime, req.MaxTime, req.MaxResolutionWindow, reqBlockMatchers)

if s.debugLogging {
debugFoundBlockSetOverview(s.logger, req.MinTime, req.MaxTime, req.MaxResolutionWindow, bs.labels, blocks)
debugFoundBlockSetOverview(logger, req.MinTime, req.MaxTime, req.MaxResolutionWindow, bs.labels, blocks)
}

for _, b := range blocks {
Expand All @@ -1521,7 +1539,7 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store

blockClient := newBlockSeriesClient(
srv.Context(),
s.logger,
log.With(logger, "block", blk.meta.ULID),
blk,
req,
seriesLimiter,
Expand Down Expand Up @@ -1633,7 +1651,7 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store
s.metrics.cachedPostingsCompressedSizeBytes.WithLabelValues(tenant).Add(float64(stats.CachedPostingsCompressedSizeSum))
s.metrics.postingsSizeBytes.WithLabelValues(tenant).Observe(float64(int(stats.PostingsFetchedSizeSum) + int(stats.PostingsTouchedSizeSum)))

level.Debug(s.logger).Log("msg", "stats query processed",
level.Debug(logger).Log("msg", "stats query processed",
"request", req,
"tenant", tenant,
"stats", fmt.Sprintf("%+v", stats), "err", err)
Expand Down Expand Up @@ -1764,6 +1782,7 @@ func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReq
var sets [][]string
var seriesLimiter = s.seriesLimiterFactory(s.metrics.queriesDropped.WithLabelValues("series", tenant))
var bytesLimiter = s.bytesLimiterFactory(s.metrics.queriesDropped.WithLabelValues("bytes", tenant))
var logger = s.requestLoggerFunc(ctx, s.logger)

for _, b := range s.blocks {
b := b
Expand All @@ -1785,7 +1804,8 @@ func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReq

resHints.AddQueriedBlock(b.meta.ULID)

indexr := b.indexReader()
blockLogger := log.With(logger, "block", b.meta.ULID)
indexr := b.indexReader(blockLogger)

g.Go(func() error {
span, newCtx := tracing.StartSpan(gctx, "bucket_store_block_label_names", tracing.Tags{
Expand All @@ -1795,7 +1815,7 @@ func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReq
"block.resolution": b.meta.Thanos.Downsample.Resolution,
})
defer span.Finish()
defer runutil.CloseWithLogOnErr(s.logger, indexr, "label names")
defer runutil.CloseWithLogOnErr(blockLogger, indexr, "label names")

var result []string
if len(reqSeriesMatchersNoExtLabels) == 0 {
Expand Down Expand Up @@ -1826,7 +1846,7 @@ func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReq
}
blockClient := newBlockSeriesClient(
newCtx,
s.logger,
blockLogger,
b,
seriesReq,
seriesLimiter,
Expand Down Expand Up @@ -1973,6 +1993,7 @@ func (s *BucketStore) LabelValues(ctx context.Context, req *storepb.LabelValuesR
var sets [][]string
var seriesLimiter = s.seriesLimiterFactory(s.metrics.queriesDropped.WithLabelValues("series", tenant))
var bytesLimiter = s.bytesLimiterFactory(s.metrics.queriesDropped.WithLabelValues("bytes", tenant))
var logger = s.requestLoggerFunc(ctx, s.logger)

for _, b := range s.blocks {
b := b
Expand Down Expand Up @@ -2004,7 +2025,9 @@ func (s *BucketStore) LabelValues(ctx context.Context, req *storepb.LabelValuesR

resHints.AddQueriedBlock(b.meta.ULID)

indexr := b.indexReader()
blockLogger := log.With(logger, "block", b.meta.ULID)
indexr := b.indexReader(blockLogger)

g.Go(func() error {
span, newCtx := tracing.StartSpan(gctx, "bucket_store_block_label_values", tracing.Tags{
"block.id": b.meta.ULID,
Expand All @@ -2013,7 +2036,7 @@ func (s *BucketStore) LabelValues(ctx context.Context, req *storepb.LabelValuesR
"block.resolution": b.meta.Thanos.Downsample.Resolution,
})
defer span.Finish()
defer runutil.CloseWithLogOnErr(s.logger, indexr, "label values")
defer runutil.CloseWithLogOnErr(blockLogger, indexr, "label values")

var result []string
if len(reqSeriesMatchersNoExtLabels) == 0 {
Expand All @@ -2037,7 +2060,7 @@ func (s *BucketStore) LabelValues(ctx context.Context, req *storepb.LabelValuesR
}
blockClient := newBlockSeriesClient(
newCtx,
s.logger,
blockLogger,
b,
seriesReq,
seriesLimiter,
Expand Down Expand Up @@ -2267,7 +2290,6 @@ func (s *bucketBlockSet) labelMatchers(matchers ...*labels.Matcher) ([]*labels.M
// bucketBlock represents a block that is located in a bucket. It holds intermediate
// state for the block on local disk.
type bucketBlock struct {
logger log.Logger
metrics *bucketStoreMetrics
bkt objstore.BucketReader
meta *metadata.Meta
Expand All @@ -2294,7 +2316,6 @@ type bucketBlock struct {

func newBucketBlock(
ctx context.Context,
logger log.Logger,
metrics *bucketStoreMetrics,
meta *metadata.Meta,
bkt objstore.BucketReader,
Expand All @@ -2319,7 +2340,6 @@ func newBucketBlock(
extLset := labels.FromMap(meta.Thanos.Labels)
relabelLabels := labels.NewBuilder(extLset).Set(block.BlockIDLabel, meta.ULID.String()).Labels()
b = &bucketBlock{
logger: logger,
metrics: metrics,
bkt: bkt,
indexCache: indexCache,
Expand Down Expand Up @@ -2358,12 +2378,12 @@ func (b *bucketBlock) indexFilename() string {
return path.Join(b.meta.ULID.String(), block.IndexFilename)
}

func (b *bucketBlock) readIndexRange(ctx context.Context, off, length int64) ([]byte, error) {
func (b *bucketBlock) readIndexRange(ctx context.Context, off, length int64, logger log.Logger) ([]byte, error) {
r, err := b.bkt.GetRange(ctx, b.indexFilename(), off, length)
if err != nil {
return nil, errors.Wrap(err, "get range reader")
}
defer runutil.CloseWithLogOnErr(b.logger, r, "readIndexRange close range reader")
defer runutil.CloseWithLogOnErr(logger, r, "readIndexRange close range reader")

// Preallocate the buffer with the exact size so we don't waste allocations
// while progressively growing an initial small buffer. The buffer capacity
Expand All @@ -2376,7 +2396,7 @@ func (b *bucketBlock) readIndexRange(ctx context.Context, off, length int64) ([]
return buf.Bytes(), nil
}

func (b *bucketBlock) readChunkRange(ctx context.Context, seq int, off, length int64, chunkRanges byteRanges) (*[]byte, error) {
func (b *bucketBlock) readChunkRange(ctx context.Context, seq int, off, length int64, chunkRanges byteRanges, logger log.Logger) (*[]byte, error) {
if seq < 0 || seq >= len(b.chunkObjs) {
return nil, errors.Errorf("unknown segment file for index %d", seq)
}
Expand All @@ -2386,7 +2406,7 @@ func (b *bucketBlock) readChunkRange(ctx context.Context, seq int, off, length i
if err != nil {
return nil, errors.Wrap(err, "get range reader")
}
defer runutil.CloseWithLogOnErr(b.logger, reader, "readChunkRange close range reader")
defer runutil.CloseWithLogOnErr(logger, reader, "readChunkRange close range reader")

// Get a buffer from the pool.
chunkBuffer, err := b.chunkPool.Get(chunkRanges.size())
Expand All @@ -2410,14 +2430,14 @@ func (b *bucketBlock) chunkRangeReader(ctx context.Context, seq int, off, length
return b.bkt.GetRange(ctx, b.chunkObjs[seq], off, length)
}

func (b *bucketBlock) indexReader() *bucketIndexReader {
func (b *bucketBlock) indexReader(logger log.Logger) *bucketIndexReader {
b.pendingReaders.Add(1)
return newBucketIndexReader(b)
return newBucketIndexReader(b, logger)
}

func (b *bucketBlock) chunkReader() *bucketChunkReader {
func (b *bucketBlock) chunkReader(logger log.Logger) *bucketChunkReader {
b.pendingReaders.Add(1)
return newBucketChunkReader(b)
return newBucketChunkReader(b, logger)
}

// matchRelabelLabels verifies whether the block matches the given matchers.
Expand Down Expand Up @@ -2454,16 +2474,18 @@ type bucketIndexReader struct {
loadedSeries map[storage.SeriesRef][]byte

indexVersion int
logger log.Logger
}

func newBucketIndexReader(block *bucketBlock) *bucketIndexReader {
func newBucketIndexReader(block *bucketBlock, logger log.Logger) *bucketIndexReader {
r := &bucketIndexReader{
block: block,
dec: &index.Decoder{
LookupSymbol: block.indexHeaderReader.LookupSymbol,
},
stats: &queryStats{},
loadedSeries: map[storage.SeriesRef][]byte{},
logger: logger,
}
return r
}
Expand Down Expand Up @@ -2863,13 +2885,13 @@ func (r *bucketIndexReader) fetchExpandedPostingsFromCache(ctx context.Context,
}()
// If failed to decode or expand cached postings, return and expand postings again.
if err != nil {
level.Error(r.block.logger).Log("msg", "failed to decode cached expanded postings, refetch postings", "id", r.block.meta.ULID.String(), "err", err)
level.Error(r.logger).Log("msg", "failed to decode cached expanded postings, refetch postings", "id", r.block.meta.ULID.String(), "err", err)
return false, nil, nil
}

ps, err := ExpandPostingsWithContext(ctx, p)
if err != nil {
level.Error(r.block.logger).Log("msg", "failed to expand cached expanded postings, refetch postings", "id", r.block.meta.ULID.String(), "err", err)
level.Error(r.logger).Log("msg", "failed to expand cached expanded postings, refetch postings", "id", r.block.meta.ULID.String(), "err", err)
return false, nil, nil
}

Expand Down Expand Up @@ -3009,7 +3031,7 @@ func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Lab
if err != nil {
return errors.Wrap(err, "read postings range")
}
defer runutil.CloseWithLogOnErr(r.block.logger, partReader, "readIndexRange close range reader")
defer runutil.CloseWithLogOnErr(r.logger, partReader, "readIndexRange close range reader")
brdr.Reset(partReader)

rdr := newPostingsReaderBuilder(ctx, brdr, ptrs[i:j], start, length)
Expand Down Expand Up @@ -3194,7 +3216,7 @@ func (r *bucketIndexReader) loadSeries(ctx context.Context, ids []storage.Series
stats.DataDownloadedSizeSum += units.Base2Bytes(end - start)
}

b, err := r.block.readIndexRange(ctx, int64(start), int64(end-start))
b, err := r.block.readIndexRange(ctx, int64(start), int64(end-start), r.logger)
if err != nil {
return errors.Wrap(err, "read series range")
}
Expand All @@ -3218,7 +3240,7 @@ func (r *bucketIndexReader) loadSeries(ctx context.Context, ids []storage.Series

// Inefficient, but should be rare.
r.block.metrics.seriesRefetches.WithLabelValues(tenant).Inc()
level.Warn(r.block.logger).Log("msg", "series size exceeded expected size; refetching", "id", id, "series length", n+int(l), "maxSeriesSize", r.block.estimatedMaxSeriesSize)
level.Warn(r.logger).Log("msg", "series size exceeded expected size; refetching", "id", id, "series length", n+int(l), "maxSeriesSize", r.block.estimatedMaxSeriesSize)

// Fetch plus to get the size of next one if exists.
return r.loadSeries(ctx, ids[i:], true, uint64(id), uint64(id)+uint64(n+int(l)+1), bytesLimiter, tenant)
Expand Down Expand Up @@ -3408,17 +3430,19 @@ type bucketChunkReader struct {
chunkBytesMtx sync.Mutex
stats *queryStats
chunkBytes []*[]byte // Byte slice to return to the chunk pool on close.
logger log.Logger

loadingChunksMtx sync.Mutex
loadingChunks bool
finishLoadingChks chan struct{}
}

func newBucketChunkReader(block *bucketBlock) *bucketChunkReader {
func newBucketChunkReader(block *bucketBlock, logger log.Logger) *bucketChunkReader {
return &bucketChunkReader{
block: block,
stats: &queryStats{},
toLoad: make([][]loadIdx, len(block.chunkObjs)),
logger: logger,
}
}

Expand Down Expand Up @@ -3525,7 +3549,7 @@ func (r *bucketChunkReader) loadChunks(ctx context.Context, res []seriesEntry, a
if err != nil {
return errors.Wrap(err, "get range reader")
}
defer runutil.CloseWithLogOnErr(r.block.logger, reader, "readChunkRange close range reader")
defer runutil.CloseWithLogOnErr(r.logger, reader, "readChunkRange close range reader")
bufReader := bufio.NewReaderSize(reader, r.block.estimatedMaxChunkSize)

stats.chunksFetchCount++
Expand Down Expand Up @@ -3605,7 +3629,7 @@ func (r *bucketChunkReader) loadChunks(ctx context.Context, res []seriesEntry, a
}
stats.DataDownloadedSizeSum += units.Base2Bytes(chunkLen)

nb, err := r.block.readChunkRange(ctx, seq, int64(pIdx.offset), int64(chunkLen), []byteRange{{offset: 0, length: chunkLen}})
nb, err := r.block.readChunkRange(ctx, seq, int64(pIdx.offset), int64(chunkLen), []byteRange{{offset: 0, length: chunkLen}}, r.logger)
if err != nil {
return errors.Wrapf(err, "preloaded chunk too small, expecting %d, and failed to fetch full chunk", chunkLen)
}
Expand Down
Loading

0 comments on commit 9e6cbd9

Please sign in to comment.