Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

QueryFrontend|Query: Create new arg to enable extended functions #7028

Merged
merged 10 commits into from
Jan 3, 2024
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#6984](https://github.com/thanos-io/thanos/pull/6984) Store Gateway: Added `--store.index-header-lazy-download-strategy` to specify how to lazily download index headers when lazy mmap is enabled.

- [#6887](https://github.com/thanos-io/thanos/pull/6887) Query Frontend: *breaking :warning:* Add tenant label to relevant exported metrics. Note that this change may cause some pre-existing custom dashboard queries to be incorrect due to the added label.
- [#7028](https://github.com/thanos-io/thanos/pull/7028) Query|Query Frontend: Add new `--query-frontend.enable-x-functions` flag to enable experimental extended functions.

### Changed

Expand Down
5 changes: 4 additions & 1 deletion cmd/thanos/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func registerQuery(app *extkingpin.App) {

defaultEngine := cmd.Flag("query.promql-engine", "Default PromQL engine to use.").Default(string(apiv1.PromqlEnginePrometheus)).
Enum(string(apiv1.PromqlEnginePrometheus), string(apiv1.PromqlEngineThanos))

extendedFunctionsEnabled := cmd.Flag("query.enable-x-functions", "Whether to enable extended rate functions (xrate, xincrease and xdelta). Only has effect when used with Thanos engine.").Default("false").Bool()
promqlQueryMode := cmd.Flag("query.mode", "PromQL query mode. One of: local, distributed.").
Hidden().
Default(string(queryModeLocal)).
Expand Down Expand Up @@ -342,6 +342,7 @@ func registerQuery(app *extkingpin.App) {
*queryTelemetrySeriesQuantiles,
*defaultEngine,
storeRateLimits,
*extendedFunctionsEnabled,
queryMode(*promqlQueryMode),
*tenantHeader,
*defaultTenant,
Expand Down Expand Up @@ -421,6 +422,7 @@ func runQuery(
queryTelemetrySeriesQuantiles []float64,
defaultEngine string,
storeRateLimits store.SeriesSelectLimits,
extendedFunctionsEnabled bool,
queryMode queryMode,
tenantHeader string,
defaultTenant string,
Expand Down Expand Up @@ -652,6 +654,7 @@ func runQuery(
engineFactory := apiv1.NewQueryEngineFactory(
engineOpts,
remoteEngineEndpoints,
extendedFunctionsEnabled,
)

lookbackDeltaCreator := LookbackDeltaFactory(engineOpts, dynamicLookbackDelta)
Expand Down
11 changes: 11 additions & 0 deletions cmd/thanos/query_frontend.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ import (
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/promql/parser"
"github.com/thanos-io/promql-engine/execution/parse"
"github.com/weaveworks/common/user"
"gopkg.in/yaml.v2"

Expand Down Expand Up @@ -92,6 +94,9 @@ func registerQueryFrontend(app *extkingpin.App) {
cmd.Flag("query-range.max-retries-per-request", "Maximum number of retries for a single query range request; beyond this, the downstream error is returned.").
Default("5").IntVar(&cfg.QueryRangeConfig.MaxRetries)

cmd.Flag("query-frontend.enable-x-functions", "Enable experimental x- functions in query-frontend. --no-query-frontend.enable-x-functions for disabling.").
Default("false").BoolVar(&cfg.EnableXFunctions)

cmd.Flag("query-range.max-query-length", "Limit the query time range (end - start time) in the query-frontend, 0 disables it.").
Default("0").DurationVar((*time.Duration)(&cfg.QueryRangeConfig.Limits.MaxQueryLength))

Expand Down Expand Up @@ -285,6 +290,12 @@ func runQueryFrontend(
return errors.Wrap(err, "error validating the config")
}

if cfg.EnableXFunctions {
MichaHoffmann marked this conversation as resolved.
Show resolved Hide resolved
for fname, v := range parse.XFunctions {
parser.Functions[fname] = v
}
}

tripperWare, err := queryfrontend.NewTripperware(cfg.Config, reg, logger)
if err != nil {
return errors.Wrap(err, "setup tripperwares")
Expand Down
5 changes: 5 additions & 0 deletions docs/components/query-frontend.md
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,11 @@ Flags:
--query-frontend.downstream-url="http://localhost:9090"
URL of downstream Prometheus Query compatible
API.
--query-frontend.enable-x-functions
Enable experimental x-
functions in query-frontend.
--no-query-frontend.enable-x-functions for
disabling.
--query-frontend.forward-header=<http-header-name> ...
List of headers forwarded by the query-frontend
to downstream queriers, default is empty
Expand Down
4 changes: 4 additions & 0 deletions docs/components/query.md
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,10 @@ Flags:
--query.default-tenant-id="default-tenant"
Default tenant ID to use if tenant header is
not present
--query.enable-x-functions
Whether to enable extended rate functions
(xrate, xincrease and xdelta). Only has effect
when used with Thanos engine.
--query.lookback-delta=QUERY.LOOKBACK-DELTA
The maximum lookback duration for retrieving
metrics during expression evaluations.
Expand Down
9 changes: 4 additions & 5 deletions pkg/api/query/v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ type QueryEngineFactory struct {

createThanosEngine sync.Once
thanosEngine v1.QueryEngine
enableXFunctions bool
}

func (f *QueryEngineFactory) GetPrometheusEngine() v1.QueryEngine {
Expand All @@ -118,7 +119,7 @@ func (f *QueryEngineFactory) GetThanosEngine() v1.QueryEngine {
return
}
if f.remoteEngineEndpoints == nil {
f.thanosEngine = engine.New(engine.Opts{EngineOpts: f.engineOpts, Engine: f.GetPrometheusEngine(), EnableAnalysis: true})
f.thanosEngine = engine.New(engine.Opts{EngineOpts: f.engineOpts, Engine: f.GetPrometheusEngine(), EnableAnalysis: true, EnableXFunctions: f.enableXFunctions})
} else {
f.thanosEngine = engine.NewDistributedEngine(engine.Opts{EngineOpts: f.engineOpts, Engine: f.GetPrometheusEngine(), EnableAnalysis: true}, f.remoteEngineEndpoints)
}
Expand All @@ -127,13 +128,11 @@ func (f *QueryEngineFactory) GetThanosEngine() v1.QueryEngine {
return f.thanosEngine
}

func NewQueryEngineFactory(
engineOpts promql.EngineOpts,
remoteEngineEndpoints promqlapi.RemoteEndpoints,
) *QueryEngineFactory {
func NewQueryEngineFactory(engineOpts promql.EngineOpts, remoteEngineEndpoints promqlapi.RemoteEndpoints, enableExtendedFunctions bool) *QueryEngineFactory {
return &QueryEngineFactory{
engineOpts: engineOpts,
remoteEngineEndpoints: remoteEngineEndpoints,
enableXFunctions: enableExtendedFunctions,
}
}

Expand Down
8 changes: 4 additions & 4 deletions pkg/api/query/v1_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ func TestQueryEndpoints(t *testing.T) {
Reg: nil,
MaxSamples: 10000,
Timeout: timeout,
}, nil)
}, nil, false)
api := &QueryAPI{
baseAPI: &baseAPI.BaseAPI{
Now: func() time.Time { return now },
Expand Down Expand Up @@ -643,7 +643,7 @@ func TestQueryExplainEndpoints(t *testing.T) {
Reg: nil,
MaxSamples: 10000,
Timeout: timeout,
}, nil)
}, nil, false)
api := &QueryAPI{
baseAPI: &baseAPI.BaseAPI{
Now: func() time.Time { return now },
Expand Down Expand Up @@ -707,7 +707,7 @@ func TestQueryAnalyzeEndpoints(t *testing.T) {
Reg: nil,
MaxSamples: 10000,
Timeout: timeout,
}, nil)
}, nil, false)
api := &QueryAPI{
baseAPI: &baseAPI.BaseAPI{
Now: func() time.Time { return now },
Expand Down Expand Up @@ -881,7 +881,7 @@ func TestMetadataEndpoints(t *testing.T) {
Reg: nil,
MaxSamples: 10000,
Timeout: timeout,
}, nil)
}, nil, false)
api := &QueryAPI{
baseAPI: &baseAPI.BaseAPI{
Now: func() time.Time { return now },
Expand Down
1 change: 1 addition & 0 deletions pkg/queryfrontend/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@ type Config struct {
TenantHeader string
DefaultTenant string
TenantCertField string
EnableXFunctions bool
}

// QueryRangeConfig holds the config for query range tripperware.
Expand Down
20 changes: 17 additions & 3 deletions test/e2e/e2ethanos/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/thanos-io/objstore/exthttp"

"github.com/thanos-io/thanos/pkg/alert"
apiv1 "github.com/thanos-io/thanos/pkg/api/query"
"github.com/thanos-io/thanos/pkg/clientconfig"
"github.com/thanos-io/thanos/pkg/queryfrontend"
"github.com/thanos-io/thanos/pkg/receive"
Expand Down Expand Up @@ -253,8 +254,9 @@ type QuerierBuilder struct {
endpoints []string
strictEndpoints []string

engine string
queryMode string
engine apiv1.PromqlEngineType
queryMode string
enableXFunctions bool

replicaLabels []string
tracingConfig string
Expand Down Expand Up @@ -362,7 +364,7 @@ func (q *QuerierBuilder) WithDisablePartialResponses(disable bool) *QuerierBuild
return q
}

func (q *QuerierBuilder) WithEngine(engine string) *QuerierBuilder {
func (q *QuerierBuilder) WithEngine(engine apiv1.PromqlEngineType) *QuerierBuilder {
q.engine = engine
return q
}
Expand All @@ -372,6 +374,11 @@ func (q *QuerierBuilder) WithQueryMode(mode string) *QuerierBuilder {
return q
}

func (q *QuerierBuilder) WithEnableXFunctions() *QuerierBuilder {
q.enableXFunctions = true
return q
}

func (q *QuerierBuilder) WithEnvVars(envVars map[string]string) *QuerierBuilder {
q.envVars = envVars
return q
Expand Down Expand Up @@ -484,6 +491,13 @@ func (q *QuerierBuilder) collectArgs() ([]string, error) {
for _, bucket := range q.telemetrySeriesQuantiles {
args = append(args, "--query.telemetry.request-series-seconds-quantiles="+strconv.FormatFloat(bucket, 'f', -1, 64))
}
if q.enableXFunctions {
args = append(args, "--query.enable-x-functions")
}
if q.engine != "" {
args = append(args, "--query.promql-engine="+string(q.engine))
}

return args, nil
}

Expand Down
43 changes: 43 additions & 0 deletions test/e2e/query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,49 @@ func TestQuery(t *testing.T) {
})
}

func TestQueryWithExtendedFunctions(t *testing.T) {
t.Parallel()

e, err := e2e.New(e2e.WithName("e2e-qry-xfunc"))
testutil.Ok(t, err)
t.Cleanup(e2ethanos.CleanScenario(t, e))

// create prom + sidecar
prom, sidecar := e2ethanos.NewPrometheusWithSidecar(e, "prom", e2ethanos.DefaultPromConfig("prom", 0, "", "", e2ethanos.LocalPrometheusTarget), "", e2ethanos.DefaultPrometheusImage(), "", "remote-write-receiver")
testutil.Ok(t, e2e.StartAndWaitReady(prom, sidecar))

// create querier
q := e2ethanos.NewQuerierBuilder(e, "1", sidecar.InternalEndpoint("grpc")).WithEngine("thanos").WithEnableXFunctions().Init()
testutil.Ok(t, e2e.StartAndWaitReady(q))

ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
t.Cleanup(cancel)

// send series to prom
samples := []seriesWithLabels{
{intLabels: labels.FromStrings("__name__", "my_fake_metric", "a", "1", "b", "1", "instance", "1")},
{intLabels: labels.FromStrings("__name__", "my_fake_metric", "a", "1", "b", "2", "instance", "1")},
{intLabels: labels.FromStrings("__name__", "my_fake_metric", "a", "2", "b", "1", "instance", "1")},
{intLabels: labels.FromStrings("__name__", "my_fake_metric", "a", "2", "b", "2", "instance", "1")},
}
testutil.Ok(t, remoteWriteSeriesWithLabels(ctx, prom, samples))

// query
queryAndAssertSeries(t, ctx, q.Endpoint("http"), func() string {
return `xrate(my_fake_metric{a="1", b="1"}[1m])`
}, time.Now, promclient.QueryOptions{
Deduplicate: false,
}, []model.Metric{
{
"a": "1",
"b": "1",
"instance": "1",
"prometheus": "prom",
"replica": "0",
},
})
}

func TestQueryExternalPrefixWithoutReverseProxy(t *testing.T) {
t.Parallel()

Expand Down
Loading