Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dynamic splitting by interval for range queries #6458

Open
wants to merge 14 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -4193,6 +4193,23 @@ The `query_range_config` configures the query splitting and caching in the Corte
# CLI flag: -querier.split-queries-by-interval
[split_queries_by_interval: <duration> | default = 0s]

dynamic_query_splits:
afhassan marked this conversation as resolved.
Show resolved Hide resolved
# [EXPERIMENTAL] Maximum number of shards for a query, 0 disables it.
# Dynamically uses a multiple of `split-queries-by-interval` to maintain the
# number of splits below the limit. If vertical sharding is enabled for a
# query, the combined total number of vertical and interval shards is kept
# below this limit.
# CLI flag: -querier.max-shards-per-query
[max_shards_per_query: <int> | default = 0]

# [EXPERIMENTAL] Max total duration of data fetched by all query shards from
# storage, 0 disables it. Dynamically uses a multiple of
# `split-queries-by-interval` to ensure the total fetched duration of data is
afhassan marked this conversation as resolved.
Show resolved Hide resolved
# lower than the value set. It takes into account additional data fetched by
# matrix selectors and subqueries.
# CLI flag: -querier.max-duration-of-data-fetched-from-storage-per-query
[max_duration_of_data_fetched_from_storage_per_query: <duration> | default = 0s]
afhassan marked this conversation as resolved.
Show resolved Hide resolved

# Mutate incoming queries to align their start and end with their step.
# CLI flag: -querier.align-querier-with-step
[align_queries_with_step: <boolean> | default = false]
Expand Down
3 changes: 3 additions & 0 deletions docs/configuration/v1-guarantees.md
Original file line number Diff line number Diff line change
Expand Up @@ -118,3 +118,6 @@ Currently experimental features are:
- Enable string interning for metrics labels by setting `-ingester.labels-string-interning-enabled` on Ingester.
- Query-frontend: query rejection (`-frontend.query-rejection.enabled`)
- Querier: protobuf codec (`-api.querier-default-codec`)
- Query-frontend: dynamic query splits
- `-querier.max-shards-per-query` (int) CLI flag
afhassan marked this conversation as resolved.
Show resolved Hide resolved
- `-querier.max-duration-of-data-fetched-from-storage-per-query` (duration) CLI flag
5 changes: 5 additions & 0 deletions pkg/frontend/transport/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,7 @@ func (f *Handler) reportQueryStats(r *http.Request, userID string, queryString u
splitQueries := stats.LoadSplitQueries()
dataSelectMaxTime := stats.LoadDataSelectMaxTime()
dataSelectMinTime := stats.LoadDataSelectMinTime()
splitInterval := stats.LoadSplitInterval()

// Track stats.
f.querySeconds.WithLabelValues(userID).Add(wallTime.Seconds())
Expand Down Expand Up @@ -425,6 +426,10 @@ func (f *Handler) reportQueryStats(r *http.Request, userID string, queryString u
logMessage = append(logMessage, "query_storage_wall_time_seconds", sws)
}

if splitInterval > 0 {
logMessage = append(logMessage, "split_interval", splitInterval.String())
}

if error != nil {
s, ok := status.FromError(error)
if !ok {
Expand Down
9 changes: 9 additions & 0 deletions pkg/querier/stats/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ type QueryStats struct {
Priority int64
DataSelectMaxTime int64
DataSelectMinTime int64
SplitInterval time.Duration
m sync.Mutex
}

Expand Down Expand Up @@ -287,6 +288,14 @@ func (s *QueryStats) LoadDataSelectMinTime() int64 {
return atomic.LoadInt64(&s.DataSelectMinTime)
}

// LoadSplitInterval returns the SplitInterval stored on the stats.
// It is safe to call on a nil receiver, in which case it returns 0.
func (s *QueryStats) LoadSplitInterval() time.Duration {
	if s != nil {
		return s.SplitInterval
	}

	// No stats available: report a zero interval.
	return 0
}

func (s *QueryStats) AddStoreGatewayTouchedPostings(count uint64) {
if s == nil {
return
Expand Down
9 changes: 5 additions & 4 deletions pkg/querier/tripperware/queryrange/limits_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,9 +233,10 @@ func TestLimitsMiddleware_MaxQueryLength(t *testing.T) {
}

type mockLimits struct {
maxQueryLookback time.Duration
maxQueryLength time.Duration
maxCacheFreshness time.Duration
maxQueryLookback time.Duration
maxQueryLength time.Duration
maxCacheFreshness time.Duration
queryVerticalShardSize int
}

func (m mockLimits) MaxQueryLookback(string) time.Duration {
Expand All @@ -255,7 +256,7 @@ func (m mockLimits) MaxCacheFreshness(string) time.Duration {
}

func (m mockLimits) QueryVerticalShardSize(userID string) int {
return 0
return m.queryVerticalShardSize
}

func (m mockLimits) QueryPriority(userID string) validation.QueryPriority {
Expand Down
37 changes: 30 additions & 7 deletions pkg/querier/tripperware/queryrange/query_range_middlewares.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,14 @@ const day = 24 * time.Hour

// Config for query_range middleware chain.
type Config struct {
SplitQueriesByInterval time.Duration `yaml:"split_queries_by_interval"`
AlignQueriesWithStep bool `yaml:"align_queries_with_step"`
ResultsCacheConfig `yaml:"results_cache"`
CacheResults bool `yaml:"cache_results"`
MaxRetries int `yaml:"max_retries"`
// Query splits config
SplitQueriesByInterval time.Duration `yaml:"split_queries_by_interval"`
DynamicQuerySplitsConfig DynamicQuerySplitsConfig `yaml:"dynamic_query_splits"`

AlignQueriesWithStep bool `yaml:"align_queries_with_step"`
ResultsCacheConfig `yaml:"results_cache"`
CacheResults bool `yaml:"cache_results"`
MaxRetries int `yaml:"max_retries"`
// List of headers which query_range middleware chain would forward to downstream querier.
ForwardHeaders flagext.StringSlice `yaml:"forward_headers_list"`

Expand All @@ -54,6 +57,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.BoolVar(&cfg.CacheResults, "querier.cache-results", false, "Cache query results.")
f.Var(&cfg.ForwardHeaders, "frontend.forward-headers-list", "List of headers forwarded by the query Frontend to downstream querier.")
cfg.ResultsCacheConfig.RegisterFlags(f)
cfg.DynamicQuerySplitsConfig.RegisterFlags(f)
}

// Validate validates the config.
Expand All @@ -66,9 +70,25 @@ func (cfg *Config) Validate(qCfg querier.Config) error {
return errors.Wrap(err, "invalid ResultsCache config")
}
}
if cfg.DynamicQuerySplitsConfig.MaxSplitsPerQuery > 0 || cfg.DynamicQuerySplitsConfig.MaxFetchedStorageDataDurationPerQuery > 0 {
if cfg.SplitQueriesByInterval <= 0 {
return errors.New("configs under dynamic-query-splits requires that a value for split-queries-by-interval is set.")
}
}
return nil
}

// DynamicQuerySplitsConfig holds the experimental limits that drive dynamic
// selection of the split interval for range queries. A zero value in either
// field disables the corresponding limit.
type DynamicQuerySplitsConfig struct {
	// MaxSplitsPerQuery is the maximum number of splits for a query.
	MaxSplitsPerQuery int `yaml:"max_splits_per_query"`

	// MaxFetchedStorageDataDurationPerQuery is the max total duration of data
	// fetched from storage by all splits of a query.
	MaxFetchedStorageDataDurationPerQuery time.Duration `yaml:"max_fetched_storage_data_duration_per_query"`
}

// RegisterFlags registers the flags for dynamic query splits on f.
// Both flags default to 0.
func (cfg *DynamicQuerySplitsConfig) RegisterFlags(f *flag.FlagSet) {
	const (
		maxSplitsName = "querier.max-splits-per-query"
		maxSplitsHelp = "[EXPERIMENTAL] Maximum number of splits for a query, 0 disables it. Dynamically uses a multiple of split interval to maintain a total number of splits below the set value. If vertical sharding is enabled for a query, the combined total number of vertical and interval splits is kept below this value."

		maxFetchedName = "querier.max-fetched-storage-data-duration-per-query"
		maxFetchedHelp = "[EXPERIMENTAL] Max total duration of data fetched from storage by all query splits, 0 disables it. Dynamically uses a multiple of split interval to maintain a total fetched duration of data lower than the value set. It takes into account additional duration fetched by matrix selectors and subqueries."
	)

	f.IntVar(&cfg.MaxSplitsPerQuery, maxSplitsName, 0, maxSplitsHelp)
	f.DurationVar(&cfg.MaxFetchedStorageDataDurationPerQuery, maxFetchedName, 0, maxFetchedHelp)
}

// Middlewares returns list of middlewares that should be applied for range query.
func Middlewares(
cfg Config,
Expand All @@ -89,8 +109,11 @@ func Middlewares(
queryRangeMiddleware = append(queryRangeMiddleware, tripperware.InstrumentMiddleware("step_align", metrics), StepAlignMiddleware)
}
if cfg.SplitQueriesByInterval != 0 {
staticIntervalFn := func(_ tripperware.Request) time.Duration { return cfg.SplitQueriesByInterval }
queryRangeMiddleware = append(queryRangeMiddleware, tripperware.InstrumentMiddleware("split_by_interval", metrics), SplitByIntervalMiddleware(staticIntervalFn, limits, prometheusCodec, registerer))
intervalFn := staticIntervalFn(cfg)
if cfg.DynamicQuerySplitsConfig.MaxSplitsPerQuery > 0 || cfg.DynamicQuerySplitsConfig.MaxFetchedStorageDataDurationPerQuery > 0 {
intervalFn = dynamicIntervalFn(cfg, limits, queryAnalyzer, lookbackDelta)
}
queryRangeMiddleware = append(queryRangeMiddleware, tripperware.InstrumentMiddleware("split_by_interval", metrics), SplitByIntervalMiddleware(intervalFn, limits, prometheusCodec, registerer, lookbackDelta))
}

var c cache.Cache
Expand Down
Loading
Loading