Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

align metric queries by step and other queries by split interval #5181

Merged
merged 4 commits into from
Jan 19, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 36 additions & 7 deletions pkg/querier/queryrange/split_by_interval.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ func splitByTime(req queryrangebase.Request, interval time.Duration) ([]queryran

switch r := req.(type) {
case *LokiRequest:
forInterval(interval, r.StartTs, r.EndTs, func(start, end time.Time) {
forInterval(interval, r.StartTs, r.EndTs, false, func(start, end time.Time) {
reqs = append(reqs, &LokiRequest{
Query: r.Query,
Limit: r.Limit,
Expand All @@ -238,7 +238,7 @@ func splitByTime(req queryrangebase.Request, interval time.Duration) ([]queryran
})
})
case *LokiSeriesRequest:
forInterval(interval, r.StartTs, r.EndTs, func(start, end time.Time) {
forInterval(interval, r.StartTs, r.EndTs, true, func(start, end time.Time) {
reqs = append(reqs, &LokiSeriesRequest{
Match: r.Match,
Path: r.Path,
Expand All @@ -248,7 +248,7 @@ func splitByTime(req queryrangebase.Request, interval time.Duration) ([]queryran
})
})
case *LokiLabelNamesRequest:
forInterval(interval, r.StartTs, r.EndTs, func(start, end time.Time) {
forInterval(interval, r.StartTs, r.EndTs, true, func(start, end time.Time) {
reqs = append(reqs, &LokiLabelNamesRequest{
Path: r.Path,
StartTs: start,
Expand All @@ -261,11 +261,30 @@ func splitByTime(req queryrangebase.Request, interval time.Duration) ([]queryran
return reqs, nil
}

func forInterval(interval time.Duration, start, end time.Time, callback func(start, end time.Time)) {
// forInterval splits the time range between start and end into sub-ranges of the given interval
// and invokes callback once for each sub-range.
// When endTimeInclusive is true, it keeps a gap of 1ms between consecutive splits.
// The only queries that have both start and end time inclusive are metadata queries,
// and without the gap, adjacent split queries would share a boundary and end up querying duplicate data.
func forInterval(interval time.Duration, start, end time.Time, endTimeInclusive bool, callback func(start, end time.Time)) {
// align the start time by split interval for better query performance of metadata queries and
// better cache-ability of query types that are cached.
ogStart := start
startNs := start.UnixNano()
start = time.Unix(0, startNs-startNs%interval.Nanoseconds())
firstInterval := true

for start := start; start.Before(end); start = start.Add(interval) {
newEnd := start.Add(interval)
if newEnd.After(end) {
if !newEnd.Before(end) {
newEnd = end
} else if endTimeInclusive {
newEnd = newEnd.Add(-time.Millisecond)
}
Comment on lines +278 to +282
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't quite understand this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When the end time is inclusive, we do not want the current query's end time to be the same as the start time of the next query. The only queries that have both start and end time inclusive are metadata queries, and without this change, we would still end up querying the same data as the next query.
I will add a comment to make it clear.


if firstInterval {
callback(ogStart, newEnd)
firstInterval = false
continue
}
callback(start, newEnd)
}
Expand Down Expand Up @@ -310,7 +329,7 @@ func splitMetricByTime(r queryrangebase.Request, interval time.Duration) ([]quer
lokiReq := r.(*LokiRequest)
// step is >= configured split interval, let us just split the query interval by step
if lokiReq.Step >= interval.Milliseconds() {
forInterval(time.Duration(lokiReq.Step*1e6), lokiReq.StartTs, lokiReq.EndTs, func(start, end time.Time) {
forInterval(time.Duration(lokiReq.Step*1e6), lokiReq.StartTs, lokiReq.EndTs, false, func(start, end time.Time) {
reqs = append(reqs, &LokiRequest{
Query: lokiReq.Query,
Limit: lokiReq.Limit,
Expand All @@ -325,7 +344,12 @@ func splitMetricByTime(r queryrangebase.Request, interval time.Duration) ([]quer
return reqs, nil
}

for start := lokiReq.StartTs; start.Before(lokiReq.EndTs); start = nextIntervalBoundary(start, r.GetStep(), interval).Add(time.Duration(r.GetStep()) * time.Millisecond) {
// nextIntervalBoundary always moves ahead in a multiple of steps, but the time it returns would not be step aligned.
// To have step-aligned intervals for better cache-ability of results, let us step align the start time, which makes all the split intervals step aligned.
startNs := lokiReq.StartTs.UnixNano()
start := time.Unix(0, startNs-startNs%(r.GetStep()*1e6))

for start := start; start.Before(lokiReq.EndTs); start = nextIntervalBoundary(start, r.GetStep(), interval).Add(time.Duration(r.GetStep()) * time.Millisecond) {
end := nextIntervalBoundary(start, r.GetStep(), interval)
if end.Add(time.Duration(r.GetStep())*time.Millisecond).After(lokiReq.EndTs) || end.Add(time.Duration(r.GetStep())*time.Millisecond) == lokiReq.EndTs {
end = lokiReq.EndTs
Expand All @@ -340,6 +364,11 @@ func splitMetricByTime(r queryrangebase.Request, interval time.Duration) ([]quer
EndTs: end,
})
}

if len(reqs) != 0 {
// change the start time of the first split back to the original request's start time
reqs[0] = reqs[0].WithStartEnd(lokiReq.GetStart(), reqs[0].GetEnd())
}
return reqs, nil
}

Expand Down
265 changes: 196 additions & 69 deletions pkg/querier/queryrange/split_by_interval_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,87 +21,152 @@ import (
var nilMetrics = NewSplitByMetrics(nil)

func Test_splitQuery(t *testing.T) {
tests := []struct {
name string
req queryrangebase.Request
interval time.Duration
want []queryrangebase.Request
buildLokiRequest := func(start, end time.Time) queryrangebase.Request {
return &LokiRequest{
Query: "foo",
Limit: 1,
Step: 2,
StartTs: start,
EndTs: end,
Direction: logproto.BACKWARD,
Path: "/path",
}
}

buildLokiSeriesRequest := func(start, end time.Time) queryrangebase.Request {
return &LokiSeriesRequest{
Match: []string{"match1"},
StartTs: start,
EndTs: end,
Path: "/series",
Shards: []string{"shard1"},
}
}

buildLokiLabelNamesRequest := func(start, end time.Time) queryrangebase.Request {
return &LokiLabelNamesRequest{
StartTs: start,
EndTs: end,
Path: "/labels",
}
}

type interval struct {
start, end time.Time
}
for requestType, tc := range map[string]struct {
requestBuilderFunc func(start, end time.Time) queryrangebase.Request
endTimeInclusive bool
}{
{
"smaller request than interval",
&LokiRequest{
StartTs: time.Date(2019, 12, 9, 12, 0, 0, 1, time.UTC),
EndTs: time.Date(2019, 12, 9, 12, 30, 0, 0, time.UTC),
},
time.Hour,
[]queryrangebase.Request{
&LokiRequest{
StartTs: time.Date(2019, 12, 9, 12, 0, 0, 1, time.UTC),
EndTs: time.Date(2019, 12, 9, 12, 30, 0, 0, time.UTC),
},
},
"LokiRequest": {
buildLokiRequest,
false,
},
{
"exactly 1 interval",
&LokiRequest{
StartTs: time.Date(2019, 12, 9, 12, 1, 0, 0, time.UTC),
EndTs: time.Date(2019, 12, 9, 13, 1, 0, 0, time.UTC),
},
time.Hour,
[]queryrangebase.Request{
&LokiRequest{
StartTs: time.Date(2019, 12, 9, 12, 1, 0, 0, time.UTC),
EndTs: time.Date(2019, 12, 9, 13, 1, 0, 0, time.UTC),
},
},
"LokiSeriesRequest": {
buildLokiSeriesRequest,
true,
},
{
"2 intervals",
&LokiRequest{
StartTs: time.Date(2019, 12, 9, 12, 0, 0, 1, time.UTC),
EndTs: time.Date(2019, 12, 9, 13, 0, 0, 2, time.UTC),
"LokiLabelNamesRequest": {
buildLokiLabelNamesRequest,
true,
},
} {
expectedSplitGap := time.Duration(0)
if tc.endTimeInclusive {
expectedSplitGap = time.Millisecond
}
for name, intervals := range map[string]struct {
inp interval
expected []interval
}{
"no_change": {
inp: interval{
start: time.Unix(0, 0),
end: time.Unix(0, (1 * time.Hour).Nanoseconds()),
},
expected: []interval{
{
start: time.Unix(0, 0),
end: time.Unix(0, (1 * time.Hour).Nanoseconds()),
},
},
},
time.Hour,
[]queryrangebase.Request{
&LokiRequest{
StartTs: time.Date(2019, 12, 9, 12, 0, 0, 1, time.UTC),
EndTs: time.Date(2019, 12, 9, 13, 0, 0, 1, time.UTC),
"align_start": {
inp: interval{
start: time.Unix(0, (5 * time.Minute).Nanoseconds()),
end: time.Unix(0, (2 * time.Hour).Nanoseconds()),
},
&LokiRequest{
StartTs: time.Date(2019, 12, 9, 13, 0, 0, 1, time.UTC),
EndTs: time.Date(2019, 12, 9, 13, 0, 0, 2, time.UTC),
expected: []interval{
{
start: time.Unix(0, (5 * time.Minute).Nanoseconds()),
end: time.Unix(0, (1 * time.Hour).Nanoseconds()).Add(-expectedSplitGap),
},
{
start: time.Unix(0, (1 * time.Hour).Nanoseconds()),
end: time.Unix(0, (2 * time.Hour).Nanoseconds()),
},
},
},
},
{
"3 intervals series",
&LokiSeriesRequest{
StartTs: time.Date(2019, 12, 9, 12, 0, 0, 1, time.UTC),
EndTs: time.Date(2019, 12, 9, 16, 0, 0, 2, time.UTC),
"align_end": {
inp: interval{
start: time.Unix(0, 0),
end: time.Unix(0, (115 * time.Minute).Nanoseconds()),
},
expected: []interval{
{
start: time.Unix(0, 0),
end: time.Unix(0, (1 * time.Hour).Nanoseconds()).Add(-expectedSplitGap),
},
{
start: time.Unix(0, (1 * time.Hour).Nanoseconds()),
end: time.Unix(0, (115 * time.Minute).Nanoseconds()),
},
},
},
2 * time.Hour,
[]queryrangebase.Request{
&LokiSeriesRequest{
StartTs: time.Date(2019, 12, 9, 12, 0, 0, 1, time.UTC),
EndTs: time.Date(2019, 12, 9, 14, 0, 0, 1, time.UTC),
"align_both": {
inp: interval{
start: time.Unix(0, (5 * time.Minute).Nanoseconds()),
end: time.Unix(0, (175 * time.Minute).Nanoseconds()),
},
expected: []interval{
{
start: time.Unix(0, (5 * time.Minute).Nanoseconds()),
end: time.Unix(0, (1 * time.Hour).Nanoseconds()).Add(-expectedSplitGap),
},
{
start: time.Unix(0, (1 * time.Hour).Nanoseconds()),
end: time.Unix(0, (2 * time.Hour).Nanoseconds()).Add(-expectedSplitGap),
},
{
start: time.Unix(0, (2 * time.Hour).Nanoseconds()),
end: time.Unix(0, (175 * time.Minute).Nanoseconds()),
},
},
&LokiSeriesRequest{
StartTs: time.Date(2019, 12, 9, 14, 0, 0, 1, time.UTC),
EndTs: time.Date(2019, 12, 9, 16, 0, 0, 1, time.UTC),
},
"no_align": {
inp: interval{
start: time.Unix(0, (5 * time.Minute).Nanoseconds()),
end: time.Unix(0, (55 * time.Minute).Nanoseconds()),
},
&LokiSeriesRequest{
StartTs: time.Date(2019, 12, 9, 16, 0, 0, 1, time.UTC),
EndTs: time.Date(2019, 12, 9, 16, 0, 0, 2, time.UTC),
expected: []interval{
{
start: time.Unix(0, (5 * time.Minute).Nanoseconds()),
end: time.Unix(0, (55 * time.Minute).Nanoseconds()),
},
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := splitByTime(tt.req, tt.interval)
require.NoError(t, err)
require.Equal(t, tt.want, got)
})
} {
t.Run(fmt.Sprintf("%s - %s", name, requestType), func(t *testing.T) {
inp := tc.requestBuilderFunc(intervals.inp.start, intervals.inp.end)
var want []queryrangebase.Request
for _, interval := range intervals.expected {
want = append(want, tc.requestBuilderFunc(interval.start, interval.end))
}
splits, err := splitByTime(inp, time.Hour)
require.NoError(t, err)
require.Equal(t, want, splits)
})
}
}
}

Expand Down Expand Up @@ -287,6 +352,68 @@ func Test_splitMetricQuery(t *testing.T) {
interval: 3 * time.Hour,
},

// split interval not a multiple of step
// start time already step aligned
{
input: &LokiRequest{
StartTs: time.Unix(2*3600-9, 0), // 2h mod 17s = 9s
EndTs: time.Unix(3*3*3600, 0),
Step: 17 * seconds,
Query: `rate({app="foo"}[1m])`,
},
expected: []queryrangebase.Request{
&LokiRequest{
StartTs: time.Unix(2*3600-9, 0),
EndTs: time.Unix((3*3600)-5, 0), // 3h mod 17s = 5s
Step: 17 * seconds,
Query: `rate({app="foo"}[1m])`,
},
&LokiRequest{
StartTs: time.Unix((3*3600)+12, 0),
EndTs: time.Unix((2*3*3600)-10, 0), // 6h mod 17s = 10s
Step: 17 * seconds,
Query: `rate({app="foo"}[1m])`,
},
&LokiRequest{
StartTs: time.Unix(2*3*3600+7, 0),
EndTs: time.Unix(3*3*3600, 0),
Step: 17 * seconds,
Query: `rate({app="foo"}[1m])`,
},
},
interval: 3 * time.Hour,
},
// start time not aligned with step
{
input: &LokiRequest{
StartTs: time.Unix(2*3600, 0),
EndTs: time.Unix(3*3*3600, 0),
Step: 17 * seconds,
Query: `rate({app="foo"}[1m])`,
},
expected: []queryrangebase.Request{
&LokiRequest{
StartTs: time.Unix(2*3600, 0),
EndTs: time.Unix((3*3600)-5, 0), // 3h mod 17s = 5s
Step: 17 * seconds,
Query: `rate({app="foo"}[1m])`,
},
&LokiRequest{
StartTs: time.Unix((3*3600)+12, 0),
EndTs: time.Unix((2*3*3600)-10, 0), // 6h mod 17s = 10s
Step: 17 * seconds,
Query: `rate({app="foo"}[1m])`,
},
&LokiRequest{
StartTs: time.Unix(2*3*3600+7, 0),
EndTs: time.Unix(3*3*3600, 0),
Step: 17 * seconds,
Query: `rate({app="foo"}[1m])`,
},
},
interval: 3 * time.Hour,
},

// step larger than split interval
{
input: &LokiRequest{
Expand Down