Skip to content

Commit

Permalink
Align metric query splits with step
Browse files Browse the repository at this point in the history
Signed-off-by: Danny Kopping <danny.kopping@grafana.com>
  • Loading branch information
Danny Kopping committed Jan 10, 2024
1 parent 78b2c5a commit b7dfa09
Show file tree
Hide file tree
Showing 2 changed files with 131 additions and 92 deletions.
60 changes: 38 additions & 22 deletions pkg/querier/queryrange/split_by_interval_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -412,7 +412,24 @@ func Test_splitQuery(t *testing.T) {

splits, err := intervals.splitter.split(refTime, []string{tenantID}, req, intervals.splitInterval)
require.NoError(t, err)
require.Equal(t, want, splits)
//require.Equal(t, want, splits)

if !assert.Equal(t, want, splits) {
t.Logf("expected and actual do not match\n")
defer t.Fail()

if len(want) != len(splits) {
t.Logf("expected %d splits, got %d\n", len(want), len(splits))
return
}

for j := 0; j < len(want); j++ {
exp := want[j]
act := splits[j]
equal := assert.Equal(t, exp, act)
t.Logf("\t#%d [matches: %v]: expected %q/%q got %q/%q\n", j, equal, exp.GetStart(), exp.GetEnd(), act.GetStart(), act.GetEnd())
}
}
})
}
})
Expand Down Expand Up @@ -801,24 +818,23 @@ func Test_splitMetricQuery(t *testing.T) {
// query is wholly within ingester query window
{
input: &LokiRequest{
StartTs: refTime.Add(-time.Hour).Truncate(time.Second),
StartTs: refTime.Add(-time.Hour),
EndTs: refTime,
Step: 15 * seconds,
Query: shortRange,
},
expected: []queryrangebase.Request{
&LokiRequest{
StartTs: refTime.Add(-time.Hour).Truncate(time.Second),
EndTs: time.Date(2023, 1, 15, 7, 30, 0, 0, time.UTC),
StartTs: time.Date(2023, 1, 15, 7, 05, 30, 0, time.UTC), // start time is aligned down to step of 15s
EndTs: time.Date(2023, 1, 15, 7, 29, 45, 0, time.UTC),
Step: 15 * seconds,
Query: shortRange,
},
&LokiRequest{
StartTs: time.Date(2023, 1, 15, 7, 30, 0, 0, time.UTC),
// end time is aligned to step of 15s
EndTs: time.Date(2023, 1, 15, 8, 5, 45, 0, time.UTC),
Step: 15 * seconds,
Query: shortRange,
EndTs: time.Date(2023, 1, 15, 8, 5, 45, 0, time.UTC), // end time is aligned up to step of 15s
Step: 15 * seconds,
Query: shortRange,
},
},
splitInterval: time.Hour,
Expand All @@ -830,15 +846,15 @@ func Test_splitMetricQuery(t *testing.T) {
// query is partially within ingester query window
{
input: &LokiRequest{
StartTs: refTime.Add(-4 * time.Hour).Add(-30 * time.Minute).Truncate(time.Second),
StartTs: refTime.Add(-4 * time.Hour).Add(-30 * time.Minute),
EndTs: refTime,
Step: 15 * seconds,
Query: shortRange,
},
expected: []queryrangebase.Request{
// regular intervals until `query_ingesters_within` window
&LokiRequest{
StartTs: refTime.Add(-4 * time.Hour).Add(-30 * time.Minute).Truncate(time.Second),
StartTs: time.Date(2023, 1, 15, 3, 35, 30, 0, time.UTC),
EndTs: time.Date(2023, 1, 15, 3, 59, 45, 0, time.UTC),
Step: 15 * seconds,
Query: shortRange,
Expand All @@ -851,20 +867,20 @@ func Test_splitMetricQuery(t *testing.T) {
},
&LokiRequest{
StartTs: time.Date(2023, 1, 15, 5, 0, 0, 0, time.UTC),
EndTs: time.Date(2023, 1, 15, 5, 5, 30, 123456789, time.UTC),
EndTs: time.Date(2023, 1, 15, 5, 5, 30, 0, time.UTC),
Step: 15 * seconds,
Query: shortRange,
},
// and then different intervals for queries to ingesters
&LokiRequest{
StartTs: time.Date(2023, 1, 15, 5, 5, 30, 123456789, time.UTC),
EndTs: time.Date(2023, 1, 15, 6, 0, 0, 0, time.UTC),
StartTs: time.Date(2023, 1, 15, 5, 5, 30, 0, time.UTC),
EndTs: time.Date(2023, 1, 15, 5, 59, 45, 0, time.UTC),
Step: 15 * seconds,
Query: shortRange,
},
&LokiRequest{
StartTs: time.Date(2023, 1, 15, 6, 0, 0, 0, time.UTC),
EndTs: time.Date(2023, 1, 15, 7, 30, 0, 0, time.UTC),
EndTs: time.Date(2023, 1, 15, 7, 29, 45, 0, time.UTC),
Step: 15 * seconds,
Query: shortRange,
},
Expand All @@ -884,22 +900,22 @@ func Test_splitMetricQuery(t *testing.T) {
// not within ingester query window
{
input: &LokiRequest{
StartTs: refTime.Add(-5 * time.Hour).Truncate(time.Second),
EndTs: refTime.Add(-4 * time.Hour).Truncate(time.Second),
StartTs: refTime.Add(-5 * time.Hour),
EndTs: refTime.Add(-4 * time.Hour),
Step: 15 * seconds,
Query: shortRange,
},
expected: []queryrangebase.Request{
// regular intervals until `query_ingesters_within` window
&LokiRequest{
StartTs: refTime.Add(-5 * time.Hour).Truncate(time.Second),
StartTs: time.Date(2023, 1, 15, 3, 5, 30, 0, time.UTC),
EndTs: time.Date(2023, 1, 15, 3, 59, 45, 0, time.UTC),
Step: 15 * seconds,
Query: shortRange,
},
&LokiRequest{
StartTs: time.Date(2023, 1, 15, 4, 0, 0, 0, time.UTC),
EndTs: time.Date(2023, 1, 15, 4, 5, 30, 0, time.UTC),
EndTs: time.Date(2023, 1, 15, 4, 5, 45, 0, time.UTC),
Step: 15 * seconds,
Query: shortRange,
},
Expand All @@ -913,15 +929,15 @@ func Test_splitMetricQuery(t *testing.T) {
// ingester query split by disabled
{
input: &LokiRequest{
StartTs: refTime.Add(-4 * time.Hour).Truncate(time.Second),
StartTs: refTime.Add(-4 * time.Hour),
EndTs: refTime,
Step: 15 * seconds,
Query: shortRange,
},
expected: []queryrangebase.Request{
// regular intervals only, since ingester split duration is 0
&LokiRequest{
StartTs: refTime.Add(-4 * time.Hour).Truncate(time.Second),
StartTs: time.Date(2023, 1, 15, 4, 5, 30, 0, time.UTC),
EndTs: time.Date(2023, 1, 15, 4, 59, 45, 0, time.UTC),
Step: 15 * seconds,
Query: shortRange,
Expand Down Expand Up @@ -960,15 +976,15 @@ func Test_splitMetricQuery(t *testing.T) {
// ingester query split by enabled, but query_store_only is enabled too
{
input: &LokiRequest{
StartTs: refTime.Add(-4 * time.Hour).Truncate(time.Second),
StartTs: refTime.Add(-4 * time.Hour),
EndTs: refTime,
Step: 15 * seconds,
Query: shortRange,
},
expected: []queryrangebase.Request{
// regular intervals only, since ingester split duration is 0
&LokiRequest{
StartTs: refTime.Add(-4 * time.Hour).Truncate(time.Second),
StartTs: time.Date(2023, 1, 15, 4, 5, 30, 0, time.UTC),
EndTs: time.Date(2023, 1, 15, 4, 59, 45, 0, time.UTC),
Step: 15 * seconds,
Query: shortRange,
Expand Down
163 changes: 93 additions & 70 deletions pkg/querier/queryrange/splitters.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,26 +90,43 @@ func (s *defaultSplitter) split(execTime time.Time, tenantIDs []string, req quer
return nil, nil
}

// Treat range of given query within the `query_ingesters_within` window differently by splitting using the `split_ingester_queries_by_interval`
// instead of the default `split_queries_by_interval`; rebound the start/end time after doing so to build intervals
// for queries outside the `query_ingesters_within` window.
//
// The given factory is responsible for building the splits and appending to reqs.
start, end := buildIngesterQuerySplitsAndRebound(execTime, s.limits, s.iqo, tenantIDs, req, factory, endTimeInclusive)
if start.Equal(end) {
// Nothing to do, there are no more intervals to process.
return reqs, nil
}
var (
ingesterSplits []queryrangebase.Request
origStart = req.GetStart().UTC()
origEnd = req.GetEnd().UTC()
)

start, end, needsIngesterSplits := ingesterQueryBounds(execTime, s.iqo, req)

if ingesterQueryInterval := validation.MaxDurationPerTenant(tenantIDs, s.limits.IngesterQuerySplitDuration); ingesterQueryInterval != 0 && needsIngesterSplits {
// perform splitting using special interval (`split_ingester_queries_by_interval`)
util.ForInterval(ingesterQueryInterval, start, end, endTimeInclusive, factory)

// copy the splits, reset the results
ingesterSplits := reqs
reqs = nil
// rebound after ingester queries have been split out
end = start
start = req.GetStart().UTC()
if endTimeInclusive {
end = end.Add(-util.SplitGap)
}

// query only overlaps ingester query window, nothing more to do
if start.After(end) || start.Equal(end) {
return reqs, nil
}

util.ForInterval(interval, start, end, endTimeInclusive, factory)
// copy the splits, reset the results
ingesterSplits = reqs
reqs = nil
} else {
start = origStart
end = origEnd
}

// perform splitting over the rest of the time range
util.ForInterval(interval, origStart, end, endTimeInclusive, factory)

// move the ingester splits to the end to maintain correct order
reqs = append(reqs, ingesterSplits...)

return reqs, nil
}

Expand Down Expand Up @@ -158,16 +175,7 @@ func (s *metricQuerySplitter) split(execTime time.Time, tenantIDs []string, r qu
return nil, err
}

// step align start and end time of the query. Start time is rounded down and end time is rounded up.
stepNs := r.GetStep() * 1e6
startNs := lokiReq.StartTs.UnixNano()
start := time.Unix(0, startNs-startNs%stepNs)

endNs := lokiReq.EndTs.UnixNano()
if mod := endNs % stepNs; mod != 0 {
endNs += stepNs - mod
}
end := time.Unix(0, endNs)
start, end := s.alignStartEnd(r.GetStep(), lokiReq.StartTs, lokiReq.EndTs)

lokiReq = lokiReq.WithStartEnd(start, end).(*LokiRequest)

Expand All @@ -186,55 +194,85 @@ func (s *metricQuerySplitter) split(execTime time.Time, tenantIDs []string, r qu
}

// step is >= configured split interval, let us just split the query interval by step
// TODO this is likely buggy when step >= query range, how should we handle this?
if lokiReq.Step >= interval.Milliseconds() {
util.ForInterval(time.Duration(lokiReq.Step*1e6), lokiReq.StartTs, lokiReq.EndTs, false, factory)

return reqs, nil
}

// Treat range of given query within the `query_ingesters_within` window differently by splitting using the `split_ingester_queries_by_interval`
// instead of the default `split_queries_by_interval`; rebound the start/end time after doing so to build intervals
// for queries outside the `query_ingesters_within` window.
//
// The given factory is responsible for building the splits and appending to reqs.
newStart, newEnd := buildIngesterQuerySplitsAndRebound(execTime, s.limits, s.iqo, tenantIDs, lokiReq, factory, false)
if newStart.Equal(newEnd) {
// Nothing to do, there are no more intervals to process.
return reqs, nil
}
var (
ingesterSplits []queryrangebase.Request
needsIngesterSplits bool
)

origStart := start
origEnd := end

start, end, needsIngesterSplits = ingesterQueryBounds(execTime, s.iqo, lokiReq)
start, end = s.alignStartEnd(r.GetStep(), start, end)

if ingesterQueryInterval := validation.MaxDurationPerTenant(tenantIDs, s.limits.IngesterQuerySplitDuration); ingesterQueryInterval != 0 && needsIngesterSplits {
// perform splitting using special interval (`split_ingester_queries_by_interval`)
s.buildMetricSplits(lokiReq.GetStep(), ingesterQueryInterval, start, end, factory)

// copy the splits, reset the results
ingesterSplits := reqs
reqs = nil
// rebound after ingester queries have been split out
end = start
start = origStart

for start := newStart; start.Before(newEnd); start = s.nextIntervalBoundary(start, r.GetStep(), interval).Add(time.Duration(r.GetStep()) * time.Millisecond) {
end := s.nextIntervalBoundary(start, r.GetStep(), interval)
if end.Add(time.Duration(r.GetStep())*time.Millisecond).After(newEnd) || end.Add(time.Duration(r.GetStep())*time.Millisecond) == newEnd {
end = newEnd
// query only overlaps ingester query window, nothing more to do
if start.After(end) || start.Equal(end) {
return reqs, nil
}
factory(start, end)

// copy the splits, reset the results
ingesterSplits = reqs
reqs = nil
} else {
start = origStart
end = origEnd
}

// perform splitting over the rest of the time range
s.buildMetricSplits(lokiReq.GetStep(), interval, start, end, factory)

// move the ingester splits to the end to maintain correct order
reqs = append(reqs, ingesterSplits...)

return reqs, nil
}

// buildIngesterQuerySplitsAndRebound creates subqueries for the given request if it spans the `query_ingesters_within` window.
// It returns the new start & end times which exclude this window.
func buildIngesterQuerySplitsAndRebound(execTime time.Time, limits Limits, iqo util.IngesterQueryOptions, tenantIDs []string, req queryrangebase.Request, factory func(start time.Time, end time.Time), endTimeInclusive bool) (time.Time, time.Time) {
func (s *metricQuerySplitter) alignStartEnd(step int64, start, end time.Time) (time.Time, time.Time) {
// step align start and end time of the query. Start time is rounded down and end time is rounded up.
stepNs := step * 1e6
startNs := start.UnixNano()

endNs := end.UnixNano()
if mod := endNs % stepNs; mod != 0 {
endNs += stepNs - mod
}

return time.Unix(0, startNs-startNs%stepNs), time.Unix(0, endNs)
}

func (s *metricQuerySplitter) buildMetricSplits(step int64, interval time.Duration, start, end time.Time, factory func(start, end time.Time)) {
for splStart := start; splStart.Before(end); splStart = s.nextIntervalBoundary(splStart, step, interval).Add(time.Duration(step) * time.Millisecond) {
splEnd := s.nextIntervalBoundary(splStart, step, interval)
if splEnd.Add(time.Duration(step)*time.Millisecond).After(end) || splEnd.Add(time.Duration(step)*time.Millisecond) == end {
splEnd = end
}
factory(splStart, splEnd)
}
}

// ingesterQueryBounds determines if we need to split time ranges overlapping the ingester query window (`query_ingesters_within`)
// and retrieve the bounds for those specific splits
func ingesterQueryBounds(execTime time.Time, iqo util.IngesterQueryOptions, req queryrangebase.Request) (time.Time, time.Time, bool) {
start, end := req.GetStart().UTC(), req.GetEnd().UTC()

// ingesters are not queried, nothing to do
if iqo == nil || iqo.QueryStoreOnly() {
return start, end
}

interval := validation.MaxDurationPerTenant(tenantIDs, limits.IngesterQuerySplitDuration)
if interval == 0 {
// defer to normal split interval, leave start/end times unchanged
return start, end
return start, end, false
}

windowSize := iqo.QueryIngestersWithin()
Expand All @@ -247,23 +285,8 @@ func buildIngesterQuerySplitsAndRebound(execTime time.Time, limits Limits, iqo u

// query range does not overlap with ingester query window, nothing to do
if end.Before(ingesterWindow) {
return start, end
return start, end, false
}

newStart := start
newEnd := ingesterWindow

if endTimeInclusive {
newEnd = newEnd.Add(-util.SplitGap)
}

if start.After(newEnd) {
// query is fully within the ingester query window
newStart = newEnd
}

// build intervals using time range within ingester query window
util.ForInterval(interval, ingesterWindow, end, endTimeInclusive, factory)

return newStart, newEnd
return ingesterWindow, end, true
}

0 comments on commit b7dfa09

Please sign in to comment.