diff --git a/pkg/logql/ast.go b/pkg/logql/ast.go index 55a481bdfad36..afc1cbfe02bc3 100644 --- a/pkg/logql/ast.go +++ b/pkg/logql/ast.go @@ -19,7 +19,8 @@ import ( // Expr is the root expression which can be a SampleExpr or LogSelectorExpr type Expr interface { - logQLExpr() // ensure it's not implemented accidentally + logQLExpr() // ensure it's not implemented accidentally + Shardable() bool // A recursive check on the AST to see if it's shardable. fmt.Stringer } @@ -134,7 +135,7 @@ func (m MultiStageExpr) String() string { return sb.String() } -func (MultiStageExpr) logQLExpr() {} +func (MultiStageExpr) logQLExpr() {} // nolint:unused type matchersExpr struct { matchers []*labels.Matcher @@ -149,6 +150,8 @@ func (e *matchersExpr) Matchers() []*labels.Matcher { return e.matchers } +func (e *matchersExpr) Shardable() bool { return true } + func (e *matchersExpr) String() string { var sb strings.Builder sb.WriteString("{") @@ -183,6 +186,15 @@ func newPipelineExpr(left *matchersExpr, pipeline MultiStageExpr) LogSelectorExp } } +func (e *pipelineExpr) Shardable() bool { + for _, p := range e.pipeline { + if !p.Shardable() { + return false + } + } + return true +} + func (e *pipelineExpr) Matchers() []*labels.Matcher { return e.left.Matchers() } @@ -242,6 +254,8 @@ func AddFilterExpr(expr LogSelectorExpr, ty labels.MatchType, match string) (Log } +func (e *lineFilterExpr) Shardable() bool { return true } + func (e *lineFilterExpr) String() string { var sb strings.Builder if e.left != nil { @@ -302,6 +316,8 @@ func newLabelParserExpr(op, param string) *labelParserExpr { } } +func (e *labelParserExpr) Shardable() bool { return true } + func (e *labelParserExpr) Stage() (log.Stage, error) { switch e.op { case OpParserTypeJSON: @@ -332,6 +348,8 @@ type labelFilterExpr struct { implicit } +func (e *labelFilterExpr) Shardable() bool { return true } + func (e *labelFilterExpr) Stage() (log.Stage, error) { return e.LabelFilterer, nil } @@ -351,6 +369,8 @@ func newLineFmtExpr(value string) *lineFmtExpr { } } +func (e *lineFmtExpr) Shardable() bool { return true } + func (e *lineFmtExpr) Stage() (log.Stage, error) { return log.NewFormatter(e.value) } @@ -371,6 +391,8 @@ func newLabelFmtExpr(fmts []log.LabelFmt) *labelFmtExpr { } } +func (e *labelFmtExpr) Shardable() bool { return false } + func (e *labelFmtExpr) Stage() (log.Stage, error) { return log.NewLabelsFormatter(e.formats) } @@ -456,6 +478,8 @@ func (r logRange) String() string { return sb.String() } +func (r *logRange) Shardable() bool { return r.left.Shardable() } + func newLogRange(left LogSelectorExpr, interval time.Duration, u *unwrapExpr) *logRange { return &logRange{ left: left, @@ -554,8 +578,6 @@ type SampleExpr interface { // Selector is the LogQL selector to apply when retrieving logs. Selector() LogSelectorExpr Extractor() (SampleExtractor, error) - // Operations returns the list of operations used in this SampleExpr - Operations() []string Expr } @@ -644,8 +666,8 @@ func (e *rangeAggregationExpr) String() string { } // impl SampleExpr -func (e *rangeAggregationExpr) Operations() []string { - return []string{e.operation} +func (e *rangeAggregationExpr) Shardable() bool { + return shardableOps[e.operation] && e.left.Shardable() } type grouping struct { @@ -748,8 +770,8 @@ func (e *vectorAggregationExpr) String() string { } // impl SampleExpr -func (e *vectorAggregationExpr) Operations() []string { - return append(e.left.Operations(), e.operation) +func (e *vectorAggregationExpr) Shardable() bool { + return shardableOps[e.operation] && e.left.Shardable() } type BinOpOptions struct { @@ -771,9 +793,8 @@ func (e *binOpExpr) String() string { } // impl SampleExpr -func (e *binOpExpr) Operations() []string { - ops := append(e.SampleExpr.Operations(), e.RHS.Operations()...) - return append(ops, e.op) +func (e *binOpExpr) Shardable() bool { + return shardableOps[e.op] && e.SampleExpr.Shardable() && e.RHS.Shardable() } func mustNewBinOpExpr(op string, opts BinOpOptions, lhs, rhs Expr) SampleExpr { @@ -872,7 +893,7 @@ func (e *literalExpr) String() string { // and they will only be present in binary operation legs. func (e *literalExpr) Selector() LogSelectorExpr { return e } func (e *literalExpr) HasFilter() bool { return false } -func (e *literalExpr) Operations() []string { return nil } +func (e *literalExpr) Shardable() bool { return true } func (e *literalExpr) Pipeline() (log.Pipeline, error) { return log.NewNoopPipeline(), nil } func (e *literalExpr) Matchers() []*labels.Matcher { return nil } func (e *literalExpr) Extractor() (log.SampleExtractor, error) { return nil, nil } @@ -932,8 +953,8 @@ func (e *labelReplaceExpr) Extractor() (SampleExtractor, error) { return e.left.Extractor() } -func (e *labelReplaceExpr) Operations() []string { - return append([]string{OpLabelReplace}, e.left.Operations()...) +func (e *labelReplaceExpr) Shardable() bool { + return false } func (e *labelReplaceExpr) String() string { diff --git a/pkg/logql/shardmapper.go b/pkg/logql/shardmapper.go index 1b79dfaad1f1c..628fa18956665 100644 --- a/pkg/logql/shardmapper.go +++ b/pkg/logql/shardmapper.go @@ -206,7 +206,7 @@ func (m ShardMapper) mapVectorAggregationExpr(expr *vectorAggregationExpr, r *sh // if this AST contains unshardable operations, don't shard this at this level, // but attempt to shard a child node. - if shardable := isShardable(expr.Operations()); !shardable { + if !expr.Shardable() { subMapped, err := m.Map(expr.left, r) if err != nil { return nil, err @@ -324,16 +324,6 @@ func hasLabelModifier(expr *rangeAggregationExpr) bool { return false } -// isShardable returns false if any of the listed operation types are not shardable and true otherwise -func isShardable(ops []string) bool { - for _, op := range ops { - if shardable := shardableOps[op]; !shardable { - return false - } - } - return true -} - // shardableOps lists the operations which may be sharded. // topk, botk, max, & min all must be concatenated and then evaluated in order to avoid // potential data loss due to series distribution across shards. diff --git a/pkg/logql/shardmapper_test.go b/pkg/logql/shardmapper_test.go index cdd5e4806b8f9..9425ff792072a 100644 --- a/pkg/logql/shardmapper_test.go +++ b/pkg/logql/shardmapper_test.go @@ -188,6 +188,11 @@ func TestMappingStrings(t *testing.T) { `, out: `sum without(a)(label_replace(sum without(b)(downstream++downstream),"baz","buz","foo","(.*)"))`, }, + { + // Ensure we don't try to shard expressions that include label reformatting. + in: `sum(count_over_time({foo="bar"} | logfmt | label_format bar=baz | bar="buz" [5m]))`, + out: `sum(count_over_time({foo="bar"} | logfmt | label_format bar=baz | bar="buz" [5m]))`, + }, } { t.Run(tc.in, func(t *testing.T) { ast, err := ParseExpr(tc.in)