Allow configurable request logger in Store Gateway #7367

Merged (8 commits) on May 17, 2024
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -19,6 +19,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
### Added

- [#7317](https://github.com/thanos-io/thanos/pull/7317) Tracing: allow specifying resource attributes for the OTLP configuration.
- [#7367](https://github.com/thanos-io/thanos/pull/7367) Store Gateway: log request ID in request logs.

### Changed

8 changes: 8 additions & 0 deletions cmd/thanos/store.go
@@ -45,6 +45,7 @@ import (
"github.com/thanos-io/thanos/pkg/runutil"
grpcserver "github.com/thanos-io/thanos/pkg/server/grpc"
httpserver "github.com/thanos-io/thanos/pkg/server/http"
"github.com/thanos-io/thanos/pkg/server/http/middleware"
"github.com/thanos-io/thanos/pkg/store"
storecache "github.com/thanos-io/thanos/pkg/store/cache"
"github.com/thanos-io/thanos/pkg/store/labelpb"
@@ -394,6 +395,13 @@ func runStore(

options := []store.BucketStoreOption{
store.WithLogger(logger),
store.WithRequestLoggerFunc(func(ctx context.Context, logger log.Logger) log.Logger {
reqID, ok := middleware.RequestIDFromContext(ctx)
if ok {
return log.With(logger, "request-id", reqID)
}
return logger
}),
store.WithRegistry(reg),
store.WithIndexCache(indexCache),
store.WithQueryGate(queriesGate),
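
The WithRequestLoggerFunc option above only reads a request ID that earlier middleware has already stored in the context; middleware.RequestIDFromContext is the Thanos helper that extracts it. As a rough, self-contained sketch of the other half of that pattern (not the actual Thanos middleware), a gRPC server interceptor could stamp each incoming request before the Store Gateway handlers run. Everything below (ctxKey, withRequestID, requestIDFromContext, unaryRequestID) is a hypothetical name invented for this illustration.

package example

import (
	"context"

	"github.com/google/uuid"
	"google.golang.org/grpc"
)

// ctxKey is a private context key for the request ID.
type ctxKey struct{}

// withRequestID attaches a fresh request ID to the context.
func withRequestID(ctx context.Context) context.Context {
	return context.WithValue(ctx, ctxKey{}, uuid.New().String())
}

// requestIDFromContext reads the ID back out, which is what a RequestLoggerFunc
// like the one wired above would call.
func requestIDFromContext(ctx context.Context) (string, bool) {
	id, ok := ctx.Value(ctxKey{}).(string)
	return id, ok
}

// unaryRequestID is a gRPC server interceptor that tags every incoming request.
func unaryRequestID(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
	return handler(withRequestID(ctx), req)
}

In the real binary the middleware that Thanos already installs plays this role; the sketch only shows where the ID consumed by the option above would plausibly come from.
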
88 changes: 56 additions & 32 deletions pkg/store/bucket.go
@@ -413,6 +413,8 @@ type BucketStore struct {
blockEstimatedMaxChunkFunc BlockEstimator

indexHeaderLazyDownloadStrategy indexheader.LazyDownloadIndexHeaderFunc

requestLoggerFunc RequestLoggerFunc
}

func (s *BucketStore) validate() error {
@@ -449,6 +451,20 @@ func WithLogger(logger log.Logger) BucketStoreOption {
}
}

// RequestLoggerFunc derives a per-request logger from the request context and the base BucketStore logger.
type RequestLoggerFunc func(ctx context.Context, log log.Logger) log.Logger

// NoopRequestLoggerFunc returns the base logger unchanged; it is used when no RequestLoggerFunc is configured.
func NoopRequestLoggerFunc(_ context.Context, logger log.Logger) log.Logger {
return logger
}

// WithRequestLoggerFunc sets the BucketStore to use the passed RequestLoggerFunc
// to initialize the request logger at query time.
func WithRequestLoggerFunc(loggerFunc RequestLoggerFunc) BucketStoreOption {
return func(s *BucketStore) {
s.requestLoggerFunc = loggerFunc
}
}

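NoopRequestLoggerFunc is the default, so callers that configure nothing keep the plain BucketStore logger. As a hedged usage sketch (not code from this PR), a caller can supply any derivation it likes; here a hypothetical tenantFromContext helper tags request logs with a tenant instead of a request ID.

package example

import (
	"context"

	"github.com/go-kit/log"

	"github.com/thanos-io/thanos/pkg/store"
)

// bucketStoreOptions shows one way to plug in a custom RequestLoggerFunc.
// tenantFromContext stands in for however the caller carries per-request values.
func bucketStoreOptions(logger log.Logger, tenantFromContext func(context.Context) (string, bool)) []store.BucketStoreOption {
	return []store.BucketStoreOption{
		store.WithLogger(logger),
		store.WithRequestLoggerFunc(func(ctx context.Context, l log.Logger) log.Logger {
			if tenant, ok := tenantFromContext(ctx); ok {
				return log.With(l, "tenant", tenant)
			}
			// No per-request value: behave like NoopRequestLoggerFunc.
			return l
		}),
	}
}
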
// WithRegistry sets a registry that BucketStore uses to register metrics with.
func WithRegistry(reg prometheus.Registerer) BucketStoreOption {
return func(s *BucketStore) {
@@ -583,6 +599,7 @@ func NewBucketStore(
seriesBatchSize: SeriesBatchSize,
sortingStrategy: sortingStrategyStore,
indexHeaderLazyDownloadStrategy: indexheader.AlwaysEagerDownloadIndexHeader,
requestLoggerFunc: NoopRequestLoggerFunc,
}

for _, option := range options {
@@ -779,7 +796,6 @@ func (s *BucketStore) addBlock(ctx context.Context, meta *metadata.Meta) (err er

b, err := newBucketBlock(
ctx,
log.With(s.logger, "block", meta.ULID),
s.metrics,
meta,
s.bkt,
@@ -1037,7 +1053,7 @@ func newBlockSeriesClient(
) *blockSeriesClient {
var chunkr *bucketChunkReader
if !req.SkipChunks {
chunkr = b.chunkReader()
chunkr = b.chunkReader(logger)
}

extLset := b.extLset
@@ -1053,7 +1069,7 @@

mint: req.MinTime,
maxt: req.MaxTime,
indexr: b.indexReader(),
indexr: b.indexReader(logger),
chunkr: chunkr,
seriesLimiter: seriesLimiter,
chunksLimiter: chunksLimiter,
@@ -1469,6 +1485,8 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store
seriesLimiter = s.seriesLimiterFactory(s.metrics.queriesDropped.WithLabelValues("series", tenant))

queryStatsEnabled = false

logger = s.requestLoggerFunc(ctx, s.logger)
)

if req.Hints != nil {
@@ -1505,7 +1523,7 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store
blocks := bs.getFor(req.MinTime, req.MaxTime, req.MaxResolutionWindow, reqBlockMatchers)

if s.debugLogging {
debugFoundBlockSetOverview(s.logger, req.MinTime, req.MaxTime, req.MaxResolutionWindow, bs.labels, blocks)
debugFoundBlockSetOverview(logger, req.MinTime, req.MaxTime, req.MaxResolutionWindow, bs.labels, blocks)
}

for _, b := range blocks {
@@ -1521,7 +1539,7 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store

blockClient := newBlockSeriesClient(
srv.Context(),
s.logger,
log.With(logger, "block", blk.meta.ULID),
blk,
req,
seriesLimiter,
@@ -1633,7 +1651,7 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store
s.metrics.cachedPostingsCompressedSizeBytes.WithLabelValues(tenant).Add(float64(stats.CachedPostingsCompressedSizeSum))
s.metrics.postingsSizeBytes.WithLabelValues(tenant).Observe(float64(int(stats.PostingsFetchedSizeSum) + int(stats.PostingsTouchedSizeSum)))

level.Debug(s.logger).Log("msg", "stats query processed",
level.Debug(logger).Log("msg", "stats query processed",
"request", req,
"tenant", tenant,
"stats", fmt.Sprintf("%+v", stats), "err", err)
@@ -1764,6 +1782,7 @@ func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReq
var sets [][]string
var seriesLimiter = s.seriesLimiterFactory(s.metrics.queriesDropped.WithLabelValues("series", tenant))
var bytesLimiter = s.bytesLimiterFactory(s.metrics.queriesDropped.WithLabelValues("bytes", tenant))
var logger = s.requestLoggerFunc(ctx, s.logger)

for _, b := range s.blocks {
b := b
@@ -1785,7 +1804,8 @@

resHints.AddQueriedBlock(b.meta.ULID)

indexr := b.indexReader()
blockLogger := log.With(logger, "block", b.meta.ULID)
indexr := b.indexReader(blockLogger)

g.Go(func() error {
span, newCtx := tracing.StartSpan(gctx, "bucket_store_block_label_names", tracing.Tags{
@@ -1795,7 +1815,7 @@ func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReq
"block.resolution": b.meta.Thanos.Downsample.Resolution,
})
defer span.Finish()
defer runutil.CloseWithLogOnErr(s.logger, indexr, "label names")
defer runutil.CloseWithLogOnErr(blockLogger, indexr, "label names")

var result []string
if len(reqSeriesMatchersNoExtLabels) == 0 {
@@ -1826,7 +1846,7 @@ func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReq
}
blockClient := newBlockSeriesClient(
newCtx,
s.logger,
blockLogger,
b,
seriesReq,
seriesLimiter,
@@ -1973,6 +1993,7 @@ func (s *BucketStore) LabelValues(ctx context.Context, req *storepb.LabelValuesR
var sets [][]string
var seriesLimiter = s.seriesLimiterFactory(s.metrics.queriesDropped.WithLabelValues("series", tenant))
var bytesLimiter = s.bytesLimiterFactory(s.metrics.queriesDropped.WithLabelValues("bytes", tenant))
var logger = s.requestLoggerFunc(ctx, s.logger)

for _, b := range s.blocks {
b := b
@@ -2004,7 +2025,9 @@ func (s *BucketStore) LabelValues(ctx context.Context, req *storepb.LabelValuesR

resHints.AddQueriedBlock(b.meta.ULID)

indexr := b.indexReader()
blockLogger := log.With(logger, "block", b.meta.ULID)
indexr := b.indexReader(blockLogger)

g.Go(func() error {
span, newCtx := tracing.StartSpan(gctx, "bucket_store_block_label_values", tracing.Tags{
"block.id": b.meta.ULID,
@@ -2013,7 +2036,7 @@ func (s *BucketStore) LabelValues(ctx context.Context, req *storepb.LabelValuesR
"block.resolution": b.meta.Thanos.Downsample.Resolution,
})
defer span.Finish()
defer runutil.CloseWithLogOnErr(s.logger, indexr, "label values")
defer runutil.CloseWithLogOnErr(blockLogger, indexr, "label values")

var result []string
if len(reqSeriesMatchersNoExtLabels) == 0 {
@@ -2037,7 +2060,7 @@ func (s *BucketStore) LabelValues(ctx context.Context, req *storepb.LabelValuesR
}
blockClient := newBlockSeriesClient(
newCtx,
s.logger,
blockLogger,
b,
seriesReq,
seriesLimiter,
@@ -2267,7 +2290,6 @@ func (s *bucketBlockSet) labelMatchers(matchers ...*labels.Matcher) ([]*labels.M
// bucketBlock represents a block that is located in a bucket. It holds intermediate
// state for the block on local disk.
type bucketBlock struct {
logger log.Logger
metrics *bucketStoreMetrics
bkt objstore.BucketReader
meta *metadata.Meta
@@ -2294,7 +2316,6 @@ type bucketBlock struct {

func newBucketBlock(
ctx context.Context,
logger log.Logger,
metrics *bucketStoreMetrics,
meta *metadata.Meta,
bkt objstore.BucketReader,
@@ -2319,7 +2340,6 @@ func newBucketBlock(
extLset := labels.FromMap(meta.Thanos.Labels)
relabelLabels := labels.NewBuilder(extLset).Set(block.BlockIDLabel, meta.ULID.String()).Labels()
b = &bucketBlock{
logger: logger,
metrics: metrics,
bkt: bkt,
indexCache: indexCache,
@@ -2358,12 +2378,12 @@ func (b *bucketBlock) indexFilename() string {
return path.Join(b.meta.ULID.String(), block.IndexFilename)
}

func (b *bucketBlock) readIndexRange(ctx context.Context, off, length int64) ([]byte, error) {
func (b *bucketBlock) readIndexRange(ctx context.Context, off, length int64, logger log.Logger) ([]byte, error) {
r, err := b.bkt.GetRange(ctx, b.indexFilename(), off, length)
if err != nil {
return nil, errors.Wrap(err, "get range reader")
}
defer runutil.CloseWithLogOnErr(b.logger, r, "readIndexRange close range reader")
defer runutil.CloseWithLogOnErr(logger, r, "readIndexRange close range reader")

// Preallocate the buffer with the exact size so we don't waste allocations
// while progressively growing an initial small buffer. The buffer capacity
@@ -2376,7 +2396,7 @@ func (b *bucketBlock) readIndexRange(ctx context.Context, off, length int64) ([]
return buf.Bytes(), nil
}

func (b *bucketBlock) readChunkRange(ctx context.Context, seq int, off, length int64, chunkRanges byteRanges) (*[]byte, error) {
func (b *bucketBlock) readChunkRange(ctx context.Context, seq int, off, length int64, chunkRanges byteRanges, logger log.Logger) (*[]byte, error) {
if seq < 0 || seq >= len(b.chunkObjs) {
return nil, errors.Errorf("unknown segment file for index %d", seq)
}
@@ -2386,7 +2406,7 @@ func (b *bucketBlock) readChunkRange(ctx context.Context, seq int, off, length i
if err != nil {
return nil, errors.Wrap(err, "get range reader")
}
defer runutil.CloseWithLogOnErr(b.logger, reader, "readChunkRange close range reader")
defer runutil.CloseWithLogOnErr(logger, reader, "readChunkRange close range reader")

// Get a buffer from the pool.
chunkBuffer, err := b.chunkPool.Get(chunkRanges.size())
@@ -2410,14 +2430,14 @@ func (b *bucketBlock) chunkRangeReader(ctx context.Context, seq int, off, length
return b.bkt.GetRange(ctx, b.chunkObjs[seq], off, length)
}

func (b *bucketBlock) indexReader() *bucketIndexReader {
func (b *bucketBlock) indexReader(logger log.Logger) *bucketIndexReader {
b.pendingReaders.Add(1)
return newBucketIndexReader(b)
return newBucketIndexReader(b, logger)
}

func (b *bucketBlock) chunkReader() *bucketChunkReader {
func (b *bucketBlock) chunkReader(logger log.Logger) *bucketChunkReader {
b.pendingReaders.Add(1)
return newBucketChunkReader(b)
return newBucketChunkReader(b, logger)
}

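Taken together with the Series, LabelNames, and LabelValues changes above, the request-scoped logger now threads from the request entry point down into the readers roughly as follows (a recap of this diff, not new code):

// Per request:  logger := s.requestLoggerFunc(ctx, s.logger)
// Per block:    blockLogger := log.With(logger, "block", b.meta.ULID)
// Per reader:   indexr := b.indexReader(blockLogger)
//               chunkr := b.chunkReader(blockLogger)
// The readers keep that logger and use it for CloseWithLogOnErr and their
// warning/error logs instead of the old block-level b.logger.
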
// matchRelabelLabels verifies whether the block matches the given matchers.
@@ -2454,16 +2474,18 @@ type bucketIndexReader struct {
loadedSeries map[storage.SeriesRef][]byte

indexVersion int
logger log.Logger
}

func newBucketIndexReader(block *bucketBlock) *bucketIndexReader {
func newBucketIndexReader(block *bucketBlock, logger log.Logger) *bucketIndexReader {
r := &bucketIndexReader{
block: block,
dec: &index.Decoder{
LookupSymbol: block.indexHeaderReader.LookupSymbol,
},
stats: &queryStats{},
loadedSeries: map[storage.SeriesRef][]byte{},
logger: logger,
}
return r
}
@@ -2863,13 +2885,13 @@ func (r *bucketIndexReader) fetchExpandedPostingsFromCache(ctx context.Context,
}()
// If failed to decode or expand cached postings, return and expand postings again.
if err != nil {
level.Error(r.block.logger).Log("msg", "failed to decode cached expanded postings, refetch postings", "id", r.block.meta.ULID.String(), "err", err)
level.Error(r.logger).Log("msg", "failed to decode cached expanded postings, refetch postings", "id", r.block.meta.ULID.String(), "err", err)
return false, nil, nil
}

ps, err := ExpandPostingsWithContext(ctx, p)
if err != nil {
level.Error(r.block.logger).Log("msg", "failed to expand cached expanded postings, refetch postings", "id", r.block.meta.ULID.String(), "err", err)
level.Error(r.logger).Log("msg", "failed to expand cached expanded postings, refetch postings", "id", r.block.meta.ULID.String(), "err", err)
return false, nil, nil
}

@@ -3009,7 +3031,7 @@ func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Lab
if err != nil {
return errors.Wrap(err, "read postings range")
}
defer runutil.CloseWithLogOnErr(r.block.logger, partReader, "readIndexRange close range reader")
defer runutil.CloseWithLogOnErr(r.logger, partReader, "readIndexRange close range reader")
brdr.Reset(partReader)

rdr := newPostingsReaderBuilder(ctx, brdr, ptrs[i:j], start, length)
@@ -3194,7 +3216,7 @@ func (r *bucketIndexReader) loadSeries(ctx context.Context, ids []storage.Series
stats.DataDownloadedSizeSum += units.Base2Bytes(end - start)
}

b, err := r.block.readIndexRange(ctx, int64(start), int64(end-start))
b, err := r.block.readIndexRange(ctx, int64(start), int64(end-start), r.logger)
if err != nil {
return errors.Wrap(err, "read series range")
}
@@ -3218,7 +3240,7 @@

// Inefficient, but should be rare.
r.block.metrics.seriesRefetches.WithLabelValues(tenant).Inc()
level.Warn(r.block.logger).Log("msg", "series size exceeded expected size; refetching", "id", id, "series length", n+int(l), "maxSeriesSize", r.block.estimatedMaxSeriesSize)
level.Warn(r.logger).Log("msg", "series size exceeded expected size; refetching", "id", id, "series length", n+int(l), "maxSeriesSize", r.block.estimatedMaxSeriesSize)

// Fetch plus to get the size of next one if exists.
return r.loadSeries(ctx, ids[i:], true, uint64(id), uint64(id)+uint64(n+int(l)+1), bytesLimiter, tenant)
@@ -3408,17 +3430,19 @@ type bucketChunkReader struct {
chunkBytesMtx sync.Mutex
stats *queryStats
chunkBytes []*[]byte // Byte slice to return to the chunk pool on close.
logger log.Logger

loadingChunksMtx sync.Mutex
loadingChunks bool
finishLoadingChks chan struct{}
}

func newBucketChunkReader(block *bucketBlock) *bucketChunkReader {
func newBucketChunkReader(block *bucketBlock, logger log.Logger) *bucketChunkReader {
return &bucketChunkReader{
block: block,
stats: &queryStats{},
toLoad: make([][]loadIdx, len(block.chunkObjs)),
logger: logger,
}
}

@@ -3525,7 +3549,7 @@ func (r *bucketChunkReader) loadChunks(ctx context.Context, res []seriesEntry, a
if err != nil {
return errors.Wrap(err, "get range reader")
}
defer runutil.CloseWithLogOnErr(r.block.logger, reader, "readChunkRange close range reader")
defer runutil.CloseWithLogOnErr(r.logger, reader, "readChunkRange close range reader")
bufReader := bufio.NewReaderSize(reader, r.block.estimatedMaxChunkSize)

stats.chunksFetchCount++
@@ -3605,7 +3629,7 @@ func (r *bucketChunkReader) loadChunks(ctx context.Context, res []seriesEntry, a
}
stats.DataDownloadedSizeSum += units.Base2Bytes(chunkLen)

nb, err := r.block.readChunkRange(ctx, seq, int64(pIdx.offset), int64(chunkLen), []byteRange{{offset: 0, length: chunkLen}})
nb, err := r.block.readChunkRange(ctx, seq, int64(pIdx.offset), int64(chunkLen), []byteRange{{offset: 0, length: chunkLen}}, r.logger)
if err != nil {
return errors.Wrapf(err, "preloaded chunk too small, expecting %d, and failed to fetch full chunk", chunkLen)
}