Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Query frontend #3

Merged
merged 13 commits into from
Jan 6, 2020
152 changes: 78 additions & 74 deletions pkg/querier/queryrange/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package queryrange

import (
"bytes"
"container/heap"
"context"
"errors"
"fmt"
Expand Down Expand Up @@ -210,91 +211,100 @@ func (codec) MergeResponse(responses ...queryrange.Response) (queryrange.Respons
Error: lokiRes.Error,
Data: LokiData{
ResultType: loghttp.ResultTypeStream,
Result: mergeStreams(lokiResponses, lokiRes.Limit, lokiRes.Direction),
Result: mergeOrderedNonOverlappingStreams(lokiResponses, lokiRes.Limit, lokiRes.Direction),
},
}, nil
}

type entry struct {
entry logproto.Entry
labels string
}
// mergeOrderedNonOverlappingStreams merges a set of ordered, nonoverlapping responses by concatenating matching streams then running them through a heap to pull out limit values
func mergeOrderedNonOverlappingStreams(resps []*LokiResponse, limit uint32, direction logproto.Direction) []logproto.Stream {

type byDirection struct {
direction logproto.Direction
entries []entry
}
var total int

// turn resps -> map[labels] []entries
groups := make(map[string]*byDir)
for _, resp := range resps {
for _, stream := range resp.Data.Result {
s, ok := groups[stream.Labels]
if !ok {
s = &byDir{
direction: direction,
labels: stream.Labels,
}
groups[stream.Labels] = s
}

s.markers = append(s.markers, marker(stream.Entries))
total += len(stream.Entries)
}

func (a byDirection) Len() int { return len(a.entries) }
func (a byDirection) Swap(i, j int) { a.entries[i], a.entries[j] = a.entries[j], a.entries[i] }
func (a byDirection) Less(i, j int) bool {
e1, e2 := a.entries[i], a.entries[j]
if a.direction == logproto.BACKWARD {
switch {
case e1.entry.Timestamp.UnixNano() < e2.entry.Timestamp.UnixNano():
return false
case e1.entry.Timestamp.UnixNano() > e2.entry.Timestamp.UnixNano():
return true
default:
return e1.labels > e2.labels
// optimization: since limit has been reached, no need to append entries from subsequent responses
if limit != 0 && total >= int(limit) {
break
}
}
switch {
case e1.entry.Timestamp.UnixNano() < e2.entry.Timestamp.UnixNano():
return true
case e1.entry.Timestamp.UnixNano() > e2.entry.Timestamp.UnixNano():
return false
default:
return e1.labels < e2.labels

keys := make([]string, 0, len(groups))
for key := range groups {
keys = append(keys, key)
}
sort.Strings(keys)

// escape hatch, can just return all the streams
if total <= int(limit) {
results := make([]logproto.Stream, 0, len(keys))
for _, key := range keys {
results = append(results, logproto.Stream{
Labels: key,
Entries: groups[key].merge(),
})
}
return results
}
}

func mergeStreams(resps []*LokiResponse, limit uint32, direction logproto.Direction) []logproto.Stream {
output := byDirection{
pq := &priorityqueue{
direction: direction,
entries: []entry{},
}
for _, resp := range resps {
for _, stream := range resp.Data.Result {
for _, e := range stream.Entries {
output.entries = append(output.entries, entry{
entry: e,
labels: stream.Labels,
})
}

for _, key := range keys {
stream := &logproto.Stream{
Labels: key,
Entries: groups[key].merge(),
}
if len(stream.Entries) > 0 {
pq.streams = append(pq.streams, stream)
}
}
sort.Sort(output)
// limit result
if len(output.entries) >= int(limit) {
output.entries = output.entries[:limit]
}

resultDict := map[string]*logproto.Stream{}
for _, e := range output.entries {
stream, ok := resultDict[e.labels]
heap.Init(pq)

resultDict := make(map[string]*logproto.Stream)

for i := 0; i < int(limit) && pq.Len() > 0; i++ {
next := heap.Pop(pq).(*logproto.Stream)

s, ok := resultDict[next.Labels]
if !ok {
stream = &logproto.Stream{
Labels: e.labels,
Entries: []logproto.Entry{},
s = &logproto.Stream{
Labels: next.Labels,
Entries: make([]logproto.Entry, 0, int(limit)/len(keys)), // allocation hack -- assume uniform distribution across labels
}
resultDict[e.labels] = stream
resultDict[next.Labels] = s
}
stream.Entries = append(stream.Entries, e.entry)

}
keys := make([]string, 0, len(resultDict))
for key := range resultDict {
keys = append(keys, key)
// TODO: make allocation friendly
s.Entries = append(s.Entries, next.Entries...)
}
sort.Strings(keys)

result := make([]logproto.Stream, 0, len(resultDict))
results := make([]logproto.Stream, 0, len(resultDict))
for _, key := range keys {
result = append(result, *resultDict[key])
stream, ok := resultDict[key]
if ok {
results = append(results, *stream)
}
}

return result
return results

}

func toProto(m loghttp.Matrix) []queryrange.SampleStream {
Expand All @@ -318,17 +328,11 @@ func toProto(m loghttp.Matrix) []queryrange.SampleStream {
return res
}

// isFull reports whether the response already holds at least Limit entries
// across all of its streams.
func (res LokiResponse) isFull() bool {
	total := countEntries(res.Data.Result)
	return total >= int64(res.Limit)
}

func countEntries(streams []logproto.Stream) int64 {
if len(streams) == 0 {
return 0
}
res := int64(0)
for _, s := range streams {
res += int64(len(s.Entries))
// Count returns the total number of log entries contained in the response,
// summed across all result streams.
func (res LokiResponse) Count() int64 {
	var result int64
	for _, s := range res.Data.Result {
		result += int64(len(s.Entries))
	}
	return result
}
98 changes: 84 additions & 14 deletions pkg/querier/queryrange/codec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,15 +258,15 @@ func Test_codec_MergeResponse(t *testing.T) {
{
Labels: `{foo="bar", level="error"}`,
Entries: []logproto.Entry{
{Timestamp: time.Unix(0, 1), Line: "1"},
{Timestamp: time.Unix(0, 2), Line: "2"},
{Timestamp: time.Unix(0, 1), Line: "1"},
},
},
{
Labels: `{foo="bar", level="debug"}`,
Entries: []logproto.Entry{
{Timestamp: time.Unix(0, 5), Line: "5"},
{Timestamp: time.Unix(0, 6), Line: "6"},
{Timestamp: time.Unix(0, 5), Line: "5"},
},
},
},
Expand All @@ -291,8 +291,8 @@ func Test_codec_MergeResponse(t *testing.T) {
{
Labels: `{foo="bar", level="debug"}`,
Entries: []logproto.Entry{
{Timestamp: time.Unix(0, 15), Line: "15"},
{Timestamp: time.Unix(0, 16), Line: "16"},
{Timestamp: time.Unix(0, 15), Line: "15"},
},
},
},
Expand Down Expand Up @@ -345,15 +345,16 @@ func Test_codec_MergeResponse(t *testing.T) {
{
Labels: `{foo="bar", level="error"}`,
Entries: []logproto.Entry{
{Timestamp: time.Unix(0, 1), Line: "1"},
{Timestamp: time.Unix(0, 2), Line: "2"},
{Timestamp: time.Unix(0, 10), Line: "10"},
{Timestamp: time.Unix(0, 9), Line: "9"},
{Timestamp: time.Unix(0, 9), Line: "9"},
},
},
{
Labels: `{foo="bar", level="debug"}`,
Entries: []logproto.Entry{
{Timestamp: time.Unix(0, 5), Line: "5"},
{Timestamp: time.Unix(0, 6), Line: "6"},
{Timestamp: time.Unix(0, 16), Line: "16"},
{Timestamp: time.Unix(0, 15), Line: "15"},
},
},
},
Expand All @@ -370,16 +371,15 @@ func Test_codec_MergeResponse(t *testing.T) {
{
Labels: `{foo="bar", level="error"}`,
Entries: []logproto.Entry{
{Timestamp: time.Unix(0, 10), Line: "10"},
{Timestamp: time.Unix(0, 9), Line: "9"},
{Timestamp: time.Unix(0, 9), Line: "9"},
{Timestamp: time.Unix(0, 2), Line: "2"},
{Timestamp: time.Unix(0, 1), Line: "1"},
},
},
{
Labels: `{foo="bar", level="debug"}`,
Entries: []logproto.Entry{
{Timestamp: time.Unix(0, 15), Line: "15"},
{Timestamp: time.Unix(0, 16), Line: "16"},
{Timestamp: time.Unix(0, 6), Line: "6"},
{Timestamp: time.Unix(0, 5), Line: "5"},
},
},
},
Expand Down Expand Up @@ -452,8 +452,8 @@ func Test_codec_MergeResponse(t *testing.T) {
{
Labels: `{foo="bar", level="error"}`,
Entries: []logproto.Entry{
{Timestamp: time.Unix(0, 10), Line: "10"},
{Timestamp: time.Unix(0, 9), Line: "9"},
{Timestamp: time.Unix(0, 10), Line: "10"},
},
},
{
Expand Down Expand Up @@ -540,8 +540,8 @@ func Test_codec_MergeResponse(t *testing.T) {
{
Labels: `{foo="bar", level="error"}`,
Entries: []logproto.Entry{
{Timestamp: time.Unix(0, 10), Line: "10"},
{Timestamp: time.Unix(0, 9), Line: "9"},
{Timestamp: time.Unix(0, 10), Line: "10"},
},
},
{
Expand Down Expand Up @@ -686,3 +686,73 @@ var (
},
}
)

// BenchmarkResponseMerge compares the legacy mergeStreams implementation
// against mergeOrderedNonOverlappingStreams, both with a limit large enough
// to retain every entry ("unlimited") and with a limit one below the total
// ("limited", forcing truncation).
func BenchmarkResponseMerge(b *testing.B) {
	const (
		resps         = 10
		streams       = 100
		logsPerStream = 1000
	)

	cases := []struct {
		desc  string
		limit uint32
		fn    func([]*LokiResponse, uint32, logproto.Direction) []logproto.Stream
	}{
		{"mergeStreams unlimited", uint32(streams * logsPerStream), mergeStreams},
		{"mergeOrderedNonOverlappingStreams unlimited", uint32(streams * logsPerStream), mergeOrderedNonOverlappingStreams},
		{"mergeStreams limited", uint32(streams*logsPerStream - 1), mergeStreams},
		{"mergeOrderedNonOverlappingStreams limited", uint32(streams*logsPerStream - 1), mergeOrderedNonOverlappingStreams},
	}

	for _, bc := range cases {
		// Build a fresh fixture per sub-benchmark, matching the original
		// behavior of regenerating the input for each table entry.
		data := mkResps(resps, streams, logsPerStream, logproto.FORWARD)
		b.Run(bc.desc, func(b *testing.B) {
			for i := 0; i < b.N; i++ {
				bc.fn(data, bc.limit, logproto.FORWARD)
			}
		})
	}
}

// mkResps builds nResps synthetic LokiResponses, each with nStreams streams,
// splitting nLogs log lines per stream evenly across the responses. Entries
// are generated in ascending-timestamp order; when direction is BACKWARD each
// stream is reversed once so its entries end up in descending order.
func mkResps(nResps, nStreams, nLogs int, direction logproto.Direction) (resps []*LokiResponse) {
	for i := 0; i < nResps; i++ {
		r := &LokiResponse{}
		for j := 0; j < nStreams; j++ {
			stream := logproto.Stream{
				Labels: fmt.Sprintf(`{foo="%d"}`, j),
			}
			// split nLogs evenly across all responses
			for k := i * (nLogs / nResps); k < (i+1)*(nLogs/nResps); k++ {
				stream.Entries = append(stream.Entries, logproto.Entry{
					Timestamp: time.Unix(int64(k), 0),
					Line:      fmt.Sprintf("%d", k),
				})
			}

			// BUG FIX: reverse once per stream, after all entries are
			// appended. The previous code reversed the accumulated slice
			// inside the append loop, re-flipping it on every iteration and
			// producing a corrupted (neither ascending nor descending) order
			// for BACKWARD fixtures.
			if direction == logproto.BACKWARD {
				for x, y := 0, len(stream.Entries)-1; x < y; x, y = x+1, y-1 {
					stream.Entries[x], stream.Entries[y] = stream.Entries[y], stream.Entries[x]
				}
			}
			r.Data.Result = append(r.Data.Result, stream)
		}
		resps = append(resps, r)
	}
	return resps
}
Loading