Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

label_format no longer shardable and introduces the Shardable() metho… #3137

Merged
merged 2 commits into from
Jan 7, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 35 additions & 14 deletions pkg/logql/ast.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ import (

// Expr is the root expression which can be a SampleExpr or LogSelectorExpr
type Expr interface {
logQLExpr() // ensure it's not implemented accidentally
logQLExpr() // ensure it's not implemented accidentally
Shardable() bool // A recursive check on the AST to see if it's shardable.
fmt.Stringer
}

Expand Down Expand Up @@ -134,7 +135,7 @@ func (m MultiStageExpr) String() string {
return sb.String()
}

func (MultiStageExpr) logQLExpr() {}
func (MultiStageExpr) logQLExpr() {} // nolint:unused

type matchersExpr struct {
matchers []*labels.Matcher
Expand All @@ -149,6 +150,8 @@ func (e *matchersExpr) Matchers() []*labels.Matcher {
return e.matchers
}

func (e *matchersExpr) Shardable() bool { return true }

func (e *matchersExpr) String() string {
var sb strings.Builder
sb.WriteString("{")
Expand Down Expand Up @@ -183,6 +186,15 @@ func newPipelineExpr(left *matchersExpr, pipeline MultiStageExpr) LogSelectorExp
}
}

func (e *pipelineExpr) Shardable() bool {
for _, p := range e.pipeline {
if !p.Shardable() {
return false
}
}
return true
}

func (e *pipelineExpr) Matchers() []*labels.Matcher {
return e.left.Matchers()
}
Expand Down Expand Up @@ -242,6 +254,8 @@ func AddFilterExpr(expr LogSelectorExpr, ty labels.MatchType, match string) (Log

}

func (e *lineFilterExpr) Shardable() bool { return true }

func (e *lineFilterExpr) String() string {
var sb strings.Builder
if e.left != nil {
Expand Down Expand Up @@ -302,6 +316,8 @@ func newLabelParserExpr(op, param string) *labelParserExpr {
}
}

func (e *labelParserExpr) Shardable() bool { return true }

func (e *labelParserExpr) Stage() (log.Stage, error) {
switch e.op {
case OpParserTypeJSON:
Expand Down Expand Up @@ -332,6 +348,8 @@ type labelFilterExpr struct {
implicit
}

func (e *labelFilterExpr) Shardable() bool { return true }

func (e *labelFilterExpr) Stage() (log.Stage, error) {
return e.LabelFilterer, nil
}
Expand All @@ -351,6 +369,8 @@ func newLineFmtExpr(value string) *lineFmtExpr {
}
}

func (e *lineFmtExpr) Shardable() bool { return true }

func (e *lineFmtExpr) Stage() (log.Stage, error) {
return log.NewFormatter(e.value)
}
Expand All @@ -371,6 +391,8 @@ func newLabelFmtExpr(fmts []log.LabelFmt) *labelFmtExpr {
}
}

func (e *labelFmtExpr) Shardable() bool { return false }

func (e *labelFmtExpr) Stage() (log.Stage, error) {
return log.NewLabelsFormatter(e.formats)
}
Expand Down Expand Up @@ -456,6 +478,8 @@ func (r logRange) String() string {
return sb.String()
}

func (r *logRange) Shardable() bool { return r.left.Shardable() }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure this was a Expr.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right, but it's called via rangeAggExpr.Shardable() -> logRange.Shardable() -> LogSelectorExpr.Shardable() so it seemed more obvious to use the same method :).


func newLogRange(left LogSelectorExpr, interval time.Duration, u *unwrapExpr) *logRange {
return &logRange{
left: left,
Expand Down Expand Up @@ -554,8 +578,6 @@ type SampleExpr interface {
// Selector is the LogQL selector to apply when retrieving logs.
Selector() LogSelectorExpr
Extractor() (SampleExtractor, error)
// Operations returns the list of operations used in this SampleExpr
Operations() []string
Expr
}

Expand Down Expand Up @@ -644,8 +666,8 @@ func (e *rangeAggregationExpr) String() string {
}

// impl SampleExpr
func (e *rangeAggregationExpr) Operations() []string {
return []string{e.operation}
func (e *rangeAggregationExpr) Shardable() bool {
return shardableOps[e.operation] && e.left.Shardable()
}

type grouping struct {
Expand Down Expand Up @@ -748,8 +770,8 @@ func (e *vectorAggregationExpr) String() string {
}

// impl SampleExpr
func (e *vectorAggregationExpr) Operations() []string {
return append(e.left.Operations(), e.operation)
func (e *vectorAggregationExpr) Shardable() bool {
return shardableOps[e.operation] && e.left.Shardable()
}

type BinOpOptions struct {
Expand All @@ -771,9 +793,8 @@ func (e *binOpExpr) String() string {
}

// impl SampleExpr
func (e *binOpExpr) Operations() []string {
ops := append(e.SampleExpr.Operations(), e.RHS.Operations()...)
return append(ops, e.op)
func (e *binOpExpr) Shardable() bool {
return shardableOps[e.op] && e.SampleExpr.Shardable() && e.RHS.Shardable()
}

func mustNewBinOpExpr(op string, opts BinOpOptions, lhs, rhs Expr) SampleExpr {
Expand Down Expand Up @@ -872,7 +893,7 @@ func (e *literalExpr) String() string {
// and they will only be present in binary operation legs.
func (e *literalExpr) Selector() LogSelectorExpr { return e }
func (e *literalExpr) HasFilter() bool { return false }
func (e *literalExpr) Operations() []string { return nil }
func (e *literalExpr) Shardable() bool { return true }
func (e *literalExpr) Pipeline() (log.Pipeline, error) { return log.NewNoopPipeline(), nil }
func (e *literalExpr) Matchers() []*labels.Matcher { return nil }
func (e *literalExpr) Extractor() (log.SampleExtractor, error) { return nil, nil }
Expand Down Expand Up @@ -932,8 +953,8 @@ func (e *labelReplaceExpr) Extractor() (SampleExtractor, error) {
return e.left.Extractor()
}

func (e *labelReplaceExpr) Operations() []string {
return append([]string{OpLabelReplace}, e.left.Operations()...)
func (e *labelReplaceExpr) Shardable() bool {
return false
}

func (e *labelReplaceExpr) String() string {
Expand Down
12 changes: 1 addition & 11 deletions pkg/logql/shardmapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ func (m ShardMapper) mapVectorAggregationExpr(expr *vectorAggregationExpr, r *sh

// if this AST contains unshardable operations, don't shard this at this level,
// but attempt to shard a child node.
if shardable := isShardable(expr.Operations()); !shardable {
if !expr.Shardable() {
subMapped, err := m.Map(expr.left, r)
if err != nil {
return nil, err
Expand Down Expand Up @@ -324,16 +324,6 @@ func hasLabelModifier(expr *rangeAggregationExpr) bool {
return false
}

// isShardable returns false if any of the listed operation types are not shardable and true otherwise
func isShardable(ops []string) bool {
for _, op := range ops {
if shardable := shardableOps[op]; !shardable {
return false
}
}
return true
}

// shardableOps lists the operations which may be sharded.
// topk, botk, max, & min all must be concatenated and then evaluated in order to avoid
// potential data loss due to series distribution across shards.
Expand Down
5 changes: 5 additions & 0 deletions pkg/logql/shardmapper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,11 @@ func TestMappingStrings(t *testing.T) {
`,
out: `sum without(a)(label_replace(sum without(b)(downstream<sum without(b)(rate({foo="bar"}[5m])),shard=0_of_2>++downstream<sum without(b)(rate({foo="bar"}[5m])),shard=1_of_2>),"baz","buz","foo","(.*)"))`,
},
{
// Ensure we don't try to shard expressions that include label reformatting.
in: `sum(count_over_time({foo="bar"} | logfmt | label_format bar=baz | bar="buz" [5m]))`,
out: `sum(count_over_time({foo="bar"} | logfmt | label_format bar=baz | bar="buz" [5m]))`,
},
} {
t.Run(tc.in, func(t *testing.T) {
ast, err := ParseExpr(tc.in)
Expand Down