Skip to content

Commit

Permalink
Support experimental promql functions (#6355)
Browse files Browse the repository at this point in the history
* Upgrade promqlsmith to generate experimental promQL functions

Signed-off-by: SungJin1212 <tjdwls1201@gmail.com>

* Add an experimental flag to enable experimental promQL functions

Signed-off-by: SungJin1212 <tjdwls1201@gmail.com>

---------

Signed-off-by: SungJin1212 <tjdwls1201@gmail.com>
  • Loading branch information
SungJin1212 authored Nov 22, 2024
1 parent a5c4905 commit 0dc2e33
Show file tree
Hide file tree
Showing 18 changed files with 283 additions and 39 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
* [FEATURE] Store Gateway: Add an in-memory chunk cache. #6245
* [FEATURE] Chunk Cache: Support multi level cache and add metrics. #6249
* [FEATURE] Distributor: Accept multiple HA Tracker pairs in the same request. #6256
* [ENHANCEMENT] Query Frontend/Querier: Add an experimental flag `-querier.enable-promql-experimental-functions` to enable experimental promQL functions. #6355
* [ENHANCEMENT] OTLP: Add `-distributor.otlp-max-recv-msg-size` flag to limit OTLP request size in bytes. #6333
* [ENHANCEMENT] S3 Bucket Client: Add a list objects version configs to configure list api object version. #6280
* [ENHANCEMENT] OpenStack Swift: Add application credential configs for Openstack swift object storage backend. #6255
Expand Down
4 changes: 4 additions & 0 deletions docs/blocks-storage/querier.md
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,10 @@ querier:
# evaluation like at Query Frontend or Ruler.
# CLI flag: -querier.ignore-max-query-length
[ignore_max_query_length: <boolean> | default = false]

# [Experimental] If true, experimental promQL functions are enabled.
# CLI flag: -querier.enable-promql-experimental-functions
[enable_promql_experimental_functions: <boolean> | default = false]
```
### `blocks_storage_config`
Expand Down
4 changes: 4 additions & 0 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -3997,6 +3997,10 @@ store_gateway_client:
# like at Query Frontend or Ruler.
# CLI flag: -querier.ignore-max-query-length
[ignore_max_query_length: <boolean> | default = false]

# [Experimental] If true, experimental promQL functions are enabled.
# CLI flag: -querier.enable-promql-experimental-functions
[enable_promql_experimental_functions: <boolean> | default = false]
```
### `query_frontend_config`
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ require (
github.com/aws/aws-sdk-go v1.55.5
github.com/bradfitz/gomemcache v0.0.0-20230905024940-24af94b03874
github.com/cespare/xxhash v1.1.0
github.com/cortexproject/promqlsmith v0.0.0-20241102030034-4051538fd914
github.com/cortexproject/promqlsmith v0.0.0-20241121054008-8b48fe2471ef
github.com/dustin/go-humanize v1.0.1
github.com/efficientgo/core v1.0.0-rc.3
github.com/facette/natsort v0.0.0-20181210072756-2cd4dd1e2dcb
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -944,8 +944,8 @@ github.com/coreos/go-systemd/v22 v22.3.2/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSV
github.com/coreos/go-systemd/v22 v22.4.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc=
github.com/coreos/go-systemd/v22 v22.5.0 h1:RrqgGjYQKalulkV8NGVIfkXQf6YYmOyiJKk8iXXhfZs=
github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc=
github.com/cortexproject/promqlsmith v0.0.0-20241102030034-4051538fd914 h1:UhI6yOSqMz3ln8FGaZRLbJTKzPHRaVwewoTa6N5PU5k=
github.com/cortexproject/promqlsmith v0.0.0-20241102030034-4051538fd914/go.mod h1:ypUb6BfnDVr7QrBgAxtzRqZ573swvka0BdCkPqa2A5g=
github.com/cortexproject/promqlsmith v0.0.0-20241121054008-8b48fe2471ef h1:wR21ZiKkA+wN2KG43qrK33IkFduY9JUa6th6P2KEU0o=
github.com/cortexproject/promqlsmith v0.0.0-20241121054008-8b48fe2471ef/go.mod h1:xbYQa0KX6Eh6YWbTBfZ9kK3N4hRxX+ZPIfVIY2U/y00=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
Expand Down
126 changes: 126 additions & 0 deletions integration/query_fuzz_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,123 @@ func init() {
}
}

func TestExperimentalPromQLFuncsWithPrometheus(t *testing.T) {
prometheusLatestImage := "quay.io/prometheus/prometheus:v2.55.1"
s, err := e2e.NewScenario(networkName)
require.NoError(t, err)
defer s.Close()

// Start dependencies.
consul := e2edb.NewConsulWithName("consul")
require.NoError(t, s.StartAndWaitReady(consul))

baseFlags := mergeFlags(AlertmanagerLocalFlags(), BlocksStorageFlags())
flags := mergeFlags(
baseFlags,
map[string]string{
"-blocks-storage.tsdb.head-compaction-interval": "4m",
"-blocks-storage.tsdb.block-ranges-period": "2h",
"-blocks-storage.tsdb.ship-interval": "1h",
"-blocks-storage.bucket-store.sync-interval": "1s",
"-blocks-storage.tsdb.retention-period": "24h",
"-blocks-storage.bucket-store.index-cache.backend": tsdb.IndexCacheBackendInMemory,
"-querier.query-store-for-labels-enabled": "true",
// Ingester.
"-ring.store": "consul",
"-consul.hostname": consul.NetworkHTTPEndpoint(),
// Distributor.
"-distributor.replication-factor": "1",
// Store-gateway.
"-store-gateway.sharding-enabled": "false",
// alert manager
"-alertmanager.web.external-url": "http://localhost/alertmanager",
"-frontend.query-vertical-shard-size": "1",
"-frontend.max-cache-freshness": "1m",
// enable experimental promQL funcs
"-querier.enable-promql-experimental-functions": "true",
},
)
// make alert manager config dir
require.NoError(t, writeFileToSharedDir(s, "alertmanager_configs", []byte{}))

minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"])
require.NoError(t, s.StartAndWaitReady(minio))

cortex := e2ecortex.NewSingleBinary("cortex", flags, "")
require.NoError(t, s.StartAndWaitReady(cortex))

// Wait until Cortex replicas have updated the ring state.
require.NoError(t, cortex.WaitSumMetrics(e2e.Equals(float64(512)), "cortex_ring_tokens_total"))

now := time.Now()
start := now.Add(-time.Hour * 2)
end := now.Add(-time.Hour)
numSeries := 10
numSamples := 60
lbls := make([]labels.Labels, 0, numSeries*2)
scrapeInterval := time.Minute
statusCodes := []string{"200", "400", "404", "500", "502"}
for i := 0; i < numSeries; i++ {
lbls = append(lbls, labels.Labels{
{Name: labels.MetricName, Value: "test_series_a"},
{Name: "job", Value: "test"},
{Name: "series", Value: strconv.Itoa(i % 3)},
{Name: "status_code", Value: statusCodes[i%5]},
})

lbls = append(lbls, labels.Labels{
{Name: labels.MetricName, Value: "test_series_b"},
{Name: "job", Value: "test"},
{Name: "series", Value: strconv.Itoa((i + 1) % 3)},
{Name: "status_code", Value: statusCodes[(i+1)%5]},
})
}

ctx := context.Background()
rnd := rand.New(rand.NewSource(time.Now().Unix()))

dir := filepath.Join(s.SharedDir(), "data")
err = os.MkdirAll(dir, os.ModePerm)
require.NoError(t, err)
storage, err := e2ecortex.NewS3ClientForMinio(minio, flags["-blocks-storage.s3.bucket-name"])
require.NoError(t, err)
bkt := bucket.NewUserBucketClient("user-1", storage.GetBucket(), nil)
id, err := e2e.CreateBlock(ctx, rnd, dir, lbls, numSamples, start.UnixMilli(), end.UnixMilli(), scrapeInterval.Milliseconds(), 10)
require.NoError(t, err)
err = block.Upload(ctx, log.Logger, bkt, filepath.Join(dir, id.String()), metadata.NoneFunc)
require.NoError(t, err)

// Wait for querier and store to sync blocks.
require.NoError(t, cortex.WaitSumMetricsWithOptions(e2e.Equals(float64(1)), []string{"cortex_blocks_meta_synced"}, e2e.WaitMissingMetrics, e2e.WithLabelMatchers(labels.MustNewMatcher(labels.MatchEqual, "component", "store-gateway"))))
require.NoError(t, cortex.WaitSumMetricsWithOptions(e2e.Equals(float64(1)), []string{"cortex_blocks_meta_synced"}, e2e.WaitMissingMetrics, e2e.WithLabelMatchers(labels.MustNewMatcher(labels.MatchEqual, "component", "querier"))))
require.NoError(t, cortex.WaitSumMetricsWithOptions(e2e.Equals(float64(1)), []string{"cortex_bucket_store_blocks_loaded"}, e2e.WaitMissingMetrics))

c1, err := e2ecortex.NewClient("", cortex.HTTPEndpoint(), "", "", "user-1")
require.NoError(t, err)

err = writeFileToSharedDir(s, "prometheus.yml", []byte(""))
require.NoError(t, err)
prom := e2edb.NewPrometheus(prometheusLatestImage, map[string]string{
"--enable-feature": "promql-experimental-functions",
})
require.NoError(t, s.StartAndWaitReady(prom))

c2, err := e2ecortex.NewPromQueryClient(prom.HTTPEndpoint())
require.NoError(t, err)

waitUntilReady(t, ctx, c1, c2, `{job="test"}`, start, end)

opts := []promqlsmith.Option{
promqlsmith.WithEnableOffset(true),
promqlsmith.WithEnableAtModifier(true),
promqlsmith.WithEnabledFunctions(enabledFunctions),
promqlsmith.WithEnableExperimentalPromQLFunctions(true),
}
ps := promqlsmith.New(rnd, lbls, opts...)

runQueryFuzzTestCases(t, ps, c1, c2, end, start, end, scrapeInterval, 1000)
}

func TestDisableChunkTrimmingFuzz(t *testing.T) {
s, err := e2e.NewScenario(networkName)
require.NoError(t, err)
Expand Down Expand Up @@ -1410,6 +1527,15 @@ func runQueryFuzzTestCases(t *testing.T, ps *promqlsmith.PromQLSmith, c1, c2 *e2
func isValidQuery(generatedQuery parser.Expr, maxDepth int) bool {
isValid := true
currentDepth := 0
// TODO(SungJin1212): Test limitk, limit_ratio
if strings.Contains(generatedQuery.String(), "limitk") {
// current skip the limitk
return false
}
if strings.Contains(generatedQuery.String(), "limit_ratio") {
// current skip the limit_ratio
return false
}
parser.Inspect(generatedQuery, func(node parser.Node, path []parser.Node) error {
if currentDepth > maxDepth {
isValid = false
Expand Down
1 change: 1 addition & 0 deletions pkg/cortex/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -508,6 +508,7 @@ func (t *Cortex) initQueryFrontendTripperware() (serv services.Service, err erro
t.Cfg.Querier.DefaultEvaluationInterval,
t.Cfg.Querier.MaxSubQuerySteps,
t.Cfg.Querier.LookbackDelta,
t.Cfg.Querier.EnablePromQLExperimentalFunctions,
)

return services.NewIdleService(nil, func(_ error) error {
Expand Down
8 changes: 7 additions & 1 deletion pkg/querier/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/promql/parser"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/prometheus/prometheus/util/annotations"
Expand Down Expand Up @@ -89,7 +90,8 @@ type Config struct {
ThanosEngine bool `yaml:"thanos_engine"`

// Ignore max query length check at Querier.
IgnoreMaxQueryLength bool `yaml:"ignore_max_query_length"`
IgnoreMaxQueryLength bool `yaml:"ignore_max_query_length"`
EnablePromQLExperimentalFunctions bool `yaml:"enable_promql_experimental_functions"`
}

var (
Expand Down Expand Up @@ -132,6 +134,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.BoolVar(&cfg.ThanosEngine, "querier.thanos-engine", false, "Experimental. Use Thanos promql engine https://github.com/thanos-io/promql-engine rather than the Prometheus promql engine.")
f.Int64Var(&cfg.MaxSubQuerySteps, "querier.max-subquery-steps", 0, "Max number of steps allowed for every subquery expression in query. Number of steps is calculated using subquery range / step. A value > 0 enables it.")
f.BoolVar(&cfg.IgnoreMaxQueryLength, "querier.ignore-max-query-length", false, "If enabled, ignore max query length check at Querier select method. Users can choose to ignore it since the validation can be done before Querier evaluation like at Query Frontend or Ruler.")
f.BoolVar(&cfg.EnablePromQLExperimentalFunctions, "querier.enable-promql-experimental-functions", false, "[Experimental] If true, experimental promQL functions are enabled.")
}

// Validate the config
Expand Down Expand Up @@ -204,6 +207,9 @@ func New(cfg Config, limits *validation.Overrides, distributor Distributor, stor
})
maxConcurrentMetric.Set(float64(cfg.MaxConcurrent))

// set EnableExperimentalFunctions
parser.EnableExperimentalFunctions = cfg.EnablePromQLExperimentalFunctions

var queryEngine promql.QueryEngine
opts := promql.EngineOpts{
Logger: logger,
Expand Down
6 changes: 0 additions & 6 deletions pkg/querier/querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -542,7 +542,6 @@ func TestLimits(t *testing.T) {
}

func TestQuerier(t *testing.T) {
t.Parallel()
var cfg Config
flagext.DefaultValues(&cfg)
const chunks = 24
Expand Down Expand Up @@ -610,7 +609,6 @@ func TestQuerierMetric(t *testing.T) {
}

func TestNoHistoricalQueryToIngester(t *testing.T) {
t.Parallel()
testCases := []struct {
name string
mint, maxt time.Time
Expand Down Expand Up @@ -711,7 +709,6 @@ func TestNoHistoricalQueryToIngester(t *testing.T) {
}

func TestQuerier_ValidateQueryTimeRange_MaxQueryIntoFuture(t *testing.T) {
t.Parallel()
const engineLookbackDelta = 5 * time.Minute

now := time.Now()
Expand Down Expand Up @@ -929,7 +926,6 @@ func TestQuerier_ValidateQueryTimeRange_MaxQueryLength_Series(t *testing.T) {
}

func TestQuerier_ValidateQueryTimeRange_MaxQueryLength_Labels(t *testing.T) {
t.Parallel()
const maxQueryLength = 30 * 24 * time.Hour
tests := map[string]struct {
startTime time.Time
Expand Down Expand Up @@ -1002,7 +998,6 @@ func TestQuerier_ValidateQueryTimeRange_MaxQueryLength_Labels(t *testing.T) {
}

func TestQuerier_ValidateQueryTimeRange_MaxQueryLookback(t *testing.T) {
t.Parallel()
const (
engineLookbackDelta = 5 * time.Minute
thirtyDays = 30 * 24 * time.Hour
Expand Down Expand Up @@ -1511,7 +1506,6 @@ func (q *mockStoreQuerier) Close() error {
}

func TestShortTermQueryToLTS(t *testing.T) {
t.Parallel()
testCases := []struct {
name string
mint, maxt time.Time
Expand Down
10 changes: 8 additions & 2 deletions pkg/querier/tripperware/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,9 +288,9 @@ func sortPlanForQuery(q string) (sortPlan, error) {
if err != nil {
return 0, err
}
// Check if the root expression is topk or bottomk
// Check if the root expression is topk, bottomk, limitk, or limit_ratio
if aggr, ok := expr.(*promqlparser.AggregateExpr); ok {
if aggr.Op == promqlparser.TOPK || aggr.Op == promqlparser.BOTTOMK {
if aggr.Op == promqlparser.TOPK || aggr.Op == promqlparser.BOTTOMK || aggr.Op == promqlparser.LIMITK || aggr.Op == promqlparser.LIMIT_RATIO {
return mergeOnly, nil
}
}
Expand All @@ -303,6 +303,12 @@ func sortPlanForQuery(q string) (sortPlan, error) {
if n.Func.Name == "sort_desc" {
sortDesc = true
}
if n.Func.Name == "sort_by_label" {
sortAsc = true
}
if n.Func.Name == "sort_by_label_desc" {
sortDesc = true
}
}
}
return sortAsc, sortDesc
Expand Down
22 changes: 22 additions & 0 deletions pkg/querier/tripperware/merge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"testing"

"github.com/prometheus/prometheus/model/labels"
promqlparser "github.com/prometheus/prometheus/promql/parser"
"github.com/stretchr/testify/assert"

"github.com/cortexproject/cortex/pkg/cortexpb"
Expand Down Expand Up @@ -618,6 +619,16 @@ func Test_sortPlanForQuery(t *testing.T) {
expectedPlan: mergeOnly,
err: false,
},
{
query: "limitk(10, up)",
expectedPlan: mergeOnly,
err: false,
},
{
query: "limit_ratio(0.1, up)",
expectedPlan: mergeOnly,
err: false,
},
{
query: "1 + topk(10, up)",
expectedPlan: sortByLabels,
Expand All @@ -633,6 +644,16 @@ func Test_sortPlanForQuery(t *testing.T) {
expectedPlan: sortByValuesAsc,
err: false,
},
{
query: "1 + sort_by_label_desc(sum by (job) (up) )",
expectedPlan: sortByValuesDesc,
err: false,
},
{
query: "sort_by_label(topk by (job) (10, up))",
expectedPlan: sortByValuesAsc,
err: false,
},
{
query: "topk(5, up) by (job) + sort_desc(up)",
expectedPlan: sortByValuesDesc,
Expand All @@ -652,6 +673,7 @@ func Test_sortPlanForQuery(t *testing.T) {

for _, tc := range tc {
t.Run(tc.query, func(t *testing.T) {
promqlparser.EnableExperimentalFunctions = true
p, err := sortPlanForQuery(tc.query)
if tc.err {
assert.Error(t, err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ func TestRoundTrip(t *testing.T) {
time.Minute,
0,
0,
false,
)

for i, tc := range []struct {
Expand Down
6 changes: 6 additions & 0 deletions pkg/querier/tripperware/roundtrip.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/opentracing/opentracing-go"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/prometheus/promql/parser"
"github.com/thanos-io/thanos/pkg/querysharding"
"github.com/weaveworks/common/httpgrpc"
"github.com/weaveworks/common/user"
Expand Down Expand Up @@ -104,7 +105,12 @@ func NewQueryTripperware(
defaultSubQueryInterval time.Duration,
maxSubQuerySteps int64,
lookbackDelta time.Duration,
enablePromQLExperimentalFunctions bool,
) Tripperware {

// set EnableExperimentalFunctions
parser.EnableExperimentalFunctions = enablePromQLExperimentalFunctions

// Per tenant query metrics.
queriesPerTenant := promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_query_frontend_queries_total",
Expand Down
1 change: 1 addition & 0 deletions pkg/querier/tripperware/roundtrip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,7 @@ func TestRoundTrip(t *testing.T) {
time.Minute,
tc.maxSubQuerySteps,
0,
false,
)
resp, err := tw(downstream).RoundTrip(req)
if tc.expectedErr == nil {
Expand Down
Loading

0 comments on commit 0dc2e33

Please sign in to comment.