store: add downloaded bytes limit #5801

Merged (8 commits) on Oct 27, 2022
Changes from 5 commits
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -25,6 +25,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#5658](https://github.com/thanos-io/thanos/pull/5658) Query Frontend: Introduce new optional parameters (`query-range.min-split-interval`, `query-range.max-split-interval`, `query-range.horizontal-shards`) to implement more dynamic horizontal query splitting.
- [#5721](https://github.com/thanos-io/thanos/pull/5721) Store: Add metric `thanos_bucket_store_empty_postings_total` for number of empty postings when fetching series.
- [#5723](https://github.com/thanos-io/thanos/pull/5723) Compactor: Support disable block viewer UI.
- [#5801](https://github.com/thanos-io/thanos/pull/5801) Store: Add a new limiter `--store.grpc.downloaded-bytes-limit` that limits the number of bytes downloaded in each Series/LabelNames/LabelValues call.
- [#5674](https://github.com/thanos-io/thanos/pull/5674) Query Frontend/Store: Add support connecting to redis using TLS.
- [#5734](https://github.com/thanos-io/thanos/pull/5734) Store: Support disable block viewer UI.
- [#5411](https://github.com/thanos-io/thanos/pull/5411) Tracing: Add OpenTelemetry Protocol exporter.
6 changes: 6 additions & 0 deletions cmd/thanos/store.go
@@ -58,6 +58,7 @@ type storeConfig struct {
chunkPoolSize units.Base2Bytes
maxSampleCount uint64
maxTouchedSeriesCount uint64
maxDownloadedBytes units.Base2Bytes
maxConcurrency int
component component.StoreAPI
debugLogging bool
@@ -109,6 +110,10 @@ func (sc *storeConfig) registerFlag(cmd extkingpin.FlagClause) {
"Maximum amount of touched series returned via a single Series call. The Series call fails if this limit is exceeded. 0 means no limit.").
Default("0").Uint64Var(&sc.maxTouchedSeriesCount)

cmd.Flag("store.grpc.downloaded-bytes-limit",
"Maximum amount of downloaded (either fetched or touched) bytes in a single Series/LabelNames/LabelValues call. The Series call fails if this limit is exceeded. 0 means no limit.").
Default("0").BytesVar(&sc.maxDownloadedBytes)

cmd.Flag("store.grpc.series-max-concurrency", "Maximum number of concurrent Series calls.").Default("20").IntVar(&sc.maxConcurrency)

sc.component = component.Store
@@ -332,6 +337,7 @@ func runStore(
store.WithQueryGate(queriesGate),
store.WithChunkPool(chunkPool),
store.WithFilterConfig(conf.filterConf),
store.WithBytesLimiterFactory(store.NewBytesLimiterFactory(conf.maxDownloadedBytes)),
}

if conf.debugLogging {
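The `BytesLimiter` and `BytesLimiterFactory` types used by this PR are referenced but not defined in this diff. A minimal sketch of the shape implied by the calls above and in `pkg/store/bucket.go` below, assuming the factory mirrors the existing chunk/series limiter factories (the exact definitions live outside this diff):

```go
package store

import "github.com/prometheus/client_golang/prometheus"

// BytesLimiter is what the new Reserve(...) calls in pkg/store/bucket.go expect:
// every byte range is accounted for before it is fetched from the bucket or
// accepted from the index cache.
type BytesLimiter interface {
	// Reserve returns an error once the cumulative bytes reserved by a single
	// request exceed the configured limit.
	Reserve(num uint64) error
}

// BytesLimiterFactory creates one BytesLimiter per Series/LabelNames/LabelValues
// call; failedCounter is the store's queriesDropped counter labelled "bytes".
type BytesLimiterFactory func(failedCounter prometheus.Counter) BytesLimiter
```

`NewBytesLimiterFactory(conf.maxDownloadedBytes)` is then presumably a thin constructor that turns the `units.Base2Bytes` flag value into such a factory, with 0 meaning no limit.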
6 changes: 6 additions & 0 deletions docs/components/store.md
@@ -162,6 +162,12 @@ Flags:
If true, Store Gateway will lazy memory map
index-header only once the block is required by
a query.
--store.grpc.downloaded-bytes-limit=0
Maximum amount of downloaded (either
fetched or touched) bytes in a single
Series/LabelNames/LabelValues call. The Series
call fails if this limit is exceeded. 0 means
no limit.
--store.grpc.series-max-concurrency=20
Maximum number of concurrent Series calls.
--store.grpc.series-sample-limit=0
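Since the flag is registered with `BytesVar` into a `units.Base2Bytes` field, its value is a base-2 byte quantity rather than a plain integer. A small, hedged illustration of the accepted form (the precise suffix handling comes from the `units` library, not from this diff):

```go
package main

import (
	"fmt"

	"github.com/alecthomas/units"
)

func main() {
	// --store.grpc.downloaded-bytes-limit=512MiB would be stored as this value;
	// the default of 0 keeps the "no limit" behaviour.
	limit, err := units.ParseBase2Bytes("512MiB")
	if err != nil {
		panic(err)
	}
	fmt.Println(int64(limit)) // 536870912
}
```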
87 changes: 73 additions & 14 deletions pkg/store/bucket.go
@@ -305,7 +305,10 @@ type BucketStore struct {
// seriesLimiterFactory creates a new limiter used to limit the number of touched series by each Series() call,
// or LabelName and LabelValues calls when used with matchers.
seriesLimiterFactory SeriesLimiterFactory
partitioner Partitioner

// bytesLimiterFactory creates a new limiter used to limit the amount of bytes fetched/touched by each Series() call.
bytesLimiterFactory BytesLimiterFactory
partitioner Partitioner

filterConfig *FilterConfig
advLabelSets []labelpb.ZLabelSet
@@ -382,6 +385,14 @@ func WithFilterConfig(filter *FilterConfig) BucketStoreOption {
}
}

// WithBytesLimiterFactory sets a bytes limiter factory which store uses for limiting bytes downloaded
// in a single Series() call.
func WithBytesLimiterFactory(blf BytesLimiterFactory) BucketStoreOption {
return func(s *BucketStore) {
s.bytesLimiterFactory = blf
}
}
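
The factory this option expects can be exercised on its own. A hedged, illustrative snippet (the 1 MiB budget and counter name are made up; `NewBytesLimiterFactory` and the cumulative `Reserve` semantics are inferred from how they are used elsewhere in this PR):

```go
package main

import (
	"fmt"

	"github.com/alecthomas/units"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/thanos-io/thanos/pkg/store"
)

func main() {
	// Stand-in for BucketStore's queriesDropped metric labelled "bytes".
	dropped := prometheus.NewCounter(prometheus.CounterOpts{Name: "example_dropped_total"})

	// One limiter per request, much as Series()/LabelNames()/LabelValues() build it.
	limiter := store.NewBytesLimiterFactory(1*units.MiB)(dropped)

	// The first reservation fits within the 1 MiB budget.
	fmt.Println(limiter.Reserve(512 << 10)) // <nil>

	// Reservations are cumulative per request, so this one pushes past the budget
	// and fails; inside a Series call this error aborts the request.
	fmt.Println(limiter.Reserve(768 << 10)) // limit exceeded error
}
```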

// WithDebugLogging enables debug logging.
func WithDebugLogging() BucketStoreOption {
return func(s *BucketStore) {
@@ -791,13 +802,14 @@ func blockSeries(
matchers []*labels.Matcher, // Series matchers.
chunksLimiter ChunksLimiter, // Rate limiter for loading chunks.
seriesLimiter SeriesLimiter, // Rate limiter for loading series.
bytesLimiter BytesLimiter, // Rate limiter for used bytes.
skipChunks bool, // If true, chunks are not loaded.
minTime, maxTime int64, // Series must have data in this time range to be returned.
loadAggregates []storepb.Aggr, // List of aggregates to load when loading chunks.
shardMatcher *storepb.ShardMatcher,
emptyPostingsCount prometheus.Counter,
) (storepb.SeriesSet, *queryStats, error) {
ps, err := indexr.ExpandedPostings(ctx, matchers)
ps, err := indexr.ExpandedPostings(ctx, matchers, bytesLimiter)
if err != nil {
return nil, nil, errors.Wrap(err, "expanded matching posting")
}
@@ -815,7 +827,7 @@
// Preload all series index data.
// TODO(bwplotka): Consider not keeping all series in memory all the time.
// TODO(bwplotka): Do lazy loading in one step as `ExpandingPostings` method.
if err := indexr.PreloadSeries(ctx, ps); err != nil {
if err := indexr.PreloadSeries(ctx, ps, bytesLimiter); err != nil {
return nil, nil, errors.Wrap(err, "preload series")
}

@@ -880,7 +892,7 @@
return newBucketSeriesSet(res), indexr.stats, nil
}

if err := chunkr.load(ctx, res, loadAggregates); err != nil {
if err := chunkr.load(ctx, res, loadAggregates, bytesLimiter); err != nil {
return nil, nil, errors.Wrap(err, "load chunks")
}

@@ -1015,6 +1027,7 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie
req.MaxTime = s.limitMaxTime(req.MaxTime)

var (
bytesLimiter BytesLimiter
ctx = srv.Context()
stats = &queryStats{}
res []storepb.SeriesSet
@@ -1026,6 +1039,10 @@
seriesLimiter = s.seriesLimiterFactory(s.metrics.queriesDropped.WithLabelValues("series"))
)

if s.bytesLimiterFactory != nil {
bytesLimiter = s.bytesLimiterFactory(s.metrics.queriesDropped.WithLabelValues("bytes"))
}

if req.Hints != nil {
reqHints := &hintspb.SeriesRequestHints{}
if err := types.UnmarshalAny(req.Hints, reqHints); err != nil {
@@ -1090,6 +1107,7 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie
blockMatchers,
chunksLimiter,
seriesLimiter,
bytesLimiter,
req.SkipChunks,
req.MinTime, req.MaxTime,
req.Aggregates,
@@ -1252,6 +1270,7 @@ func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReq
var mtx sync.Mutex
var sets [][]string
var seriesLimiter = s.seriesLimiterFactory(s.metrics.queriesDropped.WithLabelValues("series"))
var bytesLimiter = s.bytesLimiterFactory(s.metrics.queriesDropped.WithLabelValues("bytes"))

for _, b := range s.blocks {
b := b
@@ -1310,6 +1329,7 @@ func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReq
reqSeriesMatchersNoExtLabels,
nil,
seriesLimiter,
bytesLimiter,
true,
req.Start,
req.End,
@@ -1417,6 +1437,7 @@ func (s *BucketStore) LabelValues(ctx context.Context, req *storepb.LabelValuesR
var mtx sync.Mutex
var sets [][]string
var seriesLimiter = s.seriesLimiterFactory(s.metrics.queriesDropped.WithLabelValues("series"))
var bytesLimiter = s.bytesLimiterFactory(s.metrics.queriesDropped.WithLabelValues("bytes"))

for _, b := range s.blocks {
b := b
@@ -1478,6 +1499,7 @@ func (s *BucketStore) LabelValues(ctx context.Context, req *storepb.LabelValuesR
reqSeriesMatchersNoExtLabels,
nil,
seriesLimiter,
bytesLimiter,
true,
req.Start,
req.End,
@@ -1871,7 +1893,7 @@ func newBucketIndexReader(block *bucketBlock) *bucketIndexReader {
// Reminder: A posting is a reference (represented as a uint64) to a series reference, which in turn points to the first
// chunk where the series contains the matching label-value pair for a given block of data. Postings can be fetched by
// single label name=value.
func (r *bucketIndexReader) ExpandedPostings(ctx context.Context, ms []*labels.Matcher) ([]storage.SeriesRef, error) {
func (r *bucketIndexReader) ExpandedPostings(ctx context.Context, ms []*labels.Matcher, bytesLimiter BytesLimiter) ([]storage.SeriesRef, error) {
var (
postingGroups []*postingGroup
allRequested = false
@@ -1920,7 +1942,7 @@ func (r *bucketIndexReader) ExpandedPostings(ctx context.Context, ms []*labels.M
keys = append(keys, allPostingsLabel)
}

fetchedPostings, err := r.fetchPostings(ctx, keys)
fetchedPostings, err := r.fetchPostings(ctx, keys, bytesLimiter)
if err != nil {
return nil, errors.Wrap(err, "get postings")
}
@@ -2060,7 +2082,7 @@ type postingPtr struct {
// fetchPostings fill postings requested by posting groups.
// It returns one postings for each key, in the same order.
// If postings for given key is not fetched, entry at given index will be nil.
func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Label) ([]index.Postings, error) {
func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Label, bytesLimiter BytesLimiter) ([]index.Postings, error) {
timer := prometheus.NewTimer(r.block.metrics.postingsFetchDuration)
defer timer.ObserveDuration()

@@ -2070,6 +2092,11 @@ func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Lab

// Fetch postings from the cache with a single call.
fromCache, _ := r.block.indexCache.FetchMultiPostings(ctx, r.block.meta.ULID, keys)
for _, dataFromCache := range fromCache {
if err := bytesLimiter.Reserve(uint64(len(dataFromCache))); err != nil {
return nil, errors.Wrap(err, "bytes limit exceeded while loading postings from index cache")
}
}

// Iterate over all groups and fetch posting from cache.
// If we have a miss, mark key to be fetched in `ptrs` slice.
@@ -2132,6 +2159,17 @@ func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Lab
return uint64(ptrs[i].ptr.Start), uint64(ptrs[i].ptr.End)
})

if bytesLimiter != nil {
Inline review thread on the `if bytesLimiter != nil` check above:

Contributor: What about having a nopLimiter to avoid nil checks?

GiedriusS (member, author), Oct 21, 2022: I thought it would be cleaner to add checks instead of passing NewBytesLimiterFactory(0)(nil) everywhere 😄 let me do this change.

Contributor: If we make the bytesLimiter an interface, then we don't need to pass the factory around, right?

GiedriusS: But it is an interface already, no? I don't get your point. A nopLimiter is the same as NewBytesLimiterFactory(0)(), so I'm not sure what the benefit of having a nopLimiter is. The problem is that you have to pass some kind of limiter in all places, in all tests, even though it is irrelevant. The purpose of this if is to avoid having to copy/paste that. Whether it is nopLimiter or NewBytesLimiterFactory(0)() doesn't matter. Either way, let me make this change.

GiedriusS: Thought of a better idea - since it will be pretty much necessary to pass something here, I've made it into a "normal" argument to NewBucketStore. With this, it won't be needed to add a bunch of ifs everywhere.
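
To ground the discussion above, a sketch of a limiter that matches the `Reserve` calls in this diff and shows why a zero limit already behaves like a nopLimiter. This is an assumption about the implementation (which lives outside this diff), not code from the PR:

```go
package store

import (
	"fmt"
	"sync/atomic"

	"github.com/prometheus/client_golang/prometheus"
)

// bytesLimiter (hypothetical name) keeps a cumulative, per-request byte budget.
type bytesLimiter struct {
	limit         uint64
	reserved      uint64 // cumulative bytes reserved for this request; updated atomically
	failedCounter prometheus.Counter
}

func (l *bytesLimiter) Reserve(num uint64) error {
	if l.limit == 0 { // "0 means no limit": every reservation succeeds,
		return nil // so NewBytesLimiterFactory(0)() acts as a no-op limiter.
	}
	if reserved := atomic.AddUint64(&l.reserved, num); reserved > l.limit {
		if l.failedCounter != nil {
			l.failedCounter.Inc() // feeds the queriesDropped counter labelled "bytes"
		}
		return fmt.Errorf("downloaded bytes limit %d exceeded (got %d)", l.limit, reserved)
	}
	return nil
}
```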

for _, part := range parts {
start := int64(part.Start)
length := int64(part.End) - start

if err := bytesLimiter.Reserve(uint64(length)); err != nil {
return nil, errors.Wrap(err, "bytes limit exceeded while fetching postings")
}
}
}

g, ctx := errgroup.WithContext(ctx)
for _, part := range parts {
i, j := part.ElemRng[0], part.ElemRng[1]
@@ -2278,7 +2316,7 @@ func (it *bigEndianPostings) length() int {
return len(it.list) / 4
}

func (r *bucketIndexReader) PreloadSeries(ctx context.Context, ids []storage.SeriesRef) error {
func (r *bucketIndexReader) PreloadSeries(ctx context.Context, ids []storage.SeriesRef, bytesLimiter BytesLimiter) error {
timer := prometheus.NewTimer(r.block.metrics.seriesFetchDuration)
defer timer.ObserveDuration()

@@ -2287,26 +2325,36 @@ func (r *bucketIndexReader) PreloadSeries(ctx context.Context, ids []storage.Ser
fromCache, ids := r.block.indexCache.FetchMultiSeries(ctx, r.block.meta.ULID, ids)
for id, b := range fromCache {
r.loadedSeries[id] = b
if err := bytesLimiter.Reserve(uint64(len(b))); err != nil {
return errors.Wrap(err, "exceeded bytes limit while loading series from index cache")
}
}

parts := r.block.partitioner.Partition(len(ids), func(i int) (start, end uint64) {
return uint64(ids[i]), uint64(ids[i] + maxSeriesSize)
})

g, ctx := errgroup.WithContext(ctx)
for _, p := range parts {
s, e := p.Start, p.End
i, j := p.ElemRng[0], p.ElemRng[1]

g.Go(func() error {
return r.loadSeries(ctx, ids[i:j], false, s, e)
return r.loadSeries(ctx, ids[i:j], false, s, e, bytesLimiter)
})
}
return g.Wait()
}

func (r *bucketIndexReader) loadSeries(ctx context.Context, ids []storage.SeriesRef, refetch bool, start, end uint64) error {
func (r *bucketIndexReader) loadSeries(ctx context.Context, ids []storage.SeriesRef, refetch bool, start, end uint64, bytesLimiter BytesLimiter) error {
begin := time.Now()

if bytesLimiter != nil {
if err := bytesLimiter.Reserve(uint64(end - start)); err != nil {
return errors.Wrap(err, "exceeded bytes limit while fetching series")
}
}

b, err := r.block.readIndexRange(ctx, int64(start), int64(end-start))
if err != nil {
return errors.Wrap(err, "read series range")
@@ -2336,7 +2384,7 @@ func (r *bucketIndexReader) loadSeries(ctx context.Context, ids []storage.Series
level.Warn(r.block.logger).Log("msg", "series size exceeded expected size; refetching", "id", id, "series length", n+int(l), "maxSeriesSize", maxSeriesSize)

// Fetch plus to get the size of next one if exists.
return r.loadSeries(ctx, ids[i:], true, uint64(id), uint64(id)+uint64(n+int(l)+1))
return r.loadSeries(ctx, ids[i:], true, uint64(id), uint64(id)+uint64(n+int(l)+1), bytesLimiter)
}
c = c[n : n+int(l)]
r.mtx.Lock()
@@ -2556,7 +2604,7 @@ func (r *bucketChunkReader) addLoad(id chunks.ChunkRef, seriesEntry, chunk int)
}

// load loads all added chunks and saves resulting aggrs to res.
func (r *bucketChunkReader) load(ctx context.Context, res []seriesEntry, aggrs []storepb.Aggr) error {
func (r *bucketChunkReader) load(ctx context.Context, res []seriesEntry, aggrs []storepb.Aggr, bytesLimiter BytesLimiter) error {
g, ctx := errgroup.WithContext(ctx)

for seq, pIdxs := range r.toLoad {
@@ -2567,12 +2615,20 @@
return uint64(pIdxs[i].offset), uint64(pIdxs[i].offset) + EstimatedMaxChunkSize
})

if bytesLimiter != nil {
for _, p := range parts {
if err := bytesLimiter.Reserve(uint64(p.End - p.Start)); err != nil {
return errors.Wrap(err, "bytes limit exceeded while fetching chunks")
}
}
}

for _, p := range parts {
seq := seq
p := p
indices := pIdxs[p.ElemRng[0]:p.ElemRng[1]]
g.Go(func() error {
return r.loadChunks(ctx, res, aggrs, seq, p, indices)
return r.loadChunks(ctx, res, aggrs, seq, p, indices, bytesLimiter)
})
}
}
Expand All @@ -2581,7 +2637,7 @@ func (r *bucketChunkReader) load(ctx context.Context, res []seriesEntry, aggrs [

// loadChunks will read range [start, end] from the segment file with sequence number seq.
// This data range covers chunks starting at supplied offsets.
func (r *bucketChunkReader) loadChunks(ctx context.Context, res []seriesEntry, aggrs []storepb.Aggr, seq int, part Part, pIdxs []loadIdx) error {
func (r *bucketChunkReader) loadChunks(ctx context.Context, res []seriesEntry, aggrs []storepb.Aggr, seq int, part Part, pIdxs []loadIdx, bytesLimiter BytesLimiter) error {
fetchBegin := time.Now()

// Get a reader for the required range.
@@ -2677,6 +2733,9 @@ func (r *bucketChunkReader) loadChunks(ctx context.Context, res []seriesEntry, a

// Read entire chunk into new buffer.
// TODO: readChunkRange call could be avoided for any chunk but last in this particular part.
if err := bytesLimiter.Reserve(uint64(chunkLen)); err != nil {
return errors.Wrap(err, "bytes limit exceeded while fetching chunks")
}
nb, err := r.block.readChunkRange(ctx, seq, int64(pIdx.offset), int64(chunkLen), []byteRange{{offset: 0, length: chunkLen}})
if err != nil {
return errors.Wrapf(err, "preloaded chunk too small, expecting %d, and failed to fetch full chunk", chunkLen)