-
Notifications
You must be signed in to change notification settings - Fork 3.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: improve performance of first_over_time
and last_over_time
queries by sharding them
#11605
Changes from 8 commits
de536b6
235e386
689c161
09f6d66
f7463ae
750cec7
9281f44
8f589fe
810b07c
5e106bf
7b3cadc
b10c1de
928ccfa
f89ea65
5ddf9df
104892b
67dd511
8e22df0
6404308
0d420dc
f8c9c2b
2bfd0b5
af67edd
361babc
2fb80d7
1b6cc02
1f04c45
d2b055b
87bbd75
307bfc1
4dc234f
995b982
5231b6f
9e10b2c
2325e85
69f504b
27a5d66
1b09ad3
bd7f275
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -63,6 +63,9 @@ func TestMappingEquivalence(t *testing.T) { | |
`, | ||
false, | ||
}, | ||
// Step 1: | ||
{`first_over_time({a=~".+"} | logfmt | unwrap value [1s])`, false}, | ||
{`first_over_time({a=~".+"} | logfmt | unwrap value [1s]) by (a)`, false}, | ||
// topk prefers already-seen values in tiebreakers. Since the test data generates | ||
// the same log lines for each series & the resulting promql.Vectors aren't deterministically | ||
// sorted by labels, we don't expect this to pass. | ||
|
@@ -132,7 +135,7 @@ func TestMappingEquivalenceSketches(t *testing.T) { | |
query string | ||
realtiveError float64 | ||
}{ | ||
{`quantile_over_time(0.70, {a=~".+"} | logfmt | unwrap value [1s]) by (a)`, 0.03}, | ||
{`quantile_over_time(0.70, {a=~".+"} | logfmt | unwrap value [1s]) by (a)`, 0.05}, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. did this need to change from There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. When I change the test series to have the values spread out this would fail. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. which change is that? the addition of nanos in the timestamp within There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes. |
||
{`quantile_over_time(0.99, {a=~".+"} | logfmt | unwrap value [1s]) by (a)`, 0.02}, | ||
} { | ||
q := NewMockQuerier( | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -351,7 +351,7 @@ func (q *query) evalSample(ctx context.Context, expr syntax.SampleExpr) (promql_ | |
} | ||
defer util.LogErrorWithContext(ctx, "closing SampleExpr", stepEvaluator.Close) | ||
|
||
next, ts, r := stepEvaluator.Next() | ||
next, _, r := stepEvaluator.Next() | ||
if stepEvaluator.Error() != nil { | ||
return nil, stepEvaluator.Error() | ||
} | ||
|
@@ -361,7 +361,7 @@ func (q *query) evalSample(ctx context.Context, expr syntax.SampleExpr) (promql_ | |
case SampleVector: | ||
maxSeriesCapture := func(id string) int { return q.limits.MaxQuerySeries(ctx, id) } | ||
maxSeries := validation.SmallestPositiveIntPerTenant(tenantIDs, maxSeriesCapture) | ||
return q.JoinSampleVector(next, ts, vec, stepEvaluator, maxSeries) | ||
return q.JoinSampleVector(next, vec, stepEvaluator, maxSeries) | ||
case ProbabilisticQuantileVector: | ||
return JoinQuantileSketchVector(next, vec, stepEvaluator, q.params) | ||
default: | ||
|
@@ -371,7 +371,7 @@ func (q *query) evalSample(ctx context.Context, expr syntax.SampleExpr) (promql_ | |
return nil, nil | ||
} | ||
|
||
func (q *query) JoinSampleVector(next bool, ts int64, r StepResult, stepEvaluator StepEvaluator, maxSeries int) (promql_parser.Value, error) { | ||
func (q *query) JoinSampleVector(next bool, r StepResult, stepEvaluator StepEvaluator, maxSeries int) (promql_parser.Value, error) { | ||
|
||
seriesIndex := map[uint64]*promql.Series{} | ||
|
||
|
@@ -419,15 +419,16 @@ func (q *query) JoinSampleVector(next bool, ts int64, r StepResult, stepEvaluato | |
seriesIndex[hash] = series | ||
} | ||
series.Floats = append(series.Floats, promql.FPoint{ | ||
T: ts, | ||
//T: ts, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not sure we can do it like this. What do you think, @cstyan? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not sure what you mean? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The current code on There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. isn't the timestamp from the actual vector samples more accurate than the one got from Can you help me understand why it won't make sense? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It does make sense for us. I'm just not sure some other component is relying on this overridden timestamp. |
||
T: p.T, | ||
F: p.F, | ||
}) | ||
} | ||
// as we slowly build the full query for each steps, make sure we don't go over the limit of unique series. | ||
if len(seriesIndex) > maxSeries { | ||
return nil, logqlmodel.NewSeriesLimitError(maxSeries) | ||
} | ||
next, ts, r = stepEvaluator.Next() | ||
next, _, r = stepEvaluator.Next() | ||
if stepEvaluator.Error() != nil { | ||
return nil, stepEvaluator.Error() | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,191 @@ | ||
package logql | ||
|
||
import ( | ||
"math" | ||
"time" | ||
|
||
"github.com/prometheus/prometheus/model/labels" | ||
"github.com/prometheus/prometheus/promql" | ||
|
||
"github.com/grafana/loki/pkg/iter" | ||
) | ||
|
||
func newFirstWithTimestampIterator( | ||
it iter.PeekingSampleIterator, | ||
selRange, step, start, end, offset int64) RangeVectorIterator { | ||
inner := &batchRangeVectorIterator{ | ||
iter: it, | ||
step: step, | ||
end: end, | ||
selRange: selRange, | ||
metrics: map[string]labels.Labels{}, | ||
window: map[string]*promql.Series{}, | ||
agg: nil, | ||
current: start - step, // first loop iteration will set it to start | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this does feel kind of brittle; is there a nice way we can have the iterator correctly retrieve the actual first step There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is how it's in the other cases. I think this is super brittle but couldn't think of a nicer way. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Okay, I'll create an issue for that as a follow up, if this is following the existing pattern then lets just move forward with this as we have it now. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'd rework the iterators. I think range over func golang/go#61405 might help here a lot. |
||
offset: offset, | ||
} | ||
return &firstWithTimestampBatchRangeVectorIterator{ | ||
batchRangeVectorIterator: inner, | ||
} | ||
} | ||
|
||
type firstWithTimestampBatchRangeVectorIterator struct { | ||
*batchRangeVectorIterator | ||
at []promql.Sample | ||
} | ||
|
||
// Step 7 | ||
func (r *firstWithTimestampBatchRangeVectorIterator) At() (int64, StepResult) { | ||
if r.at == nil { | ||
r.at = make([]promql.Sample, 0, len(r.window)) | ||
} | ||
r.at = r.at[:0] | ||
// convert ts from nano to milli seconds as the iterator work with nanoseconds | ||
ts := r.current/1e+6 + r.offset/1e+6 | ||
for _, series := range r.window { | ||
s := r.agg(series.Floats) | ||
r.at = append(r.at, promql.Sample{ | ||
F: s.F, | ||
T: s.T / int64(time.Millisecond), | ||
jeschkies marked this conversation as resolved.
Show resolved
Hide resolved
|
||
Metric: series.Metric, | ||
}) | ||
} | ||
return ts, SampleVector(r.at) | ||
} | ||
|
||
func (r *firstWithTimestampBatchRangeVectorIterator) agg(samples []promql.FPoint) promql.FPoint { | ||
if len(samples) == 0 { | ||
return promql.FPoint{F: math.NaN(), T: 0} | ||
} | ||
return samples[0] | ||
} | ||
jeschkies marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
// Step 8 | ||
type firstOverTimeStepEvaluator struct { | ||
start, end, ts time.Time | ||
step time.Duration | ||
matrices []promql.Matrix | ||
streamVec map[int64]int | ||
} | ||
|
||
func NewMergeFirstOverTimeStepEvaluator(params Params, m []promql.Matrix) StepEvaluator { | ||
if len(m) == 0 { | ||
return EmptyEvaluator{} | ||
} | ||
|
||
var ( | ||
start = params.Start() | ||
end = params.End() | ||
step = params.Step() | ||
) | ||
|
||
index := make(map[int64]int, 0) | ||
for i, series := range m[1] { | ||
index[int64(series.Metric.Hash())] = i | ||
jeschkies marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
|
||
return &firstOverTimeStepEvaluator{ | ||
start: start, | ||
end: end, | ||
ts: start.Add(-step), // will be corrected on first Next() call | ||
step: step, | ||
matrices: m, | ||
streamVec: index, | ||
} | ||
} | ||
|
||
func (e *firstOverTimeStepEvaluator) Next() (bool, int64, StepResult) { | ||
|
||
var ( | ||
vec promql.Vector | ||
ok bool | ||
) | ||
|
||
// TODO: build index metric to vec pos | ||
|
||
e.ts = e.ts.Add(e.step) | ||
if e.ts.After(e.end) { | ||
return false, 0, nil | ||
} | ||
ts := e.ts.UnixNano() / int64(time.Millisecond) | ||
|
||
// Process first result | ||
// len(e.matrices) >= 1 was check during creation | ||
for s, series := range e.matrices[0] { | ||
if len(series.Floats) == 0 || !e.inRange(series.Floats[0].T, ts) { | ||
continue | ||
} | ||
|
||
vec = append(vec, promql.Sample{ | ||
Metric: series.Metric, | ||
T: series.Floats[0].T, | ||
F: series.Floats[0].F, | ||
}) | ||
|
||
e.pop(0, s) | ||
} | ||
|
||
if len(e.matrices) == 1 { | ||
return ok, ts, SampleVector(vec) | ||
} | ||
|
||
if len(vec) == 0 { | ||
return e.hasNext(), ts, SampleVector(vec) | ||
} | ||
|
||
// Merge other results | ||
for i, m := range e.matrices[1:] { | ||
// TODO: verify length and same labels/metric | ||
for j, series := range m { | ||
|
||
if len(series.Floats) == 0 || !e.inRange(series.Floats[0].T, ts) { | ||
continue | ||
} | ||
|
||
// Merge | ||
if vec[j].T > series.Floats[0].T { | ||
vec[j].F = series.Floats[0].F | ||
vec[j].T = series.Floats[0].T | ||
} | ||
jeschkies marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
// We've omitted the first matrix. That's why +1. | ||
e.pop(i+1, j) | ||
} | ||
} | ||
|
||
// Align vector timestamps with step | ||
for i := range vec { | ||
vec[i].T = ts | ||
} | ||
|
||
return true, ts, SampleVector(vec) | ||
} | ||
jeschkies marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
func (e *firstOverTimeStepEvaluator) pop(r, s int) { | ||
if len(e.matrices[r][s].Floats) <= 1 { | ||
e.matrices[r][s].Floats = nil | ||
return | ||
} | ||
e.matrices[r][s].Floats = e.matrices[r][s].Floats[1:] | ||
} | ||
|
||
func (e *firstOverTimeStepEvaluator) inRange(t, ts int64) bool { | ||
previous := ts - e.step.Milliseconds() | ||
return previous <= t && t < ts | ||
} | ||
|
||
func (e *firstOverTimeStepEvaluator) hasNext() bool { | ||
for _, m := range e.matrices { | ||
for _, s := range m { | ||
if len(s.Floats) != 0 { | ||
return true | ||
} | ||
} | ||
} | ||
|
||
return false | ||
} | ||
|
||
func (*firstOverTimeStepEvaluator) Close() error { return nil } | ||
|
||
func (*firstOverTimeStepEvaluator) Error() error { return nil } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is it possible that a shard which only retrieves data for a single relevant stream could return a vector? or would the vector for that single series always be wrapped in a matrix?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It would be a matrix with one vector I believe. Only for instant queries we return a vector https://github.com/grafana/loki/blob/main/pkg/logql/engine.go#L388