Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: special case the return values from a sharded first/last_over_time query #13578

Merged
merged 7 commits into from
Jul 25, 2024
1 change: 0 additions & 1 deletion pkg/logql/downstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -599,7 +599,6 @@ func (ev *DownstreamEvaluator) NewStepEvaluator(
return nil, fmt.Errorf("unexpected type (%s) uncoercible to StepEvaluator", data.Type())
}
}

return NewMergeLastOverTimeStepEvaluator(params, xs), nil
default:
return ev.defaultEvaluator.NewStepEvaluator(ctx, nextEvFactory, e, params)
Expand Down
69 changes: 69 additions & 0 deletions pkg/logql/downstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,75 @@ func TestMappingEquivalenceSketches(t *testing.T) {
}
}

// TestMappingEquivalence_Instant checks that first_over_time and
// last_over_time queries produce the same result when executed by the regular
// engine and when executed, after shard mapping, by the downstream engine.
//
// Each query is built with the same timestamp (`end`) passed twice and a
// literal 0 as the following argument — presumably start, end and step, i.e.
// an instant query; confirm against NewLiteralParams.
func TestMappingEquivalence_Instant(t *testing.T) {
var (
shards = 3
nStreams = 60
rounds = 20
streams = randomStreams(nStreams, rounds+1, shards, []string{"a", "b", "c", "d"}, true)
end = time.Unix(0, int64(time.Second*time.Duration(rounds)))
interval = time.Duration(0)
limit = 100
)

// Every case below sets approximate to false, so only the exact-equality
// branch at the end of the subtest is exercised by this table.
for _, tc := range []struct {
query string
approximate bool
shardAgg []string
}{
{`first_over_time({a=~".+"} | logfmt | unwrap value [1s])`, false, []string{ShardFirstOverTime}},
{`first_over_time({a=~".+"} | logfmt | unwrap value [1s]) by (a)`, false, []string{ShardFirstOverTime}},
{`last_over_time({a=~".+"} | logfmt | unwrap value [1s])`, false, []string{ShardLastOverTime}},
{`last_over_time({a=~".+"} | logfmt | unwrap value [1s]) by (a)`, false, []string{ShardLastOverTime}},
} {
q := NewMockQuerier(
shards,
streams,
)

// The sharded engine dispatches its downstream queries to the regular
// engine through MockDownstreamer, so both paths read the same streams.
opts := EngineOpts{}
regular := NewEngine(opts, q, NoLimits, log.NewNopLogger())
sharded := NewDownstreamEngine(opts, MockDownstreamer{regular}, NoLimits, log.NewNopLogger())

t.Run(tc.query, func(t *testing.T) {
params, err := NewLiteralParams(
tc.query,
end,
end,
0,
interval,
logproto.FORWARD,
uint32(limit),
nil,
nil,
)
require.NoError(t, err)

qry := regular.Query(params)
ctx := user.InjectOrgID(context.Background(), "fake")

// Rewrite the parsed expression with the shard mapper, enabling only the
// shard aggregations listed for this test case.
strategy := NewPowerOfTwoStrategy(ConstantShards(shards))
mapper := NewShardMapper(strategy, nilShardMetrics, tc.shardAgg)
_, _, mapped, err := mapper.Parse(params.GetExpression())
require.NoError(t, err)

// Same params as the regular query, with the mapped expression substituted.
shardedQry := sharded.Query(ctx, ParamsWithExpressionOverride{Params: params, ExpressionOverride: mapped})

res, err := qry.Exec(ctx)
require.NoError(t, err)

shardedRes, err := shardedQry.Exec(ctx)
require.NoError(t, err)

if tc.approximate {
cstyan marked this conversation as resolved.
Show resolved Hide resolved
approximatelyEquals(t, res.Data.(promql.Matrix), shardedRes.Data.(promql.Matrix))
} else {
require.Equal(t, res.Data, shardedRes.Data)
}
})
}
}

func TestShardCounter(t *testing.T) {
var (
shards = 3
Expand Down
66 changes: 44 additions & 22 deletions pkg/logql/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,11 @@ func (q *query) evalSample(ctx context.Context, expr syntax.SampleExpr) (promql_
case SampleVector:
maxSeriesCapture := func(id string) int { return q.limits.MaxQuerySeries(ctx, id) }
maxSeries := validation.SmallestPositiveIntPerTenant(tenantIDs, maxSeriesCapture)
return q.JoinSampleVector(next, vec, stepEvaluator, maxSeries)
mfl := false
if rae, ok := expr.(*syntax.RangeAggregationExpr); ok && (rae.Operation == syntax.OpRangeTypeFirstWithTimestamp || rae.Operation == syntax.OpRangeTypeLastWithTimestamp) {
mfl = true
}
return q.JoinSampleVector(next, vec, stepEvaluator, maxSeries, mfl)
case ProbabilisticQuantileVector:
return MergeQuantileSketchVector(next, vec, stepEvaluator, q.params)
default:
Expand All @@ -381,9 +385,36 @@ func (q *query) evalSample(ctx context.Context, expr syntax.SampleExpr) (promql_
return nil, errors.New("unexpected empty result")
}

func (q *query) JoinSampleVector(next bool, r StepResult, stepEvaluator StepEvaluator, maxSeries int) (promql_parser.Value, error) {
// vectorsToSeries groups the samples of vec into series keyed by the hash of
// each sample's Metric. Every sample contributes one FPoint (its T and F) to
// the series for its metric; a series is created on the first sample seen for
// that hash.
//
// It returns the series both as a slice and as the hash-indexed map used to
// build them. The slice holds value copies of the structs the map points to,
// and is filled by ranging over the map, so its order is not deterministic.
func vectorsToSeries(vec promql.Vector) ([]promql.Series, map[uint64]*promql.Series) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we want a test function for this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is tested via TestMappingEquivalence_Instant, we could add a test specifically for this but it didn't feel necessary

seriesIndex := map[uint64]*promql.Series{}
for _, p := range vec {
var (
series *promql.Series
hash = p.Metric.Hash()
ok bool
)

series, ok = seriesIndex[hash]
if !ok {
// First sample for this metric hash: start a new series with room for one point.
series = &promql.Series{
Metric: p.Metric,
Floats: make([]promql.FPoint, 0, 1),
}
seriesIndex[hash] = series
}
series.Floats = append(series.Floats, promql.FPoint{
T: p.T,
F: p.F,
})
}
// Flatten the index into a slice; each element is a copy of the pointed-to Series.
series := make([]promql.Series, 0, len(seriesIndex))
for _, s := range seriesIndex {
series = append(series, *s)
}
return series, seriesIndex
}

func (q *query) JoinSampleVector(next bool, r StepResult, stepEvaluator StepEvaluator, maxSeries int, mergeFirstLast bool) (promql_parser.Value, error) {
vec := promql.Vector{}
if next {
vec = r.SampleVector()
Expand All @@ -395,6 +426,14 @@ func (q *query) JoinSampleVector(next bool, r StepResult, stepEvaluator StepEval
}

if GetRangeType(q.params) == InstantType {
// an instant query sharded first/last_over_time can return a single vector
if mergeFirstLast {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does TestMappingEquivalence_Instant need a test for the false case (existing behavior) here? Or is that adequately covered elsewhere?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the tests here execute both the sharded and unsharded (aka not mergeFirstLast) versions of the queries, and then compare the results

series, _ := vectorsToSeries(vec)
result := promql.Matrix(series)
sort.Sort(result)
return result, stepEvaluator.Error()
}

sortByValue, err := Sortable(q.params)
if err != nil {
return nil, fmt.Errorf("fail to check Sortable, logql: %s ,err: %s", q.params.QueryString(), err)
Expand All @@ -410,28 +449,11 @@ func (q *query) JoinSampleVector(next bool, r StepResult, stepEvaluator StepEval
stepCount = 1
}

seriesIndex := map[uint64]*promql.Series{}
for next {
vec = r.SampleVector()
for _, p := range vec {
var (
series *promql.Series
hash = p.Metric.Hash()
ok bool
)

series, ok = seriesIndex[hash]
if !ok {
series = &promql.Series{
Metric: p.Metric,
Floats: make([]promql.FPoint, 0, stepCount),
}
seriesIndex[hash] = series
}
series.Floats = append(series.Floats, promql.FPoint{
T: p.T,
F: p.F,
})
}
_, seriesIndex = vectorsToSeries(vec)

// as we slowly build the full query for each steps, make sure we don't go over the limit of unique series.
if len(seriesIndex) > maxSeries {
return nil, logqlmodel.NewSeriesLimitError(maxSeries)
Expand Down
5 changes: 4 additions & 1 deletion pkg/logql/first_last_over_time.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,6 @@ func (e *mergeOverTimeStepEvaluator) Next() (bool, int64, StepResult) {
// Merge other results
for i, m := range e.matrices {
for j, series := range m {

if len(series.Floats) == 0 || !e.inRange(series.Floats[0].T, ts) {
continue
}
Expand Down Expand Up @@ -171,6 +170,10 @@ func (e *mergeOverTimeStepEvaluator) pop(r, s int) {

// inRange reports whether t lies in the half-open window [ts-step, ts), where
// the step is taken in milliseconds. When the step is zero milliseconds
// (the instant-query special case) every t is considered in range.
func (e *mergeOverTimeStepEvaluator) inRange(t, ts int64) bool {
	stepMs := e.step.Milliseconds()
	if stepMs == 0 {
		// special case instant queries
		return true
	}

	windowStart := ts - stepMs
	return t >= windowStart && t < ts
}

Expand Down
1 change: 0 additions & 1 deletion pkg/logql/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,6 @@ func (m MockDownstreamer) Downstream(ctx context.Context, queries []DownstreamQu
if err != nil {
return nil, err
}

results = append(results, res)
}

Expand Down
Loading