@@ -23,11 +23,13 @@ import (
2323 "github.com/thanos-io/thanos/pkg/strutil"
2424 "golang.org/x/sync/errgroup"
2525
26+ "github.com/cortexproject/cortex/pkg/cortexpb"
2627 "github.com/cortexproject/cortex/pkg/storage/bucket"
2728 cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb"
2829 "github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex"
2930 "github.com/cortexproject/cortex/pkg/tenant"
3031 "github.com/cortexproject/cortex/pkg/util"
32+ "github.com/cortexproject/cortex/pkg/util/limiter"
3133 util_log "github.com/cortexproject/cortex/pkg/util/log"
3234 "github.com/cortexproject/cortex/pkg/util/multierror"
3335 "github.com/cortexproject/cortex/pkg/util/services"
@@ -132,6 +134,62 @@ func NewParquetQueryable(
132134
133135 cDecoder := schema .NewPrometheusParquetChunksDecoder (chunkenc .NewPool ())
134136
137+ parquetQueryableOpts := []queryable.QueryableOpts {
138+ queryable .WithRowCountLimitFunc (func (ctx context.Context ) int64 {
139+ // Ignore error as this shouldn't happen.
140+ // If failed to resolve tenant we will just use the default limit value.
141+ userID , _ := tenant .TenantID (ctx )
142+ return int64 (limits .ParquetMaxFetchedRowCount (userID ))
143+ }),
144+ queryable .WithChunkBytesLimitFunc (func (ctx context.Context ) int64 {
145+ // Ignore error as this shouldn't happen.
146+ // If failed to resolve tenant we will just use the default limit value.
147+ userID , _ := tenant .TenantID (ctx )
148+ return int64 (limits .ParquetMaxFetchedChunkBytes (userID ))
149+ }),
150+ queryable .WithDataBytesLimitFunc (func (ctx context.Context ) int64 {
151+ // Ignore error as this shouldn't happen.
152+ // If failed to resolve tenant we will just use the default limit value.
153+ userID , _ := tenant .TenantID (ctx )
154+ return int64 (limits .ParquetMaxFetchedDataBytes (userID ))
155+ }),
156+ queryable .WithMaterializedSeriesCallback (func (ctx context.Context , cs []storage.ChunkSeries ) error {
157+ queryLimiter := limiter .QueryLimiterFromContextWithFallback (ctx )
158+ lbls := make ([][]cortexpb.LabelAdapter , 0 , len (cs ))
159+ for _ , series := range cs {
160+ chkCount := 0
161+ chunkSize := 0
162+ lblSize := 0
163+ lblAdapter := cortexpb .FromLabelsToLabelAdapters (series .Labels ())
164+ lbls = append (lbls , lblAdapter )
165+ for _ , lbl := range lblAdapter {
166+ lblSize += lbl .Size ()
167+ }
168+ iter := series .Iterator (nil )
169+ for iter .Next () {
170+ chk := iter .At ()
171+ chunkSize += len (chk .Chunk .Bytes ())
172+ chkCount ++
173+ }
174+ if chkCount > 0 {
175+ if err := queryLimiter .AddChunks (chkCount ); err != nil {
176+ return validation .LimitError (err .Error ())
177+ }
178+ if err := queryLimiter .AddChunkBytes (chunkSize ); err != nil {
179+ return validation .LimitError (err .Error ())
180+ }
181+ }
182+
183+ if err := queryLimiter .AddDataBytes (chunkSize + lblSize ); err != nil {
184+ return validation .LimitError (err .Error ())
185+ }
186+ }
187+ if err := queryLimiter .AddSeries (lbls ... ); err != nil {
188+ return validation .LimitError (err .Error ())
189+ }
190+ return nil
191+ }),
192+ }
135193 parquetQueryable , err := queryable .NewParquetQueryable (cDecoder , func (ctx context.Context , mint , maxt int64 ) ([]parquet_storage.ParquetShard , error ) {
136194 userID , err := tenant .TenantID (ctx )
137195 if err != nil {
@@ -182,7 +240,7 @@ func NewParquetQueryable(
182240 }
183241
184242 return shards , errGroup .Wait ()
185- })
243+ }, parquetQueryableOpts ... )
186244
187245 p := & parquetQueryableWithFallback {
188246 subservices : manager ,
@@ -376,7 +434,7 @@ func (q *parquetQuerierWithFallback) Select(ctx context.Context, sortSeries bool
376434
377435 userID , err := tenant .TenantID (ctx )
378436 if err != nil {
379- storage .ErrSeriesSet (err )
437+ return storage .ErrSeriesSet (err )
380438 }
381439
382440 if q .limits .QueryVerticalShardSize (userID ) > 1 {
0 commit comments