Skip to content

Commit

Permalink
Merge commit 'f8c9c2bdbf2013cc151e986330263bf08dbeeefc' into karsten/…
Browse files Browse the repository at this point in the history
…first-over-time
  • Loading branch information
jeschkies committed Jan 23, 2024
2 parents 0d420dc + f8c9c2b commit 2bfd0b5
Show file tree
Hide file tree
Showing 9 changed files with 453 additions and 192 deletions.
63 changes: 63 additions & 0 deletions pkg/logql/downstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,34 @@ func (e *MergeFirstOverTimeExpr) Walk(f syntax.WalkFn) {
}
}

type MergeLastOverTimeExpr struct {
syntax.SampleExpr
downstreams []DownstreamSampleExpr
}

func (e MergeLastOverTimeExpr) String() string {
var sb strings.Builder
for i, d := range e.downstreams {
if i >= defaultMaxDepth {
break
}

if i > 0 {
sb.WriteString(" ++ ")
}

sb.WriteString(d.String())
}
return fmt.Sprintf("MergeLastOverTime<%s>", sb.String())
}

func (e *MergeLastOverTimeExpr) Walk(f syntax.WalkFn) {
f(e)
for _, d := range e.downstreams {
d.Walk(f)
}
}

type Shards []astmapper.ShardAnnotation

func (xs Shards) Encode() (encoded []string) {
Expand Down Expand Up @@ -458,7 +486,42 @@ func (ev *DownstreamEvaluator) NewStepEvaluator(
}

return NewMergeFirstOverTimeStepEvaluator(params, xs), nil
case *MergeLastOverTimeExpr:
queries := make([]DownstreamQuery, len(e.downstreams))

for i, d := range e.downstreams {
qry := DownstreamQuery{
Params: ParamsWithExpressionOverride{
Params: params,
ExpressionOverride: d.SampleExpr,
},
}
if shard := d.shard; shard != nil {
qry.Params = ParamsWithShardsOverride{
Params: qry.Params,
ShardsOverride: Shards{*shard}.Encode(),
}
}
queries[i] = qry
}

results, err := ev.Downstream(ctx, queries)
if err != nil {
return nil, err
}

xs := make([]promql.Matrix, 0, len(queries))
for _, res := range results {

switch data := res.Data.(type) {
case promql.Matrix:
xs = append(xs, data)
default:
return nil, fmt.Errorf("unexpected type (%s) uncoercible to StepEvaluator", data.Type())
}
}

return NewMergeLastOverTimeStepEvaluator(params, xs), nil
default:
return ev.defaultEvaluator.NewStepEvaluator(ctx, nextEvFactory, e, params)
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/logql/downstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ func TestMappingEquivalence(t *testing.T) {
},
{`first_over_time({a=~".+"} | logfmt | unwrap value [1s])`, false},
{`first_over_time({a=~".+"} | logfmt | unwrap value [1s]) by (a)`, false},
{`last_over_time({a=~".+"} | logfmt | unwrap value [1s])`, false},
{`last_over_time({a=~".+"} | logfmt | unwrap value [1s]) by (a)`, false},
// topk prefers already-seen values in tiebreakers. Since the test data generates
// the same log lines for each series & the resulting promql.Vectors aren't deterministically
// sorted by labels, we don't expect this to pass.
Expand Down
11 changes: 11 additions & 0 deletions pkg/logql/evaluator.go
Original file line number Diff line number Diff line change
Expand Up @@ -564,6 +564,17 @@ func newRangeAggEvaluator(
q.Start().UnixNano(), q.End().UnixNano(), o.Nanoseconds(),
)

return &RangeVectorEvaluator{
iter: iter,
}, nil
case syntax.OpRangeTypeLastWithTimestamp:
iter := newLastWithTimestampIterator(
it,
expr.Left.Interval.Nanoseconds(),
q.Step().Nanoseconds(),
q.Start().UnixNano(), q.End().UnixNano(), o.Nanoseconds(),
)

return &RangeVectorEvaluator{
iter: iter,
}, nil
Expand Down
4 changes: 4 additions & 0 deletions pkg/logql/explain.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ func (e *firstOverTimeStepEvaluator) Explain(parent Node) {
parent.Child("MergeFirstOverTime")
}

func (e *lastOverTimeStepEvaluator) Explain(parent Node) {
parent.Child("MergeLastOverTime")
}

func (EmptyEvaluator) Explain(parent Node) {
parent.Child("Empty")
}
Loading

0 comments on commit 2bfd0b5

Please sign in to comment.