store: add downloaded bytes limit (thanos-io#5801)
* store: add downloaded bytes limit

Signed-off-by: Giedrius Statkevičius <giedrius.statkevicius@vinted.com>

* store: add bytesLimiter to LabelNames, LabelValues

Signed-off-by: Giedrius Statkevičius <giedrius.statkevicius@vinted.com>

* test: add e2e test for new limiter

Signed-off-by: Giedrius Statkevičius <giedrius.statkevicius@vinted.com>

* *: update CHANGELOG/etc

Signed-off-by: Giedrius Statkevičius <giedrius.statkevicius@vinted.com>

* e2e: hard fail on no error

We always expect an error here.

Signed-off-by: Giedrius Statkevičius <giedrius.statkevicius@vinted.com>

* CHANGELOG: fix & improve clarity

Signed-off-by: Giedrius Statkevičius <giedrius.statkevicius@vinted.com>

Signed-off-by: Giedrius Statkevičius <giedrius.statkevicius@vinted.com>
GiedriusS committed Oct 31, 2022
1 parent 7174724 commit 5c5c62c
Showing 9 changed files with 264 additions and 28 deletions.
11 changes: 10 additions & 1 deletion CHANGELOG.md
@@ -10,7 +10,16 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re

## Unreleased

- [#5753](https://github.com/thanos-io/thanos/pull/5753) Build with Go 1.19.
### Fixed

### Added

- [#5814](https://github.com/thanos-io/thanos/pull/5814) - Add metric `thanos_bucket_store_postings_size_bytes` that shows the distribution of how many postings (in bytes) were needed for each Series() call in Thanos Store. Useful for determining limits.
- [#5801](https://github.com/thanos-io/thanos/pull/5801) Store: add a new limiter `--store.grpc.downloaded-bytes-limit` that limits the number of bytes downloaded in each Series/LabelNames/LabelValues call. Use `thanos_bucket_store_postings_size_bytes` for determining the limits.

### Changed

## [v0.29.0](https://github.com/thanos-io/thanos/tree/release-0.29) - Release in progress

### Fixed
- [#5642](https://github.com/thanos-io/thanos/pull/5642) Receive: Log labels correctly in writer debug messages.
6 changes: 6 additions & 0 deletions cmd/thanos/store.go
@@ -58,6 +58,7 @@ type storeConfig struct {
chunkPoolSize units.Base2Bytes
maxSampleCount uint64
maxTouchedSeriesCount uint64
maxDownloadedBytes units.Base2Bytes
maxConcurrency int
component component.StoreAPI
debugLogging bool
@@ -109,6 +110,10 @@ func (sc *storeConfig) registerFlag(cmd extkingpin.FlagClause) {
"Maximum amount of touched series returned via a single Series call. The Series call fails if this limit is exceeded. 0 means no limit.").
Default("0").Uint64Var(&sc.maxTouchedSeriesCount)

cmd.Flag("store.grpc.downloaded-bytes-limit",
"Maximum amount of downloaded (either fetched or touched) bytes in a single Series/LabelNames/LabelValues call. The Series call fails if this limit is exceeded. 0 means no limit.").
Default("0").BytesVar(&sc.maxDownloadedBytes)

cmd.Flag("store.grpc.series-max-concurrency", "Maximum number of concurrent Series calls.").Default("20").IntVar(&sc.maxConcurrency)

sc.component = component.Store
@@ -345,6 +350,7 @@ func runStore(
conf.dataDir,
store.NewChunksLimiterFactory(conf.maxSampleCount/store.MaxSamplesPerChunk), // The samples limit is an approximation based on the max number of samples per chunk.
store.NewSeriesLimiterFactory(conf.maxTouchedSeriesCount),
store.NewBytesLimiterFactory(conf.maxDownloadedBytes),
store.NewGapBasedPartitioner(store.PartitionerMaxGapSize),
conf.blockSyncConcurrency,
conf.advertiseCompatibilityLabel,
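The new flag value is handed to `store.NewBytesLimiterFactory`. The limiter implementation itself (pkg/store/limiter.go) is among the changed files not rendered on this page, so the following is only a minimal sketch of the contract the call sites below appear to rely on, modelled on the existing chunks/series limiters; the type and constructor names come from this diff, but every body here is an assumption, not code from the commit.

```go
// Hedged sketch of the limiter contract assumed by the call sites in this
// commit; the real code lives in pkg/store/limiter.go and may differ.
package store

import (
	"sync/atomic"

	"github.com/alecthomas/units"
	"github.com/pkg/errors"
	"github.com/prometheus/client_golang/prometheus"
)

// BytesLimiter limits the total bytes fetched or touched by a single
// Series/LabelNames/LabelValues call (assumed semantics).
type BytesLimiter interface {
	// Reserve adds num bytes to the running total and fails once the limit is crossed.
	Reserve(num uint64) error
}

// BytesLimiterFactory makes one limiter per request, bound to a counter of dropped queries.
type BytesLimiterFactory func(failedCounter prometheus.Counter) BytesLimiter

type bytesLimiter struct {
	limit    uint64
	reserved uint64
	failed   prometheus.Counter
}

func (l *bytesLimiter) Reserve(num uint64) error {
	if l.limit == 0 { // 0 means no limit, matching the flag's default and help text.
		return nil
	}
	if reserved := atomic.AddUint64(&l.reserved, num); reserved > l.limit {
		l.failed.Inc()
		return errors.Errorf("exceeded bytes limit of %d: reserved %d", l.limit, reserved)
	}
	return nil
}

// NewBytesLimiterFactory wires the --store.grpc.downloaded-bytes-limit value through.
func NewBytesLimiterFactory(limit units.Base2Bytes) BytesLimiterFactory {
	return func(failedCounter prometheus.Counter) BytesLimiter {
		return &bytesLimiter{limit: uint64(limit), failed: failedCounter}
	}
}
```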
6 changes: 6 additions & 0 deletions docs/components/store.md
@@ -162,6 +162,12 @@ Flags:
If true, Store Gateway will lazy memory map
index-header only once the block is required by
a query.
--store.grpc.downloaded-bytes-limit=0
Maximum amount of downloaded (either
fetched or touched) bytes in a single
Series/LabelNames/LabelValues call. The Series
call fails if this limit is exceeded. 0 means
no limit.
--store.grpc.series-max-concurrency=20
Maximum number of concurrent Series calls.
--store.grpc.series-sample-limit=0
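The flag documented above is declared with kingpin's `BytesVar` (see the cmd/thanos/store.go hunk earlier in this diff), so its value is parsed as `units.Base2Bytes` rather than a bare integer. A small hedged sketch of what that implies for operators; the accepted suffixes are an assumption based on the alecthomas/units library, not something stated in the commit:

```go
package main

import (
	"fmt"

	"github.com/alecthomas/units"
)

func main() {
	// --store.grpc.downloaded-bytes-limit is assumed to accept base-2 suffixes
	// such as "512MiB" or "2GiB"; the default "0" keeps the limit disabled.
	v, err := units.ParseBase2Bytes("512MiB")
	if err != nil {
		panic(err)
	}
	fmt.Println(int64(v)) // 536870912 bytes
}
```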
1 change: 1 addition & 0 deletions internal/cortex/storegateway/bucket_stores.go
@@ -493,6 +493,7 @@ func (u *BucketStores) getOrCreateStore(userID string) (*store.BucketStore, erro
u.syncDirForUser(userID),
newChunksLimiterFactory(u.limits, userID),
newSeriesLimiterFactory(u.limits, userID),
store.NewBytesLimiterFactory(0),
u.partitioner,
u.cfg.BucketStore.BlockSyncConcurrency,
false, // No need to enable backward compatibility with Thanos pre 0.8.0 queriers
73 changes: 59 additions & 14 deletions pkg/store/bucket.go
@@ -320,7 +320,10 @@ type BucketStore struct {
// seriesLimiterFactory creates a new limiter used to limit the number of touched series by each Series() call,
// or LabelName and LabelValues calls when used with matchers.
seriesLimiterFactory SeriesLimiterFactory
partitioner Partitioner

// bytesLimiterFactory creates a new limiter used to limit the amount of bytes fetched/touched by each Series() call.
bytesLimiterFactory BytesLimiterFactory
partitioner Partitioner

filterConfig *FilterConfig
advLabelSets []labelpb.ZLabelSet
@@ -420,6 +423,7 @@ func NewBucketStore(
dir string,
chunksLimiterFactory ChunksLimiterFactory,
seriesLimiterFactory SeriesLimiterFactory,
bytesLimiterFactory BytesLimiterFactory,
partitioner Partitioner,
blockSyncConcurrency int,
enableCompatibilityLabel bool,
@@ -446,6 +450,7 @@
queryGate: gate.NewNoop(),
chunksLimiterFactory: chunksLimiterFactory,
seriesLimiterFactory: seriesLimiterFactory,
bytesLimiterFactory: bytesLimiterFactory,
partitioner: partitioner,
enableCompatibilityLabel: enableCompatibilityLabel,
postingOffsetsInMemSampling: postingOffsetsInMemSampling,
@@ -815,14 +820,15 @@ func blockSeries(
matchers []*labels.Matcher,
chunksLimiter ChunksLimiter,
seriesLimiter SeriesLimiter,
bytesLimiter BytesLimiter, // Rate limiter for used bytes.
skipChunks bool,
minTime, maxTime int64,
loadAggregates []storepb.Aggr,
shardMatcher *storepb.ShardMatcher,
emptyPostingsCount prometheus.Counter,
calculateChunkHash bool,
) (storepb.SeriesSet, *queryStats, error) {
ps, err := indexr.ExpandedPostings(ctx, matchers)
ps, err := indexr.ExpandedPostings(ctx, matchers, bytesLimiter)
if err != nil {
return nil, nil, errors.Wrap(err, "expanded matching posting")
}
@@ -840,7 +846,7 @@
// Preload all series index data.
// TODO(bwplotka): Consider not keeping all series in memory all the time.
// TODO(bwplotka): Do lazy loading in one step as `ExpandingPostings` method.
if err := indexr.PreloadSeries(ctx, ps); err != nil {
if err := indexr.PreloadSeries(ctx, ps, bytesLimiter); err != nil {
return nil, nil, errors.Wrap(err, "preload series")
}

@@ -905,7 +911,7 @@
return newBucketSeriesSet(res), indexr.stats, nil
}

if err := chunkr.load(ctx, res, loadAggregates, calculateChunkHash); err != nil {
if err := chunkr.load(ctx, res, loadAggregates, calculateChunkHash, bytesLimiter); err != nil {
return nil, nil, errors.Wrap(err, "load chunks")
}

@@ -1053,6 +1059,7 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie
req.MaxTime = s.limitMaxTime(req.MaxTime)

var (
bytesLimiter = s.bytesLimiterFactory(s.metrics.queriesDropped.WithLabelValues("bytes"))
ctx = srv.Context()
stats = &queryStats{}
res []storepb.SeriesSet
@@ -1128,6 +1135,7 @@
blockMatchers,
chunksLimiter,
seriesLimiter,
bytesLimiter,
req.SkipChunks,
req.MinTime, req.MaxTime,
req.Aggregates,
@@ -1292,6 +1300,7 @@ func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReq
var mtx sync.Mutex
var sets [][]string
var seriesLimiter = s.seriesLimiterFactory(s.metrics.queriesDropped.WithLabelValues("series"))
var bytesLimiter = s.bytesLimiterFactory(s.metrics.queriesDropped.WithLabelValues("bytes"))

for _, b := range s.blocks {
b := b
@@ -1350,6 +1359,7 @@
reqSeriesMatchersNoExtLabels,
nil,
seriesLimiter,
bytesLimiter,
true,
req.Start,
req.End,
@@ -1458,6 +1468,7 @@ func (s *BucketStore) LabelValues(ctx context.Context, req *storepb.LabelValuesR
var mtx sync.Mutex
var sets [][]string
var seriesLimiter = s.seriesLimiterFactory(s.metrics.queriesDropped.WithLabelValues("series"))
var bytesLimiter = s.bytesLimiterFactory(s.metrics.queriesDropped.WithLabelValues("bytes"))

for _, b := range s.blocks {
b := b
@@ -1519,6 +1530,7 @@
reqSeriesMatchersNoExtLabels,
nil,
seriesLimiter,
bytesLimiter,
true,
req.Start,
req.End,
@@ -1913,7 +1925,7 @@ func newBucketIndexReader(block *bucketBlock) *bucketIndexReader {
// Reminder: A posting is a reference (represented as a uint64) to a series reference, which in turn points to the first
// chunk where the series contains the matching label-value pair for a given block of data. Postings can be fetched by
// single label name=value.
func (r *bucketIndexReader) ExpandedPostings(ctx context.Context, ms []*labels.Matcher) ([]storage.SeriesRef, error) {
func (r *bucketIndexReader) ExpandedPostings(ctx context.Context, ms []*labels.Matcher, bytesLimiter BytesLimiter) ([]storage.SeriesRef, error) {
var (
postingGroups []*postingGroup
allRequested = false
@@ -1962,7 +1974,7 @@ func (r *bucketIndexReader) ExpandedPostings(ctx context.Context, ms []*labels.M
keys = append(keys, allPostingsLabel)
}

fetchedPostings, err := r.fetchPostings(ctx, keys)
fetchedPostings, err := r.fetchPostings(ctx, keys, bytesLimiter)
if err != nil {
return nil, errors.Wrap(err, "get postings")
}
@@ -2102,7 +2114,7 @@ type postingPtr struct {
// fetchPostings fill postings requested by posting groups.
// It returns one postings for each key, in the same order.
// If postings for given key is not fetched, entry at given index will be nil.
func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Label) ([]index.Postings, error) {
func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Label, bytesLimiter BytesLimiter) ([]index.Postings, error) {
timer := prometheus.NewTimer(r.block.metrics.postingsFetchDuration)
defer timer.ObserveDuration()

@@ -2112,6 +2124,11 @@ func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Lab

// Fetch postings from the cache with a single call.
fromCache, _ := r.block.indexCache.FetchMultiPostings(ctx, r.block.meta.ULID, keys)
for _, dataFromCache := range fromCache {
if err := bytesLimiter.Reserve(uint64(len(dataFromCache))); err != nil {
return nil, errors.Wrap(err, "bytes limit exceeded while loading postings from index cache")
}
}

// Iterate over all groups and fetch posting from cache.
// If we have a miss, mark key to be fetched in `ptrs` slice.
@@ -2174,6 +2191,15 @@ func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Lab
return uint64(ptrs[i].ptr.Start), uint64(ptrs[i].ptr.End)
})

for _, part := range parts {
start := int64(part.Start)
length := int64(part.End) - start

if err := bytesLimiter.Reserve(uint64(length)); err != nil {
return nil, errors.Wrap(err, "bytes limit exceeded while fetching postings")
}
}

g, ctx := errgroup.WithContext(ctx)
for _, part := range parts {
i, j := part.ElemRng[0], part.ElemRng[1]
@@ -2320,7 +2346,7 @@ func (it *bigEndianPostings) length() int {
return len(it.list) / 4
}

func (r *bucketIndexReader) PreloadSeries(ctx context.Context, ids []storage.SeriesRef) error {
func (r *bucketIndexReader) PreloadSeries(ctx context.Context, ids []storage.SeriesRef, bytesLimiter BytesLimiter) error {
timer := prometheus.NewTimer(r.block.metrics.seriesFetchDuration)
defer timer.ObserveDuration()

@@ -2329,26 +2355,36 @@ func (r *bucketIndexReader) PreloadSeries(ctx context.Context, ids []storage.Ser
fromCache, ids := r.block.indexCache.FetchMultiSeries(ctx, r.block.meta.ULID, ids)
for id, b := range fromCache {
r.loadedSeries[id] = b
if err := bytesLimiter.Reserve(uint64(len(b))); err != nil {
return errors.Wrap(err, "exceeded bytes limit while loading series from index cache")
}
}

parts := r.block.partitioner.Partition(len(ids), func(i int) (start, end uint64) {
return uint64(ids[i]), uint64(ids[i] + maxSeriesSize)
})

g, ctx := errgroup.WithContext(ctx)
for _, p := range parts {
s, e := p.Start, p.End
i, j := p.ElemRng[0], p.ElemRng[1]

g.Go(func() error {
return r.loadSeries(ctx, ids[i:j], false, s, e)
return r.loadSeries(ctx, ids[i:j], false, s, e, bytesLimiter)
})
}
return g.Wait()
}

func (r *bucketIndexReader) loadSeries(ctx context.Context, ids []storage.SeriesRef, refetch bool, start, end uint64) error {
func (r *bucketIndexReader) loadSeries(ctx context.Context, ids []storage.SeriesRef, refetch bool, start, end uint64, bytesLimiter BytesLimiter) error {
begin := time.Now()

if bytesLimiter != nil {
if err := bytesLimiter.Reserve(uint64(end - start)); err != nil {
return errors.Wrap(err, "exceeded bytes limit while fetching series")
}
}

b, err := r.block.readIndexRange(ctx, int64(start), int64(end-start))
if err != nil {
return errors.Wrap(err, "read series range")
@@ -2378,7 +2414,7 @@ func (r *bucketIndexReader) loadSeries(ctx context.Context, ids []storage.Series
level.Warn(r.block.logger).Log("msg", "series size exceeded expected size; refetching", "id", id, "series length", n+int(l), "maxSeriesSize", maxSeriesSize)

// Fetch plus to get the size of next one if exists.
return r.loadSeries(ctx, ids[i:], true, uint64(id), uint64(id)+uint64(n+int(l)+1))
return r.loadSeries(ctx, ids[i:], true, uint64(id), uint64(id)+uint64(n+int(l)+1), bytesLimiter)
}
c = c[n : n+int(l)]
r.mtx.Lock()
@@ -2598,7 +2634,7 @@ func (r *bucketChunkReader) addLoad(id chunks.ChunkRef, seriesEntry, chunk int)
}

// load loads all added chunks and saves resulting aggrs to res.
func (r *bucketChunkReader) load(ctx context.Context, res []seriesEntry, aggrs []storepb.Aggr, calculateChunkChecksum bool) error {
func (r *bucketChunkReader) load(ctx context.Context, res []seriesEntry, aggrs []storepb.Aggr, calculateChunkChecksum bool, bytesLimiter BytesLimiter) error {
g, ctx := errgroup.WithContext(ctx)

for seq, pIdxs := range r.toLoad {
@@ -2609,12 +2645,18 @@ func (r *bucketChunkReader) load(ctx context.Context, res []seriesEntry, aggrs [
return uint64(pIdxs[i].offset), uint64(pIdxs[i].offset) + EstimatedMaxChunkSize
})

for _, p := range parts {
if err := bytesLimiter.Reserve(uint64(p.End - p.Start)); err != nil {
return errors.Wrap(err, "bytes limit exceeded while fetching chunks")
}
}

for _, p := range parts {
seq := seq
p := p
indices := pIdxs[p.ElemRng[0]:p.ElemRng[1]]
g.Go(func() error {
return r.loadChunks(ctx, res, aggrs, seq, p, indices, calculateChunkChecksum)
return r.loadChunks(ctx, res, aggrs, seq, p, indices, calculateChunkChecksum, bytesLimiter)
})
}
}
@@ -2623,7 +2665,7 @@ func (r *bucketChunkReader) load(ctx context.Context, res []seriesEntry, aggrs [

// loadChunks will read range [start, end] from the segment file with sequence number seq.
// This data range covers chunks starting at supplied offsets.
func (r *bucketChunkReader) loadChunks(ctx context.Context, res []seriesEntry, aggrs []storepb.Aggr, seq int, part Part, pIdxs []loadIdx, calculateChunkChecksum bool) error {
func (r *bucketChunkReader) loadChunks(ctx context.Context, res []seriesEntry, aggrs []storepb.Aggr, seq int, part Part, pIdxs []loadIdx, calculateChunkChecksum bool, bytesLimiter BytesLimiter) error {
fetchBegin := time.Now()

// Get a reader for the required range.
@@ -2719,6 +2761,9 @@ func (r *bucketChunkReader) loadChunks(ctx context.Context, res []seriesEntry, a

// Read entire chunk into new buffer.
// TODO: readChunkRange call could be avoided for any chunk but last in this particular part.
if err := bytesLimiter.Reserve(uint64(chunkLen)); err != nil {
return errors.Wrap(err, "bytes limit exceeded while fetching chunks")
}
nb, err := r.block.readChunkRange(ctx, seq, int64(pIdx.offset), int64(chunkLen), []byteRange{{offset: 0, length: chunkLen}})
if err != nil {
return errors.Wrapf(err, "preloaded chunk too small, expecting %d, and failed to fetch full chunk", chunkLen)
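Taken together, the pkg/store/bucket.go changes above create one bytes limiter per request (in Series, LabelNames and LabelValues) and call Reserve before every fetch: cached postings, postings ranges, series ranges, the per-series refetch path, and chunk parts. The reservations therefore accumulate across the whole call. Below is a small illustrative sketch of that running-total behaviour, assuming the limiter semantics sketched earlier; it is not code from the commit.

```go
package main

import "fmt"

// newReserve returns a closure that tracks a running byte total against a
// budget, mirroring the per-request limiter created at the top of
// Series/LabelNames/LabelValues in this diff.
func newReserve(limit uint64) func(num uint64) error {
	var reserved uint64
	return func(num uint64) error {
		if limit == 0 {
			return nil // 0 = unlimited, as with the flag's default.
		}
		reserved += num
		if reserved > limit {
			return fmt.Errorf("exceeded bytes limit of %d: reserved %d", limit, reserved)
		}
		return nil
	}
}

func main() {
	reserve := newReserve(1 << 20) // hypothetical 1 MiB budget for one request.

	// One request reserves bytes at every fetch site: cached postings,
	// postings ranges, series ranges, then chunk parts.
	for _, fetch := range []uint64{256 << 10, 256 << 10, 384 << 10, 256 << 10} {
		if err := reserve(fetch); err != nil {
			fmt.Println("request aborted:", err) // trips on the 4th reservation (1.125 MiB > 1 MiB).
			return
		}
	}
	fmt.Println("request completed within budget")
}
```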