Deprecate cache interval (#2040)
* deprecates frontend.cache-split-interval in favor of querier.split-queries-by-interval

Signed-off-by: Owen Diehl <ow.diehl@gmail.com>

* regens docs, updates changelog

Signed-off-by: Owen Diehl <ow.diehl@gmail.com>

* adds PR #

Signed-off-by: Owen Diehl <ow.diehl@gmail.com>

* removes frontend.cache-split-interval

Signed-off-by: Owen Diehl <ow.diehl@gmail.com>

* regens docs

Signed-off-by: Owen Diehl <ow.diehl@gmail.com>

* addresses pr comments, config no longer satisfies CacheSplitter ifc

Signed-off-by: Owen Diehl <ow.diehl@gmail.com>

* passes logger to config validation to log warning/mutation

Signed-off-by: Owen Diehl <ow.diehl@gmail.com>

* uses day const in tests

Signed-off-by: Owen Diehl <ow.diehl@gmail.com>

* consistent renaming for constSplitter tests

Signed-off-by: Owen Diehl <ow.diehl@gmail.com>
owen-d authored Jan 30, 2020
1 parent b66e5b7 commit 1462d45
Showing 7 changed files with 78 additions and 46 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -2,6 +2,7 @@

## master / unreleased

* [CHANGE] Removed the unnecessary `frontend.cache-split-interval` in favor of `querier.split-queries-by-interval`, both to reduce configuration complexity and to guarantee that these two settings stay aligned. From now on, `-querier.cache-results` may only be enabled in conjunction with `-querier.split-queries-by-interval` (previously the cache interval defaulted to `24h`, so to preserve the same behaviour set `-querier.split-queries-by-interval=24h`; a minimal example follows this diff). #2040
* [CHANGE] Removed remaining support for using denormalised tokens in the ring. If you're still running ingesters with denormalised tokens (Cortex 0.4 or earlier, with `-ingester.normalise-tokens=false`), such ingesters will now be completely invisible to distributors and need to be either switched to Cortex 0.6.0 or later, or be configured to use normalised tokens. #2034
* [CHANGE] Moved `--store.min-chunk-age` to the Querier config as `--querier.query-store-after`, allowing the store to be skipped during query time if the metrics wouldn't be found. The YAML config option `ingestermaxquerylookback` has been renamed to `query_ingesters_within` to match its CLI flag. #1893
* `--store.min-chunk-age` has been removed
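For operators migrating, here is a minimal config sketch (illustrative only, not part of this commit) that preserves the old default caching behaviour under the new scheme, using the `queryrange_config` fields documented in `docs/configuration/config-file-reference.md` further down this diff:

```yaml
# Hypothetical migration sketch: the split interval now also determines
# result-cache keys, so it must be set whenever cache_results is enabled.
split_queries_by_interval: 24h   # replaces the removed cache_split_interval (old default: 24h)
cache_results: true              # -querier.cache-results
results_cache:
  cache:
    # ... your existing cache backend configuration (e.g. memcached) ...
```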
4 changes: 2 additions & 2 deletions cmd/cortex/main.go
@@ -73,9 +73,10 @@ func main() {
runtime.SetMutexProfileFraction(mutexProfileFraction)
}

util.InitLogger(&cfg.Server)
// Validate the config once both the config file has been loaded
// and CLI flags parsed.
err := cfg.Validate()
err := cfg.Validate(util.Logger)
if err != nil {
fmt.Printf("error validating config: %v\n", err)
os.Exit(1)
@@ -84,7 +85,6 @@
// Allocate a block of memory to alter GC behaviour. See https://github.com/golang/go/issues/23044
ballast := make([]byte, ballastBytes)

util.InitLogger(&cfg.Server)
util.InitEvents(eventSampleRate)

// Setting the environment variable JAEGER_AGENT_HOST enables tracing
8 changes: 2 additions & 6 deletions docs/configuration/config-file-reference.md
@@ -550,7 +550,8 @@ The `queryrange_config` configures the query splitting and caching in the Cortex
```yaml
# Split queries by an interval and execute in parallel, 0 disables it. You
# should use a multiple of 24 hours (same as the storage bucketing scheme),
# to avoid queriers downloading and processing the same chunks.
# to avoid queriers downloading and processing the same chunks. This also
# determines how cache keys are chosen when result caching is enabled
# CLI flag: -querier.split-queries-by-interval
[split_queries_by_interval: <duration> | default = 0s]
@@ -604,11 +605,6 @@ results_cache:
# CLI flag: -frontend.max-cache-freshness
[max_freshness: <duration> | default = 1m0s]
# The maximum interval expected for each request, results will be cached per
# single interval.
# CLI flag: -frontend.cache-split-interval
[cache_split_interval: <duration> | default = 24h0m0s]
# Cache query results.
# CLI flag: -querier.cache-results
[cache_results: <boolean> | default = false]
6 changes: 5 additions & 1 deletion pkg/cortex/cortex.go
@@ -5,6 +5,7 @@ import (
"fmt"
"os"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/pkg/errors"
"github.com/weaveworks/common/middleware"
@@ -125,7 +126,7 @@ func (c *Config) RegisterFlags(f *flag.FlagSet) {

// Validate the cortex config and return an error if the validation
// doesn't pass.
func (c *Config) Validate() error {
func (c *Config) Validate(log log.Logger) error {
if err := c.Schema.Validate(); err != nil {
return errors.Wrap(err, "invalid schema config")
}
@@ -147,6 +148,9 @@ func (c *Config) Validate() error {
if err := c.Querier.Validate(); err != nil {
return errors.Wrap(err, "invalid querier config")
}
if err := c.QueryRange.Validate(log); err != nil {
return errors.Wrap(err, "invalid queryrange config")
}
return nil
}

50 changes: 35 additions & 15 deletions pkg/querier/queryrange/results_cache.go
@@ -19,6 +19,7 @@ import (

"github.com/cortexproject/cortex/pkg/chunk/cache"
"github.com/cortexproject/cortex/pkg/ingester/client"
"github.com/cortexproject/cortex/pkg/util/flagext"
"github.com/cortexproject/cortex/pkg/util/spanlogger"
)

@@ -31,14 +32,15 @@
type ResultsCacheConfig struct {
CacheConfig cache.Config `yaml:"cache"`
MaxCacheFreshness time.Duration `yaml:"max_freshness"`
SplitInterval time.Duration `yaml:"cache_split_interval"`
}

// RegisterFlags registers flags.
func (cfg *ResultsCacheConfig) RegisterFlags(f *flag.FlagSet) {
cfg.CacheConfig.RegisterFlagsWithPrefix("frontend.", "", f)

flagext.DeprecatedFlag(f, "frontend.cache-split-interval", "Deprecated: The maximum interval expected for each request, results will be cached per single interval. This behavior is now determined by querier.split-queries-by-interval.")

f.DurationVar(&cfg.MaxCacheFreshness, "frontend.max-cache-freshness", 1*time.Minute, "Most recent allowed cacheable result, to prevent caching very recent results that might still be in flux.")
f.DurationVar(&cfg.SplitInterval, "frontend.cache-split-interval", 24*time.Hour, "The maximum interval expected for each request, results will be cached per single interval.")
}

// Extractor is used by the cache to extract a subset of a response from a cache entry.
@@ -55,6 +57,21 @@ func (e ExtractorFunc) Extract(start, end int64, from Response) Response {
return e(start, end, from)
}

// CacheSplitter generates cache keys. This is a useful interface for downstream
// consumers who wish to implement their own strategies.
type CacheSplitter interface {
GenerateCacheKey(userID string, r Request) string
}

// constSplitter is a utility for using a constant split interval when determining cache keys
type constSplitter time.Duration

// GenerateCacheKey generates a cache key based on the userID, Request and interval.
func (t constSplitter) GenerateCacheKey(userID string, r Request) string {
currentInterval := r.GetStart() / int64(time.Duration(t)/time.Millisecond)
return fmt.Sprintf("%s:%s:%d:%d", userID, r.GetQuery(), r.GetStep(), currentInterval)
}
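The `CacheSplitter` interface above is explicitly intended as an extension point for downstream consumers. For illustration only (not part of this commit), the following self-contained sketch reproduces the key-bucketing behaviour, with a hypothetical `fakeRequest` standing in for the package's `Request` interface:

```go
package main

import (
	"fmt"
	"time"
)

// fakeRequest is a hypothetical stand-in exposing only the getters the
// splitter needs; times are in milliseconds, as in the Prometheus range API.
type fakeRequest struct {
	start, step int64
	query       string
}

func (r fakeRequest) GetStart() int64  { return r.start }
func (r fakeRequest) GetStep() int64   { return r.step }
func (r fakeRequest) GetQuery() string { return r.query }

// constSplitter mirrors the type added in this commit (with a simplified
// signature): every request whose start time falls inside the same fixed
// interval maps to the same cache key.
type constSplitter time.Duration

func (t constSplitter) GenerateCacheKey(userID string, r fakeRequest) string {
	currentInterval := r.GetStart() / int64(time.Duration(t)/time.Millisecond)
	return fmt.Sprintf("%s:%s:%d:%d", userID, r.GetQuery(), r.GetStep(), currentInterval)
}

func main() {
	day := constSplitter(24 * time.Hour)
	morning := fakeRequest{start: 1580374800000, step: 15000, query: "up"} // 2020-01-30 09:00 UTC
	evening := fakeRequest{start: 1580414400000, step: 15000, query: "up"} // 2020-01-30 20:00 UTC

	// Both requests land in the same 24h bucket, so they share one cache key.
	fmt.Println(day.GenerateCacheKey("tenant-1", morning))
	fmt.Println(day.GenerateCacheKey("tenant-1", evening))
}
```

Both printed keys are identical because the two start timestamps fall into the same 24-hour bucket, which is why the cache key interval is now tied to `querier.split-queries-by-interval` instead of being configured separately.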

// PrometheusResponseExtractor is an `Extractor` for a Prometheus query range response.
var PrometheusResponseExtractor = ExtractorFunc(func(start, end int64, from Response) Response {
promRes := from.(*PrometheusResponse)
@@ -69,11 +86,12 @@ var PrometheusResponseExtractor = ExtractorFunc(func(start, end int64, from Resp
})

type resultsCache struct {
logger log.Logger
cfg ResultsCacheConfig
next Handler
cache cache.Cache
limits Limits
logger log.Logger
cfg ResultsCacheConfig
next Handler
cache cache.Cache
limits Limits
splitter CacheSplitter

extractor Extractor
merger Merger
@@ -85,7 +103,14 @@ type resultsCache struct {
// Each request starting from within the same interval will hit the same cache entry.
// If the cache doesn't have the entire duration of the request cached, it will query the uncached parts and append them to the cache entries.
// see `generateKey`.
func NewResultsCacheMiddleware(logger log.Logger, cfg ResultsCacheConfig, limits Limits, merger Merger, extractor Extractor) (Middleware, cache.Cache, error) {
func NewResultsCacheMiddleware(
logger log.Logger,
cfg ResultsCacheConfig,
splitter CacheSplitter,
limits Limits,
merger Merger,
extractor Extractor,
) (Middleware, cache.Cache, error) {
c, err := cache.New(cfg.CacheConfig)
if err != nil {
return nil, nil, err
@@ -100,6 +125,7 @@ func NewResultsCacheMiddleware(logger log.Logger, cfg ResultsCacheConfig, limits
limits: limits,
merger: merger,
extractor: extractor,
splitter: splitter,
}
}), c, nil
}
@@ -111,7 +137,7 @@ func (s resultsCache) Do(ctx context.Context, r Request) (Response, error) {
}

var (
key = generateKey(userID, r, s.cfg.SplitInterval)
key = s.splitter.GenerateCacheKey(userID, r)
extents []Extent
response Response
)
@@ -359,12 +385,6 @@ func (s resultsCache) filterRecentExtents(req Request, extents []Extent) ([]Exte
return extents, nil
}

// generateKey generates a cache key based on the userID, Request and interval.
func generateKey(userID string, r Request, interval time.Duration) string {
currentInterval := r.GetStart() / int64(interval/time.Millisecond)
return fmt.Sprintf("%s:%s:%d:%d", userID, r.GetQuery(), r.GetStep(), currentInterval)
}

func (s resultsCache) get(ctx context.Context, key string) ([]Extent, bool) {
found, bufs, _ := s.cache.Fetch(ctx, []string{cache.HashKey(key)})
if len(found) != 1 {
32 changes: 17 additions & 15 deletions pkg/querier/queryrange/results_cache_test.go
@@ -266,14 +266,15 @@ func (fakeLimits) MaxQueryParallelism(string) int {

func TestResultsCache(t *testing.T) {
calls := 0
cfg := ResultsCacheConfig{
CacheConfig: cache.Config{
Cache: cache.NewMockCache(),
},
}
rcm, _, err := NewResultsCacheMiddleware(
log.NewNopLogger(),
ResultsCacheConfig{
CacheConfig: cache.Config{
Cache: cache.NewMockCache(),
},
SplitInterval: 24 * time.Hour,
},
cfg,
constSplitter(day),
fakeLimits{},
PrometheusCodec,
PrometheusResponseExtractor,
@@ -307,7 +308,7 @@ func TestResultsCacheRecent(t *testing.T) {
var cfg ResultsCacheConfig
flagext.DefaultValues(&cfg)
cfg.CacheConfig.Cache = cache.NewMockCache()
rcm, _, err := NewResultsCacheMiddleware(log.NewNopLogger(), cfg, fakeLimits{}, PrometheusCodec, PrometheusResponseExtractor)
rcm, _, err := NewResultsCacheMiddleware(log.NewNopLogger(), cfg, constSplitter(day), fakeLimits{}, PrometheusCodec, PrometheusResponseExtractor)
require.NoError(t, err)

req := parsedRequest.WithStartEnd(int64(model.Now())-(60*1e3), int64(model.Now()))
@@ -334,14 +335,15 @@
}

func Test_resultsCache_MissingData(t *testing.T) {
cfg := ResultsCacheConfig{
CacheConfig: cache.Config{
Cache: cache.NewMockCache(),
},
}
rm, _, err := NewResultsCacheMiddleware(
log.NewNopLogger(),
ResultsCacheConfig{
CacheConfig: cache.Config{
Cache: cache.NewMockCache(),
},
SplitInterval: 24 * time.Hour,
},
cfg,
constSplitter(day),
fakeLimits{},
PrometheusCodec,
PrometheusResponseExtractor,
@@ -376,7 +378,7 @@ func Test_resultsCache_MissingData(t *testing.T) {
require.False(t, hit)
}

func Test_generateKey(t *testing.T) {
func TestConstSplitter_generateCacheKey(t *testing.T) {
t.Parallel()

tests := []struct {
@@ -396,7 +398,7 @@
}
for _, tt := range tests {
t.Run(fmt.Sprintf("%s - %s", tt.name, tt.interval), func(t *testing.T) {
if got := generateKey("fake", tt.r, tt.interval); got != tt.want {
if got := constSplitter(tt.interval).GenerateCacheKey("fake", tt.r); got != tt.want {
t.Errorf("generateKey() = %v, want %v", got, tt.want)
}
})
23 changes: 16 additions & 7 deletions pkg/querier/queryrange/roundtrip.go
@@ -17,6 +17,7 @@ package queryrange

import (
"context"
"errors"
"flag"
"net/http"
"strings"
@@ -46,12 +47,25 @@ type Config struct {
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.IntVar(&cfg.MaxRetries, "querier.max-retries-per-request", 5, "Maximum number of retries for a single request; beyond this, the downstream error is returned.")
f.BoolVar(&cfg.SplitQueriesByDay, "querier.split-queries-by-day", false, "Deprecated: Split queries by day and execute in parallel.")
f.DurationVar(&cfg.SplitQueriesByInterval, "querier.split-queries-by-interval", 0, "Split queries by an interval and execute in parallel, 0 disables it. You should use a multiple of 24 hours (same as the storage bucketing scheme), to avoid queriers downloading and processing the same chunks.")
f.DurationVar(&cfg.SplitQueriesByInterval, "querier.split-queries-by-interval", 0, "Split queries by an interval and execute in parallel, 0 disables it. You should use a multiple of 24 hours (same as the storage bucketing scheme), to avoid queriers downloading and processing the same chunks. This also determines how cache keys are chosen when result caching is enabled")
f.BoolVar(&cfg.AlignQueriesWithStep, "querier.align-querier-with-step", false, "Mutate incoming queries to align their start and end with their step.")
f.BoolVar(&cfg.CacheResults, "querier.cache-results", false, "Cache query results.")
cfg.ResultsCacheConfig.RegisterFlags(f)
}

func (cfg *Config) Validate(log log.Logger) error {
// SplitQueriesByDay is deprecated use SplitQueriesByInterval.
if cfg.SplitQueriesByDay {
cfg.SplitQueriesByInterval = day
level.Warn(log).Log("msg", "flag querier.split-queries-by-day (or config split_queries_by_day) is deprecated, use querier.split-queries-by-interval instead.")
}

if cfg.CacheResults && cfg.SplitQueriesByInterval <= 0 {
return errors.New("querier.cache-results may only be enabled in conjunction with querier.split-queries-by-interval. Please set the latter")
}
return nil
}

// HandlerFunc is like http.HandlerFunc, but for Handler.
type HandlerFunc func(context.Context, Request) (Response, error)
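The new `Validate` method above does two things: it maps the deprecated day-based flag onto the interval flag, and it rejects result caching when no split interval is set, since cache keys are now derived from that interval. A self-contained sketch of the rule, using a hypothetical trimmed-down `config` type in place of `queryrange.Config`:

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// config is a hypothetical stand-in keeping only the fields the new
// validation rule touches.
type config struct {
	SplitQueriesByDay      bool          // deprecated flag
	SplitQueriesByInterval time.Duration // -querier.split-queries-by-interval
	CacheResults           bool          // -querier.cache-results
}

func (c *config) validate() error {
	// The deprecated day-based splitting is mapped onto the interval flag.
	if c.SplitQueriesByDay {
		c.SplitQueriesByInterval = 24 * time.Hour
	}
	// Result-cache keys are derived from the split interval, so caching
	// cannot be enabled without one.
	if c.CacheResults && c.SplitQueriesByInterval <= 0 {
		return errors.New("querier.cache-results requires querier.split-queries-by-interval to be set")
	}
	return nil
}

func main() {
	bad := config{CacheResults: true}
	fmt.Println(bad.validate()) // error: caching enabled without a split interval

	good := config{CacheResults: true, SplitQueriesByDay: true}
	fmt.Println(good.validate()) // <nil>: the deprecated flag implies a 24h interval
}
```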

@@ -95,17 +109,12 @@ func NewTripperware(cfg Config, log log.Logger, limits Limits, codec Codec, cach
if cfg.AlignQueriesWithStep {
queryRangeMiddleware = append(queryRangeMiddleware, InstrumentMiddleware("step_align"), StepAlignMiddleware)
}
// SplitQueriesByDay is deprecated use SplitQueriesByInterval.
if cfg.SplitQueriesByDay {
level.Warn(log).Log("msg", "flag querier.split-queries-by-day (or config split_queries_by_day) is deprecated, use querier.split-queries-by-interval instead.")
cfg.SplitQueriesByInterval = day
}
if cfg.SplitQueriesByInterval != 0 {
queryRangeMiddleware = append(queryRangeMiddleware, InstrumentMiddleware("split_by_interval"), SplitByIntervalMiddleware(cfg.SplitQueriesByInterval, limits, codec))
}
var c cache.Cache
if cfg.CacheResults {
queryCacheMiddleware, cache, err := NewResultsCacheMiddleware(log, cfg.ResultsCacheConfig, limits, codec, cacheExtractor)
queryCacheMiddleware, cache, err := NewResultsCacheMiddleware(log, cfg.ResultsCacheConfig, constSplitter(cfg.SplitQueriesByInterval), limits, codec, cacheExtractor)
if err != nil {
return nil, nil, err
}
