diff --git a/CHANGELOG.md b/CHANGELOG.md index b3077ca6ef..0364a3c0ed 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -27,6 +27,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re - [#6984](https://github.com/thanos-io/thanos/pull/6984) Store Gateway: Added `--store.index-header-lazy-download-strategy` to specify how to lazily download index headers when lazy mmap is enabled. - [#6887](https://github.com/thanos-io/thanos/pull/6887) Query Frontend: *breaking :warning:* Add tenant label to relevant exported metrics. Note that this change may cause some pre-existing custom dashboard queries to be incorrect due to the added label. +- [#7028](https://github.com/thanos-io/thanos/pull/7028) Query|Query Frontend: Add new `--query-frontend.enable-x-functions` flag to enable experimental extended functions. ### Changed diff --git a/cmd/thanos/query.go b/cmd/thanos/query.go index 82f04f67d1..ce6adde354 100644 --- a/cmd/thanos/query.go +++ b/cmd/thanos/query.go @@ -102,7 +102,7 @@ func registerQuery(app *extkingpin.App) { defaultEngine := cmd.Flag("query.promql-engine", "Default PromQL engine to use.").Default(string(apiv1.PromqlEnginePrometheus)). Enum(string(apiv1.PromqlEnginePrometheus), string(apiv1.PromqlEngineThanos)) - + extendedFunctionsEnabled := cmd.Flag("query.enable-x-functions", "Whether to enable extended rate functions (xrate, xincrease and xdelta). Only has effect when used with Thanos engine.").Default("false").Bool() promqlQueryMode := cmd.Flag("query.mode", "PromQL query mode. One of: local, distributed."). Hidden(). Default(string(queryModeLocal)). @@ -342,6 +342,7 @@ func registerQuery(app *extkingpin.App) { *queryTelemetrySeriesQuantiles, *defaultEngine, storeRateLimits, + *extendedFunctionsEnabled, queryMode(*promqlQueryMode), *tenantHeader, *defaultTenant, @@ -421,6 +422,7 @@ func runQuery( queryTelemetrySeriesQuantiles []float64, defaultEngine string, storeRateLimits store.SeriesSelectLimits, + extendedFunctionsEnabled bool, queryMode queryMode, tenantHeader string, defaultTenant string, @@ -652,6 +654,7 @@ func runQuery( engineFactory := apiv1.NewQueryEngineFactory( engineOpts, remoteEngineEndpoints, + extendedFunctionsEnabled, ) lookbackDeltaCreator := LookbackDeltaFactory(engineOpts, dynamicLookbackDelta) diff --git a/cmd/thanos/query_frontend.go b/cmd/thanos/query_frontend.go index 9059288cdf..5fa7cf3c5e 100644 --- a/cmd/thanos/query_frontend.go +++ b/cmd/thanos/query_frontend.go @@ -16,6 +16,8 @@ import ( "github.com/opentracing/opentracing-go" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/prometheus/promql/parser" + "github.com/thanos-io/promql-engine/execution/parse" "github.com/weaveworks/common/user" "gopkg.in/yaml.v2" @@ -92,6 +94,9 @@ func registerQueryFrontend(app *extkingpin.App) { cmd.Flag("query-range.max-retries-per-request", "Maximum number of retries for a single query range request; beyond this, the downstream error is returned."). Default("5").IntVar(&cfg.QueryRangeConfig.MaxRetries) + cmd.Flag("query-frontend.enable-x-functions", "Enable experimental x- functions in query-frontend. --no-query-frontend.enable-x-functions for disabling."). + Default("false").BoolVar(&cfg.EnableXFunctions) + cmd.Flag("query-range.max-query-length", "Limit the query time range (end - start time) in the query-frontend, 0 disables it."). Default("0").DurationVar((*time.Duration)(&cfg.QueryRangeConfig.Limits.MaxQueryLength)) @@ -285,6 +290,12 @@ func runQueryFrontend( return errors.Wrap(err, "error validating the config") } + if cfg.EnableXFunctions { + for fname, v := range parse.XFunctions { + parser.Functions[fname] = v + } + } + tripperWare, err := queryfrontend.NewTripperware(cfg.Config, reg, logger) if err != nil { return errors.Wrap(err, "setup tripperwares") diff --git a/docs/components/query-frontend.md b/docs/components/query-frontend.md index a6fde2ba77..e374f19ec1 100644 --- a/docs/components/query-frontend.md +++ b/docs/components/query-frontend.md @@ -252,6 +252,11 @@ Flags: --query-frontend.downstream-url="http://localhost:9090" URL of downstream Prometheus Query compatible API. + --query-frontend.enable-x-functions + Enable experimental x- + functions in query-frontend. + --no-query-frontend.enable-x-functions for + disabling. --query-frontend.forward-header= ... List of headers forwarded by the query-frontend to downstream queriers, default is empty diff --git a/docs/components/query.md b/docs/components/query.md index 2c4f473464..677f0b8a0a 100644 --- a/docs/components/query.md +++ b/docs/components/query.md @@ -363,6 +363,10 @@ Flags: --query.default-tenant-id="default-tenant" Default tenant ID to use if tenant header is not present + --query.enable-x-functions + Whether to enable extended rate functions + (xrate, xincrease and xdelta). Only has effect + when used with Thanos engine. --query.lookback-delta=QUERY.LOOKBACK-DELTA The maximum lookback duration for retrieving metrics during expression evaluations. diff --git a/pkg/api/query/v1.go b/pkg/api/query/v1.go index 9f654b549b..3939aeb0ba 100644 --- a/pkg/api/query/v1.go +++ b/pkg/api/query/v1.go @@ -99,6 +99,7 @@ type QueryEngineFactory struct { createThanosEngine sync.Once thanosEngine v1.QueryEngine + enableXFunctions bool } func (f *QueryEngineFactory) GetPrometheusEngine() v1.QueryEngine { @@ -118,7 +119,7 @@ func (f *QueryEngineFactory) GetThanosEngine() v1.QueryEngine { return } if f.remoteEngineEndpoints == nil { - f.thanosEngine = engine.New(engine.Opts{EngineOpts: f.engineOpts, Engine: f.GetPrometheusEngine(), EnableAnalysis: true}) + f.thanosEngine = engine.New(engine.Opts{EngineOpts: f.engineOpts, Engine: f.GetPrometheusEngine(), EnableAnalysis: true, EnableXFunctions: f.enableXFunctions}) } else { f.thanosEngine = engine.NewDistributedEngine(engine.Opts{EngineOpts: f.engineOpts, Engine: f.GetPrometheusEngine(), EnableAnalysis: true}, f.remoteEngineEndpoints) } @@ -127,13 +128,11 @@ func (f *QueryEngineFactory) GetThanosEngine() v1.QueryEngine { return f.thanosEngine } -func NewQueryEngineFactory( - engineOpts promql.EngineOpts, - remoteEngineEndpoints promqlapi.RemoteEndpoints, -) *QueryEngineFactory { +func NewQueryEngineFactory(engineOpts promql.EngineOpts, remoteEngineEndpoints promqlapi.RemoteEndpoints, enableExtendedFunctions bool) *QueryEngineFactory { return &QueryEngineFactory{ engineOpts: engineOpts, remoteEngineEndpoints: remoteEngineEndpoints, + enableXFunctions: enableExtendedFunctions, } } diff --git a/pkg/api/query/v1_test.go b/pkg/api/query/v1_test.go index 37195fb6bb..ef119bfa19 100644 --- a/pkg/api/query/v1_test.go +++ b/pkg/api/query/v1_test.go @@ -190,7 +190,7 @@ func TestQueryEndpoints(t *testing.T) { Reg: nil, MaxSamples: 10000, Timeout: timeout, - }, nil) + }, nil, false) api := &QueryAPI{ baseAPI: &baseAPI.BaseAPI{ Now: func() time.Time { return now }, @@ -643,7 +643,7 @@ func TestQueryExplainEndpoints(t *testing.T) { Reg: nil, MaxSamples: 10000, Timeout: timeout, - }, nil) + }, nil, false) api := &QueryAPI{ baseAPI: &baseAPI.BaseAPI{ Now: func() time.Time { return now }, @@ -707,7 +707,7 @@ func TestQueryAnalyzeEndpoints(t *testing.T) { Reg: nil, MaxSamples: 10000, Timeout: timeout, - }, nil) + }, nil, false) api := &QueryAPI{ baseAPI: &baseAPI.BaseAPI{ Now: func() time.Time { return now }, @@ -881,7 +881,7 @@ func TestMetadataEndpoints(t *testing.T) { Reg: nil, MaxSamples: 10000, Timeout: timeout, - }, nil) + }, nil, false) api := &QueryAPI{ baseAPI: &baseAPI.BaseAPI{ Now: func() time.Time { return now }, diff --git a/pkg/queryfrontend/config.go b/pkg/queryfrontend/config.go index 4b915764c5..176052bd33 100644 --- a/pkg/queryfrontend/config.go +++ b/pkg/queryfrontend/config.go @@ -209,6 +209,7 @@ type Config struct { TenantHeader string DefaultTenant string TenantCertField string + EnableXFunctions bool } // QueryRangeConfig holds the config for query range tripperware. diff --git a/test/e2e/e2ethanos/services.go b/test/e2e/e2ethanos/services.go index 73f6b68925..7eaff47b40 100644 --- a/test/e2e/e2ethanos/services.go +++ b/test/e2e/e2ethanos/services.go @@ -29,6 +29,7 @@ import ( "github.com/thanos-io/objstore/exthttp" "github.com/thanos-io/thanos/pkg/alert" + apiv1 "github.com/thanos-io/thanos/pkg/api/query" "github.com/thanos-io/thanos/pkg/clientconfig" "github.com/thanos-io/thanos/pkg/queryfrontend" "github.com/thanos-io/thanos/pkg/receive" @@ -253,8 +254,9 @@ type QuerierBuilder struct { endpoints []string strictEndpoints []string - engine string - queryMode string + engine apiv1.PromqlEngineType + queryMode string + enableXFunctions bool replicaLabels []string tracingConfig string @@ -362,7 +364,7 @@ func (q *QuerierBuilder) WithDisablePartialResponses(disable bool) *QuerierBuild return q } -func (q *QuerierBuilder) WithEngine(engine string) *QuerierBuilder { +func (q *QuerierBuilder) WithEngine(engine apiv1.PromqlEngineType) *QuerierBuilder { q.engine = engine return q } @@ -372,6 +374,11 @@ func (q *QuerierBuilder) WithQueryMode(mode string) *QuerierBuilder { return q } +func (q *QuerierBuilder) WithEnableXFunctions() *QuerierBuilder { + q.enableXFunctions = true + return q +} + func (q *QuerierBuilder) WithEnvVars(envVars map[string]string) *QuerierBuilder { q.envVars = envVars return q @@ -484,6 +491,13 @@ func (q *QuerierBuilder) collectArgs() ([]string, error) { for _, bucket := range q.telemetrySeriesQuantiles { args = append(args, "--query.telemetry.request-series-seconds-quantiles="+strconv.FormatFloat(bucket, 'f', -1, 64)) } + if q.enableXFunctions { + args = append(args, "--query.enable-x-functions") + } + if q.engine != "" { + args = append(args, "--query.promql-engine="+string(q.engine)) + } + return args, nil } diff --git a/test/e2e/query_test.go b/test/e2e/query_test.go index 5b9a120b90..81eda46f99 100644 --- a/test/e2e/query_test.go +++ b/test/e2e/query_test.go @@ -315,6 +315,49 @@ func TestQuery(t *testing.T) { }) } +func TestQueryWithExtendedFunctions(t *testing.T) { + t.Parallel() + + e, err := e2e.New(e2e.WithName("e2e-qry-xfunc")) + testutil.Ok(t, err) + t.Cleanup(e2ethanos.CleanScenario(t, e)) + + // create prom + sidecar + prom, sidecar := e2ethanos.NewPrometheusWithSidecar(e, "prom", e2ethanos.DefaultPromConfig("prom", 0, "", "", e2ethanos.LocalPrometheusTarget), "", e2ethanos.DefaultPrometheusImage(), "", "remote-write-receiver") + testutil.Ok(t, e2e.StartAndWaitReady(prom, sidecar)) + + // create querier + q := e2ethanos.NewQuerierBuilder(e, "1", sidecar.InternalEndpoint("grpc")).WithEngine("thanos").WithEnableXFunctions().Init() + testutil.Ok(t, e2e.StartAndWaitReady(q)) + + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute) + t.Cleanup(cancel) + + // send series to prom + samples := []seriesWithLabels{ + {intLabels: labels.FromStrings("__name__", "my_fake_metric", "a", "1", "b", "1", "instance", "1")}, + {intLabels: labels.FromStrings("__name__", "my_fake_metric", "a", "1", "b", "2", "instance", "1")}, + {intLabels: labels.FromStrings("__name__", "my_fake_metric", "a", "2", "b", "1", "instance", "1")}, + {intLabels: labels.FromStrings("__name__", "my_fake_metric", "a", "2", "b", "2", "instance", "1")}, + } + testutil.Ok(t, remoteWriteSeriesWithLabels(ctx, prom, samples)) + + // query + queryAndAssertSeries(t, ctx, q.Endpoint("http"), func() string { + return `xrate(my_fake_metric{a="1", b="1"}[1m])` + }, time.Now, promclient.QueryOptions{ + Deduplicate: false, + }, []model.Metric{ + { + "a": "1", + "b": "1", + "instance": "1", + "prometheus": "prom", + "replica": "0", + }, + }) +} + func TestQueryExternalPrefixWithoutReverseProxy(t *testing.T) { t.Parallel()