diff --git a/CHANGELOG.md b/CHANGELOG.md index a9d5366e766e2..272cc593fa5e7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,6 @@ ## Main +* [5289](https://github.com/grafana/loki/pull/5289) **ctovena**: Fix deduplication bug in queries when mutating labels. * [5302](https://github.com/grafana/loki/pull/5302) **MasslessParticle** Update azure blobstore client to use new sdk. * [5243](https://github.com/grafana/loki/pull/5290) **ssncferreira**: Update Promtail to support duration string formats. * [5266](https://github.com/grafana/loki/pull/5266) **jeschkies**: Write Promtail position file atomically on Unix. diff --git a/pkg/chunkenc/dumb_chunk.go b/pkg/chunkenc/dumb_chunk.go index a67545de90876..3a0e45837bfa2 100644 --- a/pkg/chunkenc/dumb_chunk.go +++ b/pkg/chunkenc/dumb_chunk.go @@ -153,6 +153,10 @@ func (i *dumbChunkIterator) Labels() string { return "" } +func (i *dumbChunkIterator) StreamHash() uint64 { + return 0 +} + func (i *dumbChunkIterator) Error() error { return nil } diff --git a/pkg/chunkenc/memchunk.go b/pkg/chunkenc/memchunk.go index 97405365713cc..1c548d73c02a0 100644 --- a/pkg/chunkenc/memchunk.go +++ b/pkg/chunkenc/memchunk.go @@ -816,7 +816,7 @@ func (c *MemChunk) Iterator(ctx context.Context, mintT, maxtT time.Time, directi var it iter.EntryIterator if ordered { - it = iter.NewNonOverlappingIterator(blockItrs, "") + it = iter.NewNonOverlappingIterator(blockItrs) } else { it = iter.NewSortEntryIterator(blockItrs, direction) } @@ -849,7 +849,7 @@ func (c *MemChunk) Iterator(ctx context.Context, mintT, maxtT time.Time, directi } if ordered { - return iter.NewNonOverlappingIterator(blockItrs, ""), nil + return iter.NewNonOverlappingIterator(blockItrs), nil } return iter.NewSortEntryIterator(blockItrs, direction), nil } @@ -884,7 +884,7 @@ func (c *MemChunk) SampleIterator(ctx context.Context, from, through time.Time, var it iter.SampleIterator if ordered { - it = iter.NewNonOverlappingSampleIterator(its, "") + it = iter.NewNonOverlappingSampleIterator(its) } else { it = iter.NewSortSampleIterator(its) } @@ -1252,6 +1252,8 @@ func (e *entryBufferedIterator) Entry() logproto.Entry { func (e *entryBufferedIterator) Labels() string { return e.currLabels.String() } +func (e *entryBufferedIterator) StreamHash() uint64 { return e.pipeline.BaseLabels().Hash() } + func (e *entryBufferedIterator) Next() bool { for e.bufferedIterator.Next() { newLine, lbs, ok := e.pipeline.Process(e.currLine) @@ -1299,6 +1301,8 @@ func (e *sampleBufferedIterator) Next() bool { } func (e *sampleBufferedIterator) Labels() string { return e.currLabels.String() } +func (e *sampleBufferedIterator) StreamHash() uint64 { return e.extractor.BaseLabels().Hash() } + func (e *sampleBufferedIterator) Sample() logproto.Sample { return e.cur } diff --git a/pkg/chunkenc/memchunk_test.go b/pkg/chunkenc/memchunk_test.go index af4cf2479064d..8a4828fb516c0 100644 --- a/pkg/chunkenc/memchunk_test.go +++ b/pkg/chunkenc/memchunk_test.go @@ -677,6 +677,7 @@ func BenchmarkWrite(b *testing.B) { type nomatchPipeline struct{} +func (nomatchPipeline) BaseLabels() log.LabelsResult { return log.EmptyLabelsResult } func (nomatchPipeline) Process(line []byte) ([]byte, log.LabelsResult, bool) { return line, nil, false } func (nomatchPipeline) ProcessString(line string) (string, log.LabelsResult, bool) { return line, nil, false diff --git a/pkg/ingester/stream.go b/pkg/ingester/stream.go index 87f1d13bfdb26..99d3a563b8900 100644 --- a/pkg/ingester/stream.go +++ b/pkg/ingester/stream.go @@ -468,7 +468,7 @@ func (s *stream) 
Iterator(ctx context.Context, statsCtx *stats.Context, from, th } if ordered { - return iter.NewNonOverlappingIterator(iterators, ""), nil + return iter.NewNonOverlappingIterator(iterators), nil } return iter.NewSortEntryIterator(iterators, direction), nil } @@ -505,7 +505,7 @@ func (s *stream) SampleIterator(ctx context.Context, statsCtx *stats.Context, fr } if ordered { - return iter.NewNonOverlappingSampleIterator(iterators, ""), nil + return iter.NewNonOverlappingSampleIterator(iterators), nil } return iter.NewSortSampleIterator(iterators), nil } diff --git a/pkg/iter/cache.go b/pkg/iter/cache.go index 7e8bf5e840566..60e242cbe0827 100644 --- a/pkg/iter/cache.go +++ b/pkg/iter/cache.go @@ -53,7 +53,7 @@ func (it *cachedIterator) consumeWrapped() bool { return false } // we're caching entries - it.cache = append(it.cache, entryWithLabels{entry: it.Wrapped().Entry(), labels: it.Wrapped().Labels()}) + it.cache = append(it.cache, entryWithLabels{entry: it.Wrapped().Entry(), labels: it.Wrapped().Labels(), streamHash: it.Wrapped().StreamHash()}) it.curr++ return true } @@ -87,6 +87,13 @@ func (it *cachedIterator) Labels() string { return it.cache[it.curr].labels } +func (it *cachedIterator) StreamHash() uint64 { + if len(it.cache) == 0 || it.curr < 0 || it.curr >= len(it.cache) { + return 0 + } + return it.cache[it.curr].streamHash +} + func (it *cachedIterator) Error() error { return it.iterErr } func (it *cachedIterator) Close() error { @@ -143,7 +150,7 @@ func (it *cachedSampleIterator) consumeWrapped() bool { return false } // we're caching entries - it.cache = append(it.cache, sampleWithLabels{Sample: it.Wrapped().Sample(), labels: it.Wrapped().Labels()}) + it.cache = append(it.cache, sampleWithLabels{Sample: it.Wrapped().Sample(), labels: it.Wrapped().Labels(), streamHash: it.Wrapped().StreamHash()}) it.curr++ return true } @@ -176,6 +183,13 @@ func (it *cachedSampleIterator) Labels() string { return it.cache[it.curr].labels } +func (it *cachedSampleIterator) StreamHash() uint64 { + if len(it.cache) == 0 || it.curr < 0 || it.curr >= len(it.cache) { + return 0 + } + return it.cache[it.curr].streamHash +} + func (it *cachedSampleIterator) Error() error { return it.iterErr } func (it *cachedSampleIterator) Close() error { diff --git a/pkg/iter/cache_test.go b/pkg/iter/cache_test.go index a786aa5b7a4b6..9befe34383538 100644 --- a/pkg/iter/cache_test.go +++ b/pkg/iter/cache_test.go @@ -251,6 +251,7 @@ type errorIter struct{} func (errorIter) Next() bool { return false } func (errorIter) Error() error { return errors.New("error") } func (errorIter) Labels() string { return "" } +func (errorIter) StreamHash() uint64 { return 0 } func (errorIter) Entry() logproto.Entry { return logproto.Entry{} } func (errorIter) Sample() logproto.Sample { return logproto.Sample{} } func (errorIter) Close() error { return errors.New("close") } diff --git a/pkg/iter/entry_iterator.go b/pkg/iter/entry_iterator.go index 3b8ec7d704c07..b93e03fa329e5 100644 --- a/pkg/iter/entry_iterator.go +++ b/pkg/iter/entry_iterator.go @@ -14,43 +14,27 @@ import ( // EntryIterator iterates over entries in time-order. 
type EntryIterator interface { - Next() bool + Iterator Entry() logproto.Entry - Labels() string - Error() error - Close() error } -type noOpIterator struct{} - -var NoopIterator = noOpIterator{} - -func (noOpIterator) Next() bool { return false } -func (noOpIterator) Error() error { return nil } -func (noOpIterator) Labels() string { return "" } -func (noOpIterator) Entry() logproto.Entry { return logproto.Entry{} } -func (noOpIterator) Sample() logproto.Sample { return logproto.Sample{} } -func (noOpIterator) Close() error { return nil } - // streamIterator iterates over entries in a stream. type streamIterator struct { - i int - entries []logproto.Entry - labels string + i int + stream logproto.Stream } // NewStreamIterator iterates over entries in a stream. func NewStreamIterator(stream logproto.Stream) EntryIterator { return &streamIterator{ - i: -1, - entries: stream.Entries, - labels: stream.Labels, + i: -1, + stream: stream, } } func (i *streamIterator) Next() bool { i.i++ - return i.i < len(i.entries) + return i.i < len(i.stream.Entries) } func (i *streamIterator) Error() error { @@ -58,11 +42,13 @@ func (i *streamIterator) Error() error { } func (i *streamIterator) Labels() string { - return i.labels + return i.stream.Labels } +func (i *streamIterator) StreamHash() uint64 { return i.stream.Hash } + func (i *streamIterator) Entry() logproto.Entry { - return i.entries[i.i] + return i.stream.Entries[i.i] } func (i *streamIterator) Close() error { @@ -86,44 +72,24 @@ func (h *iteratorHeap) Pop() interface{} { return x } -type iteratorMinHeap struct { +type iteratorSortHeap struct { iteratorHeap + byAlphabetical bool + byAscendingTime bool } -func (h iteratorMinHeap) Less(i, j int) bool { - t1, t2 := h.iteratorHeap[i].Entry().Timestamp, h.iteratorHeap[j].Entry().Timestamp - - un1 := t1.UnixNano() - un2 := t2.UnixNano() - - switch { - case un1 < un2: - return true - case un1 > un2: - return false - default: // un1 == un2: - return h.iteratorHeap[i].Labels() < h.iteratorHeap[j].Labels() +func (h iteratorSortHeap) Less(i, j int) bool { + t1, t2 := h.iteratorHeap[i].Entry().Timestamp.UnixNano(), h.iteratorHeap[j].Entry().Timestamp.UnixNano() + if t1 == t2 { + if h.byAlphabetical { + return h.iteratorHeap[i].Labels() < h.iteratorHeap[j].Labels() + } + return h.iteratorHeap[i].StreamHash() < h.iteratorHeap[j].StreamHash() } -} - -type iteratorMaxHeap struct { - iteratorHeap -} - -func (h iteratorMaxHeap) Less(i, j int) bool { - t1, t2 := h.iteratorHeap[i].Entry().Timestamp, h.iteratorHeap[j].Entry().Timestamp - - un1 := t1.UnixNano() - un2 := t2.UnixNano() - - switch { - case un1 < un2: - return false - case un1 > un2: - return true - default: // un1 == un2 - return h.iteratorHeap[i].Labels() < h.iteratorHeap[j].Labels() + if h.byAscendingTime { + return t1 < t2 } + return t1 > t2 } // HeapIterator iterates over a heap of iterators with ability to push new iterators and get some properties like time of entry at peek and len @@ -158,9 +124,9 @@ func NewMergeEntryIterator(ctx context.Context, is []EntryIterator, direction lo result := &mergeEntryIterator{is: is, stats: stats.FromContext(ctx)} switch direction { case logproto.BACKWARD: - result.heap = &iteratorMaxHeap{iteratorHeap: make([]EntryIterator, 0, len(is))} + result.heap = &iteratorSortHeap{iteratorHeap: make([]EntryIterator, 0, len(is)), byAscendingTime: false} case logproto.FORWARD: - result.heap = &iteratorMinHeap{iteratorHeap: make([]EntryIterator, 0, len(is))} + result.heap = &iteratorSortHeap{iteratorHeap: make([]EntryIterator, 
0, len(is)), byAscendingTime: true} default: panic("bad direction") } @@ -225,6 +191,8 @@ func (i *mergeEntryIterator) Next() bool { if i.heap.Len() == 1 { i.currEntry.entry = i.heap.Peek().Entry() i.currEntry.labels = i.heap.Peek().Labels() + i.currEntry.streamHash = i.heap.Peek().StreamHash() + if !i.heap.Peek().Next() { i.heap.Pop() } @@ -238,7 +206,7 @@ func (i *mergeEntryIterator) Next() bool { for i.heap.Len() > 0 { next := i.heap.Peek() entry := next.Entry() - if len(i.tuples) > 0 && (i.tuples[0].Labels() != next.Labels() || !i.tuples[0].Timestamp.Equal(entry.Timestamp)) { + if len(i.tuples) > 0 && (i.tuples[0].StreamHash() != next.StreamHash() || !i.tuples[0].Timestamp.Equal(entry.Timestamp)) { break } @@ -253,6 +221,7 @@ func (i *mergeEntryIterator) Next() bool { if len(i.tuples) == 1 { i.currEntry.entry = i.tuples[0].Entry i.currEntry.labels = i.tuples[0].Labels() + i.currEntry.streamHash = i.tuples[0].StreamHash() i.requeue(i.tuples[0].EntryIterator, false) i.tuples = i.tuples[:0] return true @@ -263,6 +232,7 @@ func (i *mergeEntryIterator) Next() bool { t := i.tuples[0] i.currEntry.entry = t.Entry i.currEntry.labels = t.Labels() + i.currEntry.streamHash = i.tuples[0].StreamHash() // Requeue the iterators, advancing them if they were consumed. for j := range i.tuples { @@ -288,6 +258,8 @@ func (i *mergeEntryIterator) Labels() string { return i.currEntry.labels } +func (i *mergeEntryIterator) StreamHash() uint64 { return i.currEntry.streamHash } + func (i *mergeEntryIterator) Error() error { switch len(i.errs) { case 0: @@ -337,6 +309,7 @@ type entrySortIterator struct { // NewSortEntryIterator returns a new EntryIterator that sorts entries by timestamp (depending on the direction) the input iterators. // The iterator only order entries across given `is` iterators, it does not sort entries within individual iterator. // This means using this iterator with a single iterator will result in the same result as the input iterator. +// When timestamp is equal, the iterator sorts samples by their label alphabetically. func NewSortEntryIterator(is []EntryIterator, direction logproto.Direction) EntryIterator { if len(is) == 0 { return NoopIterator @@ -347,9 +320,9 @@ func NewSortEntryIterator(is []EntryIterator, direction logproto.Direction) Entr result := &entrySortIterator{is: is} switch direction { case logproto.BACKWARD: - result.heap = &iteratorMaxHeap{iteratorHeap: make([]EntryIterator, 0, len(is))} + result.heap = &iteratorSortHeap{iteratorHeap: make([]EntryIterator, 0, len(is)), byAscendingTime: false, byAlphabetical: true} case logproto.FORWARD: - result.heap = &iteratorMinHeap{iteratorHeap: make([]EntryIterator, 0, len(is))} + result.heap = &iteratorSortHeap{iteratorHeap: make([]EntryIterator, 0, len(is)), byAscendingTime: true, byAlphabetical: true} default: panic("bad direction") } @@ -391,6 +364,7 @@ func (i *entrySortIterator) Next() bool { next := i.heap.Peek() i.currEntry.entry = next.Entry() i.currEntry.labels = next.Labels() + i.currEntry.streamHash = next.StreamHash() // if the top iterator is empty, we remove it. 
if !next.Next() { heap.Pop(i.heap) @@ -414,6 +388,10 @@ func (i *entrySortIterator) Labels() string { return i.currEntry.labels } +func (i *entrySortIterator) StreamHash() uint64 { + return i.currEntry.streamHash +} + func (i *entrySortIterator) Error() error { switch len(i.errs) { case 0: @@ -488,6 +466,8 @@ func (i *queryClientIterator) Labels() string { return i.curr.Labels() } +func (i *queryClientIterator) StreamHash() uint64 { return i.curr.StreamHash() } + func (i *queryClientIterator) Error() error { return i.err } @@ -497,15 +477,13 @@ func (i *queryClientIterator) Close() error { } type nonOverlappingIterator struct { - labels string iterators []EntryIterator curr EntryIterator } // NewNonOverlappingIterator gives a chained iterator over a list of iterators. -func NewNonOverlappingIterator(iterators []EntryIterator, labels string) EntryIterator { +func NewNonOverlappingIterator(iterators []EntryIterator) EntryIterator { return &nonOverlappingIterator{ - labels: labels, iterators: iterators, } } @@ -532,18 +510,24 @@ func (i *nonOverlappingIterator) Entry() logproto.Entry { } func (i *nonOverlappingIterator) Labels() string { - if i.labels != "" { - return i.labels + if i.curr == nil { + return "" } - return i.curr.Labels() } +func (i *nonOverlappingIterator) StreamHash() uint64 { + if i.curr == nil { + return 0 + } + return i.curr.StreamHash() +} + func (i *nonOverlappingIterator) Error() error { - if i.curr != nil { - return i.curr.Error() + if i.curr == nil { + return nil } - return nil + return i.curr.Error() } func (i *nonOverlappingIterator) Close() error { @@ -601,8 +585,9 @@ func (i *timeRangedIterator) Next() bool { } type entryWithLabels struct { - entry logproto.Entry - labels string + entry logproto.Entry + labels string + streamHash uint64 } type reverseIterator struct { @@ -638,7 +623,7 @@ func (i *reverseIterator) load() { if !i.loaded { i.loaded = true for count := uint32(0); (i.limit == 0 || count < i.limit) && i.iter.Next(); count++ { - i.entriesWithLabels = append(i.entriesWithLabels, entryWithLabels{i.iter.Entry(), i.iter.Labels()}) + i.entriesWithLabels = append(i.entriesWithLabels, entryWithLabels{i.iter.Entry(), i.iter.Labels(), i.iter.StreamHash()}) } i.iter.Close() } @@ -662,6 +647,10 @@ func (i *reverseIterator) Labels() string { return i.cur.labels } +func (i *reverseIterator) StreamHash() uint64 { + return i.cur.streamHash +} + func (i *reverseIterator) Error() error { return nil } func (i *reverseIterator) Close() error { @@ -709,7 +698,7 @@ func (i *reverseEntryIterator) load() { if !i.loaded { i.loaded = true for i.iter.Next() { - i.buf.entries = append(i.buf.entries, entryWithLabels{i.iter.Entry(), i.iter.Labels()}) + i.buf.entries = append(i.buf.entries, entryWithLabels{i.iter.Entry(), i.iter.Labels(), i.iter.StreamHash()}) } i.iter.Close() } @@ -734,6 +723,10 @@ func (i *reverseEntryIterator) Labels() string { return i.cur.labels } +func (i *reverseEntryIterator) StreamHash() uint64 { + return i.cur.streamHash +} + func (i *reverseEntryIterator) Error() error { return nil } func (i *reverseEntryIterator) Close() error { @@ -753,11 +746,12 @@ func ReadBatch(i EntryIterator, size uint32) (*logproto.QueryResponse, uint32, e streams := map[string]*logproto.Stream{} respSize := uint32(0) for ; respSize < size && i.Next(); respSize++ { - labels, entry := i.Labels(), i.Entry() + labels, hash, entry := i.Labels(), i.StreamHash(), i.Entry() stream, ok := streams[labels] if !ok { stream = &logproto.Stream{ Labels: labels, + Hash: hash, } streams[labels] = 
stream } @@ -794,8 +788,9 @@ func NewPeekingIterator(iter EntryIterator) PeekingEntryIterator { next := &entryWithLabels{} if iter.Next() { cache = &entryWithLabels{ - entry: iter.Entry(), - labels: iter.Labels(), + entry: iter.Entry(), + labels: iter.Labels(), + streamHash: iter.StreamHash(), } next.entry = cache.entry next.labels = cache.labels @@ -812,6 +807,7 @@ func (it *peekingEntryIterator) Next() bool { if it.cache != nil { it.next.entry = it.cache.entry it.next.labels = it.cache.labels + it.next.streamHash = it.cache.streamHash it.cacheNext() return true } @@ -823,6 +819,7 @@ func (it *peekingEntryIterator) cacheNext() { if it.iter.Next() { it.cache.entry = it.iter.Entry() it.cache.labels = it.iter.Labels() + it.cache.streamHash = it.iter.StreamHash() return } // nothing left removes the cached entry @@ -845,6 +842,13 @@ func (it *peekingEntryIterator) Labels() string { return "" } +func (it *peekingEntryIterator) StreamHash() uint64 { + if it.next != nil { + return it.next.streamHash + } + return 0 +} + // Entry implements `EntryIterator` func (it *peekingEntryIterator) Entry() logproto.Entry { if it.next != nil { diff --git a/pkg/iter/entry_iterator_test.go b/pkg/iter/entry_iterator_test.go index 3a563432d1cc8..3db309288763a 100644 --- a/pkg/iter/entry_iterator_test.go +++ b/pkg/iter/entry_iterator_test.go @@ -3,6 +3,7 @@ package iter import ( "context" "fmt" + "hash/fnv" "math/rand" "testing" "time" @@ -120,9 +121,9 @@ func TestIteratorMultipleLabels(t *testing.T) { length: testSize * 2, labels: func(i int64) string { if i%2 == 0 { - return "{foobar: \"baz1\"}" + return "{foobar: \"baz2\"}" } - return "{foobar: \"baz2\"}" + return "{foobar: \"baz1\"}" }, }, @@ -138,9 +139,9 @@ func TestIteratorMultipleLabels(t *testing.T) { length: testSize * 2, labels: func(i int64) string { if i/testSize == 0 { - return "{foobar: \"baz1\"}" + return "{foobar: \"baz2\"}" } - return "{foobar: \"baz2\"}" + return "{foobar: \"baz1\"}" }, }, } { @@ -202,9 +203,16 @@ func mkStreamIterator(f generator, labels string) EntryIterator { return NewStreamIterator(logproto.Stream{ Entries: entries, Labels: labels, + Hash: hashLabels(labels), }) } +func hashLabels(lbs string) uint64 { + h := fnv.New64a() + h.Write([]byte(lbs)) + return h.Sum64() +} + func identity(i int64) logproto.Entry { return logproto.Entry{ Timestamp: time.Unix(i, 0), @@ -237,6 +245,7 @@ func inverse(g generator) generator { func TestMergeIteratorDeduplication(t *testing.T) { foo := logproto.Stream{ Labels: `{app="foo"}`, + Hash: hashLabels(`{app="foo"}`), Entries: []logproto.Entry{ {Timestamp: time.Unix(0, 1), Line: "1"}, {Timestamp: time.Unix(0, 2), Line: "2"}, @@ -245,6 +254,7 @@ func TestMergeIteratorDeduplication(t *testing.T) { } bar := logproto.Stream{ Labels: `{app="bar"}`, + Hash: hashLabels(`{app="bar"}`), Entries: []logproto.Entry{ {Timestamp: time.Unix(0, 1), Line: "1"}, {Timestamp: time.Unix(0, 2), Line: "2"}, @@ -296,6 +306,54 @@ func TestMergeIteratorDeduplication(t *testing.T) { assertIt(it, true, len(foo.Entries)) } +func TestMergeIteratorWithoutLabels(t *testing.T) { + foo := logproto.Stream{ + Labels: ``, + Hash: hashLabels(`{app="foo"}`), + Entries: []logproto.Entry{ + {Timestamp: time.Unix(0, 1), Line: "1"}, + {Timestamp: time.Unix(0, 2), Line: "2"}, + {Timestamp: time.Unix(0, 3), Line: "3"}, + }, + } + bar := logproto.Stream{ + Labels: `{some="other"}`, + Hash: hashLabels(`{app="bar"}`), + Entries: []logproto.Entry{ + {Timestamp: time.Unix(0, 1), Line: "1"}, + {Timestamp: time.Unix(0, 2), Line: "2"}, + {Timestamp: 
time.Unix(0, 3), Line: "3"}, + }, + } + + // forward iteration + it := NewMergeEntryIterator(context.Background(), []EntryIterator{ + NewStreamIterator(foo), + NewStreamIterator(bar), + NewStreamIterator(foo), + NewStreamIterator(bar), + NewStreamIterator(foo), + NewStreamIterator(bar), + NewStreamIterator(foo), + }, logproto.FORWARD) + + for i := 0; i < 3; i++ { + + require.True(t, it.Next()) + require.NoError(t, it.Error()) + require.Equal(t, bar.Labels, it.Labels()) + require.Equal(t, bar.Entries[i], it.Entry()) + + require.True(t, it.Next()) + require.NoError(t, it.Error()) + require.Equal(t, foo.Labels, it.Labels()) + require.Equal(t, foo.Entries[i], it.Entry()) + + } + require.False(t, it.Next()) + require.NoError(t, it.Error()) +} + func mustReverseStreamIterator(it EntryIterator) EntryIterator { reversed, err := NewReversedIter(it, 0, true) if err != nil { @@ -615,6 +673,7 @@ type CloseTestingIterator struct { func (i *CloseTestingIterator) Next() bool { return true } func (i *CloseTestingIterator) Entry() logproto.Entry { return i.e } func (i *CloseTestingIterator) Labels() string { return "" } +func (i *CloseTestingIterator) StreamHash() uint64 { return 0 } func (i *CloseTestingIterator) Error() error { return nil } func (i *CloseTestingIterator) Close() error { i.closed.Store(true) @@ -623,7 +682,7 @@ func (i *CloseTestingIterator) Close() error { func TestNonOverlappingClose(t *testing.T) { a, b := &CloseTestingIterator{}, &CloseTestingIterator{} - itr := NewNonOverlappingIterator([]EntryIterator{a, b}, "") + itr := NewNonOverlappingIterator([]EntryIterator{a, b}) // Ensure both itr.cur and itr.iterators are non nil itr.Next() diff --git a/pkg/iter/iterator.go b/pkg/iter/iterator.go new file mode 100644 index 0000000000000..8d0bde08ec652 --- /dev/null +++ b/pkg/iter/iterator.go @@ -0,0 +1,28 @@ +package iter + +import "github.com/grafana/loki/pkg/logproto" + +// Iterator iterates over data in time-order. +type Iterator interface { + // Returns true if there is more data to iterate. + Next() bool + // Labels returns the labels for the current entry. + // The labels can be mutated by the query engine and not reflect the original stream. + Labels() string + // StreamHash returns the hash of the original stream for the current entry. + StreamHash() uint64 + Error() error + Close() error +} + +type noOpIterator struct{} + +var NoopIterator = noOpIterator{} + +func (noOpIterator) Next() bool { return false } +func (noOpIterator) Error() error { return nil } +func (noOpIterator) Labels() string { return "" } +func (noOpIterator) StreamHash() uint64 { return 0 } +func (noOpIterator) Entry() logproto.Entry { return logproto.Entry{} } +func (noOpIterator) Sample() logproto.Sample { return logproto.Sample{} } +func (noOpIterator) Close() error { return nil } diff --git a/pkg/iter/sample_iterator.go b/pkg/iter/sample_iterator.go index 70145c23ac990..f21ef5cd21b10 100644 --- a/pkg/iter/sample_iterator.go +++ b/pkg/iter/sample_iterator.go @@ -13,13 +13,10 @@ import ( // SampleIterator iterates over samples in time-order. type SampleIterator interface { - Next() bool + Iterator // todo(ctovena) we should add `Seek(t int64) bool` // This way we can skip when ranging over samples. Sample() logproto.Sample - Labels() string - Error() error - Close() error } // PeekingSampleIterator is a sample iterator that can peek sample without moving the current sample. 
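Editorial aside, not part of the patch: the iterator hunks above and below replace label-based deduplication with the new `StreamHash()` contract — entries with equal timestamps are keyed by the hash of the original stream, so labels rewritten (or dropped) by the query pipeline no longer defeat dedup. Below is a minimal, self-contained sketch of that idea; the names `entry`, `hashLabels`, and `dedupe` are illustrative only, not Loki APIs.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

type entry struct {
	ts     int64
	line   string
	labels string // labels after the query pipeline ran (possibly rewritten or empty)
	stream uint64 // hash of the original, unmodified stream labels
}

func hashLabels(lbs string) uint64 {
	h := fnv.New64a()
	h.Write([]byte(lbs))
	return h.Sum64()
}

// dedupe keeps one entry per (timestamp, original stream), regardless of what
// the pipeline did to the display labels.
func dedupe(entries []entry) []entry {
	type key struct {
		ts     int64
		stream uint64
	}
	seen := map[key]bool{}
	out := make([]entry, 0, len(entries))
	for _, e := range entries {
		k := key{e.ts, e.stream}
		if seen[k] {
			continue
		}
		seen[k] = true
		out = append(out, e)
	}
	return out
}

func main() {
	orig := hashLabels(`{app="foo"}`)
	// The same entry returned by two sources; the pipeline rewrote the labels
	// differently (one set is even empty), so label-based dedup would keep both.
	in := []entry{
		{ts: 1, line: "1", labels: `{app="foo", level="info"}`, stream: orig},
		{ts: 1, line: "1", labels: ``, stream: orig},
	}
	fmt.Println(len(dedupe(in))) // 1: deduplicated by stream hash, not by labels
}
```

The merge iterators in this patch apply the same comparison inside their heaps: when timestamps are equal they group entries by `StreamHash()` (see `i.tuples[0].StreamHash() != next.StreamHash()` above) instead of by `Labels()`.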
@@ -37,7 +34,8 @@ type peekingSampleIterator struct { type sampleWithLabels struct { logproto.Sample - labels string + labels string + streamHash uint64 } func NewPeekingSampleIterator(iter SampleIterator) PeekingSampleIterator { @@ -46,8 +44,9 @@ func NewPeekingSampleIterator(iter SampleIterator) PeekingSampleIterator { next := &sampleWithLabels{} if iter.Next() { cache = &sampleWithLabels{ - Sample: iter.Sample(), - labels: iter.Labels(), + Sample: iter.Sample(), + labels: iter.Labels(), + streamHash: iter.StreamHash(), } next.Sample = cache.Sample next.labels = cache.labels @@ -70,10 +69,18 @@ func (it *peekingSampleIterator) Labels() string { return "" } +func (it *peekingSampleIterator) StreamHash() uint64 { + if it.next != nil { + return it.next.streamHash + } + return 0 +} + func (it *peekingSampleIterator) Next() bool { if it.cache != nil { it.next.Sample = it.cache.Sample it.next.labels = it.cache.labels + it.next.streamHash = it.cache.streamHash it.cacheNext() return true } @@ -85,6 +92,7 @@ func (it *peekingSampleIterator) cacheNext() { if it.iter.Next() { it.cache.Sample = it.iter.Sample() it.cache.labels = it.iter.Labels() + it.cache.streamHash = it.iter.StreamHash() return } // nothing left removes the cached entry @@ -109,33 +117,34 @@ func (it *peekingSampleIterator) Error() error { return it.iter.Error() } -type sampleIteratorHeap []SampleIterator +type sampleIteratorHeap struct { + its []SampleIterator + byAlphabetical bool +} -func (h sampleIteratorHeap) Len() int { return len(h) } -func (h sampleIteratorHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] } -func (h sampleIteratorHeap) Peek() SampleIterator { return h[0] } +func (h sampleIteratorHeap) Len() int { return len(h.its) } +func (h sampleIteratorHeap) Swap(i, j int) { h.its[i], h.its[j] = h.its[j], h.its[i] } +func (h sampleIteratorHeap) Peek() SampleIterator { return h.its[0] } func (h *sampleIteratorHeap) Push(x interface{}) { - *h = append(*h, x.(SampleIterator)) + h.its = append(h.its, x.(SampleIterator)) } func (h *sampleIteratorHeap) Pop() interface{} { - old := *h - n := len(old) - x := old[n-1] - *h = old[0 : n-1] + n := len(h.its) + x := h.its[n-1] + h.its = h.its[0 : n-1] return x } func (h sampleIteratorHeap) Less(i, j int) bool { - s1, s2 := h[i].Sample(), h[j].Sample() - switch { - case s1.Timestamp < s2.Timestamp: - return true - case s1.Timestamp > s2.Timestamp: - return false - default: - return h[i].Labels() < h[j].Labels() + s1, s2 := h.its[i].Sample(), h.its[j].Sample() + if s1.Timestamp == s2.Timestamp { + if h.byAlphabetical { + return h.its[i].Labels() < h.its[j].Labels() + } + return h.its[i].StreamHash() < h.its[j].StreamHash() } + return s1.Timestamp < s2.Timestamp } // mergeSampleIterator iterates over a heap of iterators by merging samples. @@ -155,7 +164,9 @@ type mergeSampleIterator struct { // This means using this iterator with a single iterator will result in the same result as the input iterator. // If you don't need to deduplicate sample, use `NewSortSampleIterator` instead. 
func NewMergeSampleIterator(ctx context.Context, is []SampleIterator) SampleIterator { - h := sampleIteratorHeap(make([]SampleIterator, 0, len(is))) + h := sampleIteratorHeap{ + its: make([]SampleIterator, 0, len(is)), + } return &mergeSampleIterator{ stats: stats.FromContext(ctx), is: is, @@ -216,6 +227,7 @@ func (i *mergeSampleIterator) Next() bool { if i.heap.Len() == 1 { i.curr.Sample = i.heap.Peek().Sample() i.curr.labels = i.heap.Peek().Labels() + i.curr.streamHash = i.heap.Peek().StreamHash() if !i.heap.Peek().Next() { i.heap.Pop() } @@ -229,7 +241,7 @@ func (i *mergeSampleIterator) Next() bool { for i.heap.Len() > 0 { next := i.heap.Peek() sample := next.Sample() - if len(i.tuples) > 0 && (i.tuples[0].Labels() != next.Labels() || i.tuples[0].Timestamp != sample.Timestamp) { + if len(i.tuples) > 0 && (i.tuples[0].StreamHash() != next.StreamHash() || i.tuples[0].Timestamp != sample.Timestamp) { break } @@ -242,6 +254,7 @@ func (i *mergeSampleIterator) Next() bool { i.curr.Sample = i.tuples[0].Sample i.curr.labels = i.tuples[0].Labels() + i.curr.streamHash = i.tuples[0].StreamHash() t := i.tuples[0] if len(i.tuples) == 1 { i.requeue(i.tuples[0].SampleIterator, false) @@ -272,6 +285,10 @@ func (i *mergeSampleIterator) Labels() string { return i.curr.labels } +func (i *mergeSampleIterator) StreamHash() uint64 { + return i.curr.Hash +} + func (i *mergeSampleIterator) Error() error { switch len(i.errs) { case 0: @@ -306,6 +323,7 @@ type sortSampleIterator struct { // NewSortSampleIterator returns a new SampleIterator that sorts samples by ascending timestamp the input iterators. // The iterator only order sample across given `is` iterators, it does not sort samples within individual iterator. // This means using this iterator with a single iterator will result in the same result as the input iterator. +// When timestamp is equal, the iterator sorts samples by their label alphabetically. func NewSortSampleIterator(is []SampleIterator) SampleIterator { if len(is) == 0 { return NoopIterator @@ -313,7 +331,10 @@ func NewSortSampleIterator(is []SampleIterator) SampleIterator { if len(is) == 1 { return is[0] } - h := sampleIteratorHeap(make([]SampleIterator, 0, len(is))) + h := sampleIteratorHeap{ + its: make([]SampleIterator, 0, len(is)), + byAlphabetical: true, + } return &sortSampleIterator{ is: is, heap: &h, @@ -355,6 +376,7 @@ func (i *sortSampleIterator) Next() bool { next := i.heap.Peek() i.curr.Sample = next.Sample() i.curr.labels = next.Labels() + i.curr.streamHash = next.StreamHash() // if the top iterator is empty, we remove it. if !next.Next() { heap.Pop(i.heap) @@ -378,6 +400,10 @@ func (i *sortSampleIterator) Labels() string { return i.curr.labels } +func (i *sortSampleIterator) StreamHash() uint64 { + return i.curr.streamHash +} + func (i *sortSampleIterator) Error() error { switch len(i.errs) { case 0: @@ -442,6 +468,10 @@ func (i *sampleQueryClientIterator) Labels() string { return i.curr.Labels() } +func (i *sampleQueryClientIterator) StreamHash() uint64 { + return i.curr.StreamHash() +} + func (i *sampleQueryClientIterator) Error() error { return i.err } @@ -456,9 +486,8 @@ func NewSampleQueryResponseIterator(resp *logproto.SampleQueryResponse) SampleIt } type seriesIterator struct { - i int - samples []logproto.Sample - labels string + i int + series logproto.Series } type withCloseSampleIterator struct { @@ -503,15 +532,14 @@ func NewMultiSeriesIterator(series []logproto.Series) SampleIterator { // NewSeriesIterator iterates over sample in a series. 
func NewSeriesIterator(series logproto.Series) SampleIterator { return &seriesIterator{ - i: -1, - samples: series.Samples, - labels: series.Labels, + i: -1, + series: series, } } func (i *seriesIterator) Next() bool { i.i++ - return i.i < len(i.samples) + return i.i < len(i.series.Samples) } func (i *seriesIterator) Error() error { @@ -519,11 +547,15 @@ func (i *seriesIterator) Error() error { } func (i *seriesIterator) Labels() string { - return i.labels + return i.series.Labels +} + +func (i *seriesIterator) StreamHash() uint64 { + return i.series.StreamHash } func (i *seriesIterator) Sample() logproto.Sample { - return i.samples[i.i] + return i.series.Samples[i.i] } func (i *seriesIterator) Close() error { @@ -531,16 +563,14 @@ func (i *seriesIterator) Close() error { } type nonOverlappingSampleIterator struct { - labels string i int iterators []SampleIterator curr SampleIterator } // NewNonOverlappingSampleIterator gives a chained iterator over a list of iterators. -func NewNonOverlappingSampleIterator(iterators []SampleIterator, labels string) SampleIterator { +func NewNonOverlappingSampleIterator(iterators []SampleIterator) SampleIterator { return &nonOverlappingSampleIterator{ - labels: labels, iterators: iterators, } } @@ -568,18 +598,24 @@ func (i *nonOverlappingSampleIterator) Sample() logproto.Sample { } func (i *nonOverlappingSampleIterator) Labels() string { - if i.labels != "" { - return i.labels + if i.curr == nil { + return "" } - return i.curr.Labels() } +func (i *nonOverlappingSampleIterator) StreamHash() uint64 { + if i.curr == nil { + return 0 + } + return i.curr.StreamHash() +} + func (i *nonOverlappingSampleIterator) Error() error { - if i.curr != nil { - return i.curr.Error() + if i.curr == nil { + return nil } - return nil + return i.curr.Error() } func (i *nonOverlappingSampleIterator) Close() error { @@ -640,11 +676,12 @@ func ReadSampleBatch(i SampleIterator, size uint32) (*logproto.SampleQueryRespon series := map[string]*logproto.Series{} respSize := uint32(0) for ; respSize < size && i.Next(); respSize++ { - labels, sample := i.Labels(), i.Sample() + labels, hash, sample := i.Labels(), i.StreamHash(), i.Sample() s, ok := series[labels] if !ok { s = &logproto.Series{ - Labels: labels, + Labels: labels, + StreamHash: hash, } series[labels] = s } diff --git a/pkg/iter/sample_iterator_test.go b/pkg/iter/sample_iterator_test.go index b1952f6376a93..4aed2e10d79c6 100644 --- a/pkg/iter/sample_iterator_test.go +++ b/pkg/iter/sample_iterator_test.go @@ -91,42 +91,92 @@ func sample(i int) logproto.Sample { } var varSeries = logproto.Series{ - Labels: `{foo="var"}`, + Labels: `{foo="var"}`, + StreamHash: hashLabels(`{foo="var"}`), Samples: []logproto.Sample{ sample(1), sample(2), sample(3), }, } var carSeries = logproto.Series{ - Labels: `{foo="car"}`, + Labels: `{foo="car"}`, + StreamHash: hashLabels(`{foo="car"}`), Samples: []logproto.Sample{ sample(1), sample(2), sample(3), }, } func TestNewMergeSampleIterator(t *testing.T) { - it := NewMergeSampleIterator(context.Background(), - []SampleIterator{ - NewSeriesIterator(varSeries), - NewSeriesIterator(carSeries), - NewSeriesIterator(carSeries), - NewSeriesIterator(varSeries), - NewSeriesIterator(carSeries), - NewSeriesIterator(varSeries), - NewSeriesIterator(carSeries), - }) + t.Run("with labels", func(t *testing.T) { + it := NewMergeSampleIterator(context.Background(), + []SampleIterator{ + NewSeriesIterator(varSeries), + NewSeriesIterator(carSeries), + NewSeriesIterator(carSeries), + NewSeriesIterator(varSeries), + 
NewSeriesIterator(carSeries), + NewSeriesIterator(varSeries), + NewSeriesIterator(carSeries), + }) - for i := 1; i < 4; i++ { - require.True(t, it.Next(), i) - require.Equal(t, `{foo="car"}`, it.Labels(), i) - require.Equal(t, sample(i), it.Sample(), i) - require.True(t, it.Next(), i) - require.Equal(t, `{foo="var"}`, it.Labels(), i) - require.Equal(t, sample(i), it.Sample(), i) - } - require.False(t, it.Next()) - require.NoError(t, it.Error()) - require.NoError(t, it.Close()) + for i := 1; i < 4; i++ { + require.True(t, it.Next(), i) + require.Equal(t, `{foo="car"}`, it.Labels(), i) + require.Equal(t, sample(i), it.Sample(), i) + require.True(t, it.Next(), i) + require.Equal(t, `{foo="var"}`, it.Labels(), i) + require.Equal(t, sample(i), it.Sample(), i) + } + require.False(t, it.Next()) + require.NoError(t, it.Error()) + require.NoError(t, it.Close()) + }) + t.Run("no labels", func(t *testing.T) { + it := NewMergeSampleIterator(context.Background(), + []SampleIterator{ + NewSeriesIterator(logproto.Series{ + Labels: ``, + StreamHash: carSeries.StreamHash, + Samples: carSeries.Samples, + }), + NewSeriesIterator(logproto.Series{ + Labels: ``, + StreamHash: varSeries.StreamHash, + Samples: varSeries.Samples, + }), NewSeriesIterator(logproto.Series{ + Labels: ``, + StreamHash: carSeries.StreamHash, + Samples: carSeries.Samples, + }), + NewSeriesIterator(logproto.Series{ + Labels: ``, + StreamHash: varSeries.StreamHash, + Samples: varSeries.Samples, + }), + NewSeriesIterator(logproto.Series{ + Labels: ``, + StreamHash: carSeries.StreamHash, + Samples: carSeries.Samples, + }), + NewSeriesIterator(logproto.Series{ + Labels: ``, + StreamHash: varSeries.StreamHash, + Samples: varSeries.Samples, + }), + }) + + for i := 1; i < 4; i++ { + require.True(t, it.Next(), i) + require.Equal(t, ``, it.Labels(), i) + require.Equal(t, sample(i), it.Sample(), i) + require.True(t, it.Next(), i) + require.Equal(t, ``, it.Labels(), i) + require.Equal(t, sample(i), it.Sample(), i) + } + require.False(t, it.Next()) + require.NoError(t, it.Error()) + require.NoError(t, it.Close()) + }) } type fakeSampleClient struct { @@ -176,7 +226,7 @@ func TestNewNonOverlappingSampleIterator(t *testing.T) { Labels: varSeries.Labels, Samples: []logproto.Sample{sample(4), sample(5)}, }), - }, varSeries.Labels) + }) for i := 1; i < 6; i++ { require.True(t, it.Next(), i) @@ -190,7 +240,7 @@ func TestNewNonOverlappingSampleIterator(t *testing.T) { func TestReadSampleBatch(t *testing.T) { res, size, err := ReadSampleBatch(NewSeriesIterator(carSeries), 1) - require.Equal(t, &logproto.SampleQueryResponse{Series: []logproto.Series{{Labels: carSeries.Labels, Samples: []logproto.Sample{sample(1)}}}}, res) + require.Equal(t, &logproto.SampleQueryResponse{Series: []logproto.Series{{Labels: carSeries.Labels, StreamHash: carSeries.StreamHash, Samples: []logproto.Sample{sample(1)}}}}, res) require.Equal(t, uint32(1), size) require.NoError(t, err) @@ -207,6 +257,7 @@ type CloseTestingSmplIterator struct { func (i *CloseTestingSmplIterator) Next() bool { return true } func (i *CloseTestingSmplIterator) Sample() logproto.Sample { return i.s } +func (i *CloseTestingSmplIterator) StreamHash() uint64 { return 0 } func (i *CloseTestingSmplIterator) Labels() string { return "" } func (i *CloseTestingSmplIterator) Error() error { return nil } func (i *CloseTestingSmplIterator) Close() error { @@ -216,7 +267,7 @@ func (i *CloseTestingSmplIterator) Close() error { func TestNonOverlappingSampleClose(t *testing.T) { a, b := &CloseTestingSmplIterator{}, 
&CloseTestingSmplIterator{} - itr := NewNonOverlappingSampleIterator([]SampleIterator{a, b}, "") + itr := NewNonOverlappingSampleIterator([]SampleIterator{a, b}) // Ensure both itr.cur and itr.iterators are non nil itr.Next() diff --git a/pkg/logproto/logproto.pb.go b/pkg/logproto/logproto.pb.go index 169b4da745dc3..fed87644185b7 100644 --- a/pkg/logproto/logproto.pb.go +++ b/pkg/logproto/logproto.pb.go @@ -481,6 +481,8 @@ func (m *LabelResponse) GetValues() []string { type StreamAdapter struct { Labels string `protobuf:"bytes,1,opt,name=labels,proto3" json:"labels"` Entries []EntryAdapter `protobuf:"bytes,2,rep,name=entries,proto3" json:"entries"` + // hash contains the original hash of the stream. + Hash uint64 `protobuf:"varint,3,opt,name=hash,proto3" json:"-"` } func (m *StreamAdapter) Reset() { *m = StreamAdapter{} } @@ -529,6 +531,13 @@ func (m *StreamAdapter) GetEntries() []EntryAdapter { return nil } +func (m *StreamAdapter) GetHash() uint64 { + if m != nil { + return m.Hash + } + return 0 +} + type EntryAdapter struct { Timestamp time.Time `protobuf:"bytes,1,opt,name=timestamp,proto3,stdtime" json:"ts"` Line string `protobuf:"bytes,2,opt,name=line,proto3" json:"line"` @@ -691,8 +700,9 @@ func (m *LegacySample) GetTimestampMs() int64 { } type Series struct { - Labels string `protobuf:"bytes,1,opt,name=labels,proto3" json:"labels"` - Samples []Sample `protobuf:"bytes,2,rep,name=samples,proto3" json:"samples"` + Labels string `protobuf:"bytes,1,opt,name=labels,proto3" json:"labels"` + Samples []Sample `protobuf:"bytes,2,rep,name=samples,proto3" json:"samples"` + StreamHash uint64 `protobuf:"varint,3,opt,name=streamHash,proto3" json:"streamHash"` } func (m *Series) Reset() { *m = Series{} } @@ -741,6 +751,13 @@ func (m *Series) GetSamples() []Sample { return nil } +func (m *Series) GetStreamHash() uint64 { + if m != nil { + return m.StreamHash + } + return 0 +} + type TailRequest struct { Query string `protobuf:"bytes,1,opt,name=query,proto3" json:"query,omitempty"` DelayFor uint32 `protobuf:"varint,3,opt,name=delayFor,proto3" json:"delayFor,omitempty"` @@ -1527,98 +1544,100 @@ func init() { func init() { proto.RegisterFile("pkg/logproto/logproto.proto", fileDescriptor_c28a5f14f1f4c79a) } var fileDescriptor_c28a5f14f1f4c79a = []byte{ - // 1446 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xc4, 0x57, 0x49, 0x8f, 0x13, 0xc7, - 0x17, 0x77, 0x79, 0xe9, 0xb1, 0x9f, 0x97, 0xb1, 0x6a, 0x86, 0x19, 0xff, 0x0d, 0xb4, 0xfd, 0x6f, - 0x21, 0xb0, 0x02, 0xb1, 0xc3, 0x64, 0x63, 0xc9, 0xa2, 0x31, 0x13, 0x60, 0x08, 0x09, 0xd0, 0x20, - 0x21, 0x21, 0x45, 0xa8, 0xc7, 0xae, 0xb1, 0x5b, 0xe3, 0x76, 0x9b, 0xae, 0x36, 0xd2, 0x48, 0x91, - 0x92, 0x0f, 0x90, 0x48, 0xdc, 0x72, 0xc8, 0x35, 0x87, 0x28, 0x87, 0x7c, 0x0e, 0x72, 0x43, 0x39, - 0xa1, 0x1c, 0x9c, 0x60, 0x2e, 0xd1, 0x28, 0x07, 0x3e, 0x42, 0x54, 0x5b, 0xbb, 0x6c, 0x3c, 0x09, - 0xe6, 0x92, 0x4b, 0xbb, 0xde, 0xab, 0xb7, 0xbf, 0x5f, 0xbd, 0x2a, 0xc3, 0xd1, 0xc1, 0x5e, 0xa7, - 0xd1, 0xf3, 0x3b, 0x83, 0xc0, 0x0f, 0xfd, 0x68, 0x51, 0xe7, 0x5f, 0x9c, 0x56, 0x74, 0xb9, 0xd2, - 0xf1, 0xfd, 0x4e, 0x8f, 0x34, 0x38, 0xb5, 0x33, 0xdc, 0x6d, 0x84, 0xae, 0x47, 0x68, 0xe8, 0x78, - 0x03, 0x21, 0x5a, 0x7e, 0xb3, 0xe3, 0x86, 0xdd, 0xe1, 0x4e, 0xbd, 0xe5, 0x7b, 0x8d, 0x8e, 0xdf, - 0xf1, 0x27, 0x92, 0x8c, 0x12, 0xd6, 0xd9, 0x4a, 0x8a, 0x57, 0xa5, 0xdb, 0x07, 0x3d, 0xcf, 0x6f, - 0x93, 0x5e, 0x83, 0x86, 0x4e, 0x48, 0xc5, 0x57, 0x48, 0x58, 0x77, 0x21, 0x7b, 0x73, 0x48, 0xbb, - 0x36, 0x79, 0x30, 0x24, 0x34, 0xc4, 0x57, 0x61, 
0x89, 0x86, 0x01, 0x71, 0x3c, 0x5a, 0x42, 0xd5, - 0x44, 0x2d, 0xbb, 0xb1, 0x5e, 0x8f, 0x82, 0xbd, 0xcd, 0x37, 0x36, 0xdb, 0xce, 0x20, 0x24, 0x41, - 0xf3, 0xc8, 0x6f, 0xa3, 0x8a, 0x21, 0x58, 0x07, 0xa3, 0x8a, 0xd2, 0xb2, 0xd5, 0xc2, 0x2a, 0x40, - 0x4e, 0x18, 0xa6, 0x03, 0xbf, 0x4f, 0x89, 0xf5, 0x7d, 0x1c, 0x72, 0xb7, 0x86, 0x24, 0xd8, 0x57, - 0xae, 0xca, 0x90, 0xa6, 0xa4, 0x47, 0x5a, 0xa1, 0x1f, 0x94, 0x50, 0x15, 0xd5, 0x32, 0x76, 0x44, - 0xe3, 0x55, 0x48, 0xf5, 0x5c, 0xcf, 0x0d, 0x4b, 0xf1, 0x2a, 0xaa, 0xe5, 0x6d, 0x41, 0xe0, 0x0b, - 0x90, 0xa2, 0xa1, 0x13, 0x84, 0xa5, 0x44, 0x15, 0xd5, 0xb2, 0x1b, 0xe5, 0xba, 0xa8, 0x56, 0x5d, - 0xd5, 0xa0, 0x7e, 0x47, 0x55, 0xab, 0x99, 0x7e, 0x3c, 0xaa, 0xc4, 0x1e, 0xfd, 0x5e, 0x41, 0xb6, - 0x50, 0xc1, 0xef, 0x41, 0x82, 0xf4, 0xdb, 0xa5, 0xe4, 0x02, 0x9a, 0x4c, 0x01, 0x9f, 0x85, 0x4c, - 0xdb, 0x0d, 0x48, 0x2b, 0x74, 0xfd, 0x7e, 0x29, 0x55, 0x45, 0xb5, 0xc2, 0xc6, 0xca, 0xa4, 0x24, - 0x5b, 0x6a, 0xcb, 0x9e, 0x48, 0xe1, 0x33, 0x60, 0xd0, 0xae, 0x13, 0xb4, 0x69, 0x69, 0xa9, 0x9a, - 0xa8, 0x65, 0x9a, 0xab, 0x07, 0xa3, 0x4a, 0x51, 0x70, 0xce, 0xf8, 0x9e, 0x1b, 0x12, 0x6f, 0x10, - 0xee, 0xdb, 0x52, 0xe6, 0x5a, 0x32, 0x6d, 0x14, 0x97, 0xac, 0x5f, 0x11, 0xe0, 0xdb, 0x8e, 0x37, - 0xe8, 0x91, 0x57, 0xae, 0x51, 0x54, 0x8d, 0xf8, 0x6b, 0x57, 0x23, 0xb1, 0x68, 0x35, 0x26, 0xa9, - 0x25, 0xff, 0x3d, 0x35, 0xeb, 0x2b, 0xc8, 0xcb, 0x6c, 0x04, 0x06, 0xf0, 0xe6, 0x2b, 0xa3, 0xab, - 0xf0, 0x78, 0x54, 0x41, 0x13, 0x84, 0x45, 0xb0, 0xc2, 0xa7, 0x79, 0xd6, 0x21, 0x95, 0x59, 0x2f, - 0xd7, 0x05, 0x98, 0xb7, 0xfb, 0x1d, 0x42, 0x99, 0x62, 0x92, 0x05, 0x6c, 0x0b, 0x19, 0xeb, 0x4b, - 0x58, 0x99, 0x2a, 0xaa, 0x0c, 0xe3, 0x1c, 0x18, 0x94, 0x04, 0x2e, 0x51, 0x51, 0x14, 0xb5, 0x28, - 0x38, 0x5f, 0x73, 0xcf, 0x69, 0x5b, 0xca, 0x2f, 0xe6, 0xfd, 0x67, 0x04, 0xb9, 0xeb, 0xce, 0x0e, - 0xe9, 0xa9, 0x6e, 0x62, 0x48, 0xf6, 0x1d, 0x8f, 0xc8, 0x4e, 0xf2, 0x35, 0x5e, 0x03, 0xe3, 0xa1, - 0xd3, 0x1b, 0x12, 0x61, 0x32, 0x6d, 0x4b, 0x6a, 0x51, 0xac, 0xa3, 0xd7, 0xc6, 0x3a, 0x8a, 0xba, - 0x6b, 0x9d, 0x82, 0xbc, 0x8c, 0x57, 0x16, 0x6a, 0x12, 0x1c, 0x2b, 0x54, 0x46, 0x05, 0x67, 0x3d, - 0x84, 0xfc, 0x54, 0xbb, 0xb0, 0x05, 0x46, 0x8f, 0x69, 0x52, 0x91, 0x5b, 0x13, 0x0e, 0x46, 0x15, - 0xc9, 0xb1, 0xe5, 0x2f, 0x6b, 0x3e, 0xe9, 0x87, 0xbc, 0xec, 0x71, 0x5e, 0xf6, 0xb5, 0x49, 0xd9, - 0x3f, 0xe9, 0x87, 0xc1, 0xbe, 0xea, 0xfd, 0x32, 0x2b, 0x22, 0x9b, 0x29, 0x52, 0xdc, 0x56, 0x0b, - 0xeb, 0x21, 0xe4, 0x74, 0x49, 0x7c, 0x15, 0x32, 0xd1, 0x80, 0xe4, 0x9e, 0xff, 0x39, 0xdd, 0x82, - 0x34, 0x1c, 0x0f, 0x29, 0x4f, 0x7a, 0xa2, 0x8c, 0x8f, 0x41, 0xb2, 0xe7, 0xf6, 0x09, 0x6f, 0x42, - 0xa6, 0x99, 0x3e, 0x18, 0x55, 0x38, 0x6d, 0xf3, 0xaf, 0xe5, 0x81, 0x21, 0x70, 0x84, 0x4f, 0xcc, - 0x7a, 0x4c, 0x34, 0x0d, 0x61, 0x51, 0xb7, 0x56, 0x81, 0x14, 0xaf, 0x14, 0x37, 0x87, 0x9a, 0x99, - 0x83, 0x51, 0x45, 0x30, 0x6c, 0xf1, 0xc3, 0xdc, 0x75, 0x1d, 0xda, 0xe5, 0xcd, 0x4d, 0x0a, 0x77, - 0x8c, 0xb6, 0xf9, 0xd7, 0xba, 0x02, 0xb9, 0xeb, 0xa4, 0xe3, 0xb4, 0xf6, 0xa5, 0xd3, 0x55, 0x65, - 0x8e, 0x39, 0x44, 0xca, 0xc6, 0xff, 0x21, 0x17, 0x79, 0xbc, 0xef, 0x09, 0xfc, 0x24, 0xec, 0x6c, - 0xc4, 0xfb, 0x8c, 0x5a, 0x2e, 0x48, 0x00, 0xbf, 0x52, 0x83, 0x2e, 0xc2, 0x12, 0xe5, 0x0e, 0x55, - 0x83, 0xf4, 0x73, 0xc1, 0x37, 0x26, 0xad, 0x91, 0x82, 0xb6, 0x5a, 0x58, 0xdf, 0x21, 0xc8, 0xde, - 0x71, 0xdc, 0x08, 0xeb, 0xab, 0x90, 0x7a, 0xc0, 0x0e, 0x9d, 0x04, 0xbb, 0x20, 0xd8, 0x3c, 0x6b, - 0x93, 0x9e, 0xb3, 0x7f, 0xd9, 0x0f, 0x78, 0xee, 0x79, 0x3b, 0xa2, 0x27, 0x33, 0x3f, 0x39, 0x77, - 0xe6, 0xa7, 0x16, 0x9e, 0x72, 0xd7, 0x92, 0xe9, 0x78, 0x31, 0x61, 0x7d, 
0x83, 0x20, 0x27, 0x22, - 0x93, 0xa8, 0xbe, 0x08, 0x86, 0x98, 0x26, 0x12, 0x32, 0x87, 0x0e, 0x21, 0xd0, 0x06, 0x90, 0x54, - 0xc1, 0x1f, 0x43, 0xa1, 0x1d, 0xf8, 0x83, 0x01, 0x69, 0xdf, 0x96, 0x93, 0x2c, 0x3e, 0x3b, 0xc9, - 0xb6, 0xf4, 0x7d, 0x7b, 0x46, 0xdc, 0xfa, 0x05, 0x41, 0x5e, 0x4e, 0x15, 0x59, 0xaa, 0x28, 0x45, - 0xf4, 0xda, 0x83, 0x3c, 0xbe, 0xe8, 0x20, 0x5f, 0x03, 0xa3, 0x13, 0xf8, 0xc3, 0x01, 0x2d, 0x25, - 0xc4, 0xc9, 0x16, 0xd4, 0x82, 0x03, 0xfe, 0x1a, 0x14, 0x54, 0x2a, 0x87, 0x8c, 0xd6, 0xf2, 0xec, - 0x68, 0xdd, 0x6e, 0x93, 0x7e, 0xe8, 0xee, 0xba, 0xd1, 0xb0, 0x94, 0xf2, 0xd6, 0xb7, 0x08, 0x8a, - 0xb3, 0x22, 0xf8, 0x23, 0x0d, 0xb6, 0xcc, 0xdc, 0xc9, 0xc3, 0xcd, 0xd5, 0xf9, 0xe8, 0xa2, 0x7c, - 0x3e, 0x28, 0x48, 0x97, 0xcf, 0x43, 0x56, 0x63, 0xe3, 0x22, 0x24, 0xf6, 0x88, 0x82, 0x24, 0x5b, - 0x4e, 0x8e, 0x56, 0x5c, 0xc0, 0x94, 0x13, 0x17, 0xe2, 0xe7, 0x10, 0x03, 0x74, 0x7e, 0xaa, 0x93, - 0xf8, 0x1c, 0x24, 0x77, 0x03, 0xdf, 0x5b, 0xa8, 0x4d, 0x5c, 0x03, 0xbf, 0x03, 0xf1, 0xd0, 0x5f, - 0xa8, 0x49, 0xf1, 0xd0, 0x67, 0x3d, 0x92, 0xc9, 0x27, 0x78, 0x70, 0x92, 0xb2, 0x7e, 0x42, 0xb0, - 0xcc, 0x74, 0x44, 0x05, 0x2e, 0x75, 0x87, 0xfd, 0x3d, 0x5c, 0x83, 0x22, 0xf3, 0x74, 0xdf, 0x95, - 0x37, 0xd1, 0x7d, 0xb7, 0x2d, 0xd3, 0x2c, 0x30, 0xbe, 0xba, 0xa0, 0xb6, 0xdb, 0x78, 0x1d, 0x96, - 0x86, 0x54, 0x08, 0x88, 0x9c, 0x0d, 0x46, 0x6e, 0xb7, 0xf1, 0x69, 0xcd, 0x1d, 0xab, 0xb5, 0xf6, - 0xcc, 0xe1, 0x35, 0xbc, 0xe9, 0xb8, 0x41, 0x34, 0x2b, 0x4e, 0x81, 0xd1, 0x62, 0x8e, 0x05, 0x4e, - 0xd8, 0x4d, 0x18, 0x09, 0xf3, 0x80, 0x6c, 0xb9, 0x6d, 0xbd, 0x0b, 0x99, 0x48, 0x7b, 0xee, 0x05, - 0x38, 0xb7, 0x03, 0xd6, 0x45, 0x58, 0x16, 0x23, 0x70, 0xbe, 0x72, 0x6e, 0x9e, 0x72, 0x4e, 0x29, - 0x1f, 0x85, 0x94, 0xa8, 0x0a, 0x86, 0x64, 0xdb, 0x09, 0x1d, 0xa5, 0xc2, 0xd6, 0x56, 0x09, 0xd6, - 0xee, 0x04, 0x4e, 0x9f, 0xee, 0x92, 0x80, 0x0b, 0x45, 0xd8, 0xb5, 0x8e, 0xc0, 0x0a, 0x9b, 0x13, - 0x24, 0xa0, 0x97, 0xfc, 0x61, 0x3f, 0x94, 0xc7, 0xd3, 0x3a, 0x03, 0xab, 0xd3, 0x6c, 0x09, 0xf5, - 0x55, 0x48, 0xb5, 0x18, 0x83, 0x5b, 0xcf, 0xdb, 0x82, 0xb0, 0x7e, 0x40, 0x80, 0xaf, 0x90, 0x90, - 0x9b, 0xde, 0xde, 0xa2, 0xda, 0x43, 0xce, 0x73, 0xc2, 0x56, 0x97, 0x04, 0x54, 0x3d, 0xe4, 0x14, - 0xfd, 0x5f, 0x3c, 0xe4, 0xac, 0xb3, 0xb0, 0x32, 0x15, 0xa5, 0xcc, 0xa9, 0x0c, 0xe9, 0x96, 0xe4, - 0xc9, 0x2b, 0x3f, 0xa2, 0xdf, 0x38, 0x09, 0x99, 0xe8, 0xb9, 0x8b, 0xb3, 0xb0, 0x74, 0xf9, 0x86, - 0x7d, 0x77, 0xd3, 0xde, 0x2a, 0xc6, 0x70, 0x0e, 0xd2, 0xcd, 0xcd, 0x4b, 0x9f, 0x72, 0x0a, 0x6d, - 0x6c, 0x82, 0xc1, 0x1e, 0xfe, 0x24, 0xc0, 0xef, 0x43, 0x92, 0xad, 0xf0, 0x91, 0x09, 0x38, 0xb4, - 0xff, 0x1a, 0xe5, 0xb5, 0x59, 0xb6, 0xec, 0x43, 0x6c, 0xe3, 0xaf, 0x04, 0x2c, 0xb1, 0x27, 0x1b, - 0x1b, 0x01, 0x1f, 0x40, 0x8a, 0xbf, 0xde, 0xb0, 0x26, 0xae, 0xbf, 0x91, 0xcb, 0xeb, 0x2f, 0xf1, - 0x95, 0x9d, 0xb7, 0x10, 0xfe, 0x1c, 0xb2, 0x9c, 0x29, 0x6f, 0xd2, 0x63, 0xb3, 0x37, 0xda, 0x94, - 0xa5, 0xe3, 0x87, 0xec, 0x6a, 0xf6, 0x2e, 0x40, 0x8a, 0x23, 0x52, 0x8f, 0x46, 0x7f, 0xe3, 0xe9, - 0xd1, 0x4c, 0xbd, 0xa5, 0xac, 0x18, 0x3e, 0x0f, 0x49, 0x06, 0x24, 0xbd, 0x1c, 0xda, 0x8d, 0xa9, - 0x97, 0x43, 0xbf, 0xae, 0xb8, 0xdb, 0x0f, 0xa3, 0x8b, 0x7c, 0x7d, 0x76, 0x02, 0x2a, 0xf5, 0xd2, - 0xcb, 0x1b, 0x91, 0xe7, 0x1b, 0xe2, 0x06, 0x54, 0x10, 0xc6, 0xc7, 0xa7, 0x5d, 0xcd, 0x20, 0xbe, - 0x6c, 0x1e, 0xb6, 0x1d, 0x19, 0xbc, 0x0e, 0x59, 0x0d, 0x3e, 0x7a, 0x59, 0x5f, 0xc6, 0xbe, 0x5e, - 0xd6, 0x39, 0x98, 0xb3, 0x62, 0x1b, 0x5f, 0x40, 0x5a, 0x0d, 0x28, 0x7c, 0x0b, 0x0a, 0xd3, 0xc7, - 0x13, 0xff, 0x4f, 0x8b, 0x66, 0x7a, 0xea, 0x95, 0xab, 0xda, 0xd6, 0xfc, 0x33, 0x1d, 0xab, 0xa1, - 
0xe6, 0xbd, 0x27, 0xcf, 0xcc, 0xd8, 0xd3, 0x67, 0x66, 0xec, 0xc5, 0x33, 0x13, 0x7d, 0x3d, 0x36, - 0xd1, 0x8f, 0x63, 0x13, 0x3d, 0x1e, 0x9b, 0xe8, 0xc9, 0xd8, 0x44, 0x7f, 0x8c, 0x4d, 0xf4, 0xe7, - 0xd8, 0x8c, 0xbd, 0x18, 0x9b, 0xe8, 0xd1, 0x73, 0x33, 0xf6, 0xe4, 0xb9, 0x19, 0x7b, 0xfa, 0xdc, - 0x8c, 0xdd, 0x3b, 0xa1, 0xff, 0xd3, 0x0e, 0x9c, 0x5d, 0xa7, 0xef, 0x34, 0x7a, 0xfe, 0x9e, 0xdb, - 0xd0, 0xff, 0xc9, 0xef, 0x18, 0xfc, 0xe7, 0xed, 0xbf, 0x03, 0x00, 0x00, 0xff, 0xff, 0xe8, 0x71, - 0x40, 0x74, 0xe0, 0x0f, 0x00, 0x00, + // 1482 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xc4, 0x58, 0x4b, 0x6f, 0x13, 0xd7, + 0x17, 0xf7, 0xb5, 0xc7, 0x13, 0xfb, 0xd8, 0x71, 0xac, 0x9b, 0x90, 0x18, 0x03, 0x63, 0xff, 0x47, + 0x08, 0xac, 0x3f, 0xe0, 0x94, 0xf4, 0xc5, 0xa3, 0x0f, 0xc5, 0xa4, 0x40, 0x28, 0x2d, 0x30, 0x20, + 0x21, 0x21, 0x55, 0x68, 0x62, 0xdf, 0x38, 0xa3, 0x78, 0x3c, 0x66, 0xee, 0x18, 0x29, 0x52, 0xa5, + 0xf6, 0x03, 0xb4, 0x12, 0x5d, 0x55, 0x55, 0xb7, 0x5d, 0x54, 0x5d, 0xf4, 0x73, 0xd0, 0x1d, 0xea, + 0x0a, 0x75, 0xe1, 0x96, 0xb0, 0xa9, 0xac, 0x2e, 0xf8, 0x08, 0xd5, 0x7d, 0x8d, 0xaf, 0x8d, 0xd3, + 0x62, 0x36, 0xdd, 0xd8, 0xf7, 0x9c, 0x7b, 0x5e, 0xf7, 0x77, 0xce, 0x3d, 0xe7, 0xda, 0x70, 0xa4, + 0xb7, 0xdb, 0x5e, 0xed, 0x04, 0xed, 0x5e, 0x18, 0x44, 0x41, 0xbc, 0xa8, 0xf3, 0x4f, 0x9c, 0x51, + 0x74, 0xb9, 0xd2, 0x0e, 0x82, 0x76, 0x87, 0xac, 0x72, 0x6a, 0xab, 0xbf, 0xbd, 0x1a, 0x79, 0x3e, + 0xa1, 0x91, 0xeb, 0xf7, 0x84, 0x68, 0xf9, 0x4c, 0xdb, 0x8b, 0x76, 0xfa, 0x5b, 0xf5, 0x66, 0xe0, + 0xaf, 0xb6, 0x83, 0x76, 0x30, 0x92, 0x64, 0x94, 0xb0, 0xce, 0x56, 0x52, 0xbc, 0x2a, 0xdd, 0x3e, + 0xe8, 0xf8, 0x41, 0x8b, 0x74, 0x56, 0x69, 0xe4, 0x46, 0x54, 0x7c, 0x0a, 0x09, 0xfb, 0x2e, 0xe4, + 0x6e, 0xf6, 0xe9, 0x8e, 0x43, 0x1e, 0xf4, 0x09, 0x8d, 0xf0, 0x55, 0x98, 0xa3, 0x51, 0x48, 0x5c, + 0x9f, 0x96, 0x50, 0x35, 0x55, 0xcb, 0xad, 0xad, 0xd4, 0xe3, 0x60, 0x6f, 0xf3, 0x8d, 0xf5, 0x96, + 0xdb, 0x8b, 0x48, 0xd8, 0x38, 0xf4, 0xdb, 0xa0, 0x62, 0x0a, 0xd6, 0x70, 0x50, 0x51, 0x5a, 0x8e, + 0x5a, 0xd8, 0x05, 0xc8, 0x0b, 0xc3, 0xb4, 0x17, 0x74, 0x29, 0xb1, 0xbf, 0x4f, 0x42, 0xfe, 0x56, + 0x9f, 0x84, 0x7b, 0xca, 0x55, 0x19, 0x32, 0x94, 0x74, 0x48, 0x33, 0x0a, 0xc2, 0x12, 0xaa, 0xa2, + 0x5a, 0xd6, 0x89, 0x69, 0xbc, 0x04, 0xe9, 0x8e, 0xe7, 0x7b, 0x51, 0x29, 0x59, 0x45, 0xb5, 0x79, + 0x47, 0x10, 0xf8, 0x02, 0xa4, 0x69, 0xe4, 0x86, 0x51, 0x29, 0x55, 0x45, 0xb5, 0xdc, 0x5a, 0xb9, + 0x2e, 0xd0, 0xaa, 0x2b, 0x0c, 0xea, 0x77, 0x14, 0x5a, 0x8d, 0xcc, 0xe3, 0x41, 0x25, 0xf1, 0xe8, + 0xf7, 0x0a, 0x72, 0x84, 0x0a, 0x7e, 0x07, 0x52, 0xa4, 0xdb, 0x2a, 0x19, 0x33, 0x68, 0x32, 0x05, + 0x7c, 0x16, 0xb2, 0x2d, 0x2f, 0x24, 0xcd, 0xc8, 0x0b, 0xba, 0xa5, 0x74, 0x15, 0xd5, 0x0a, 0x6b, + 0x8b, 0x23, 0x48, 0x36, 0xd4, 0x96, 0x33, 0x92, 0xc2, 0xa7, 0xc1, 0xa4, 0x3b, 0x6e, 0xd8, 0xa2, + 0xa5, 0xb9, 0x6a, 0xaa, 0x96, 0x6d, 0x2c, 0x0d, 0x07, 0x95, 0xa2, 0xe0, 0x9c, 0x0e, 0x7c, 0x2f, + 0x22, 0x7e, 0x2f, 0xda, 0x73, 0xa4, 0xcc, 0x35, 0x23, 0x63, 0x16, 0xe7, 0xec, 0x5f, 0x11, 0xe0, + 0xdb, 0xae, 0xdf, 0xeb, 0x90, 0x57, 0xc6, 0x28, 0x46, 0x23, 0xf9, 0xda, 0x68, 0xa4, 0x66, 0x45, + 0x63, 0x74, 0x34, 0xe3, 0xdf, 0x8f, 0x66, 0x7f, 0x01, 0xf3, 0xf2, 0x34, 0xa2, 0x06, 0xf0, 0xfa, + 0x2b, 0x57, 0x57, 0xe1, 0xf1, 0xa0, 0x82, 0x46, 0x15, 0x16, 0x97, 0x15, 0x3e, 0xc5, 0x4f, 0x1d, + 0x51, 0x79, 0xea, 0x85, 0xba, 0x28, 0xe6, 0xcd, 0x6e, 0x9b, 0x50, 0xa6, 0x68, 0xb0, 0x80, 0x1d, + 0x21, 0x63, 0x7f, 0x0e, 0x8b, 0x63, 0xa0, 0xca, 0x30, 0xce, 0x81, 0x49, 0x49, 0xe8, 0x11, 0x15, + 0x45, 0x51, 0x8b, 0x82, 0xf3, 0x35, 
0xf7, 0x9c, 0x76, 0xa4, 0xfc, 0x6c, 0xde, 0x7f, 0x46, 0x90, + 0xbf, 0xee, 0x6e, 0x91, 0x8e, 0xca, 0x26, 0x06, 0xa3, 0xeb, 0xfa, 0x44, 0x66, 0x92, 0xaf, 0xf1, + 0x32, 0x98, 0x0f, 0xdd, 0x4e, 0x9f, 0x08, 0x93, 0x19, 0x47, 0x52, 0xb3, 0xd6, 0x3a, 0x7a, 0xed, + 0x5a, 0x47, 0x71, 0x76, 0xed, 0x93, 0x30, 0x2f, 0xe3, 0x95, 0x40, 0x8d, 0x82, 0x63, 0x40, 0x65, + 0x55, 0x70, 0xf6, 0x37, 0x08, 0xe6, 0xc7, 0xf2, 0x85, 0x6d, 0x30, 0x3b, 0x4c, 0x95, 0x8a, 0xc3, + 0x35, 0x60, 0x38, 0xa8, 0x48, 0x8e, 0x23, 0xbf, 0x59, 0xf6, 0x49, 0x37, 0xe2, 0xb8, 0x27, 0x39, + 0xee, 0xcb, 0x23, 0xdc, 0x3f, 0xea, 0x46, 0xe1, 0x9e, 0x4a, 0xfe, 0x02, 0x43, 0x91, 0x35, 0x15, + 0x29, 0xee, 0xa8, 0x05, 0x3e, 0x0c, 0xc6, 0x8e, 0x4b, 0x77, 0x38, 0x28, 0x46, 0x23, 0x3d, 0x1c, + 0x54, 0xd0, 0x19, 0x87, 0xb3, 0xec, 0x87, 0x90, 0xd7, 0x8d, 0xe0, 0xab, 0x90, 0x8d, 0x9b, 0x27, + 0x0f, 0xea, 0x9f, 0xa1, 0x28, 0x48, 0x9f, 0xc9, 0x88, 0x72, 0x40, 0x46, 0xca, 0xf8, 0x28, 0x18, + 0x1d, 0xaf, 0x4b, 0x78, 0x82, 0xb2, 0x8d, 0xcc, 0x70, 0x50, 0xe1, 0xb4, 0xc3, 0x3f, 0x6d, 0x1f, + 0x4c, 0x51, 0x63, 0xf8, 0xf8, 0xa4, 0xc7, 0x54, 0xc3, 0x14, 0x16, 0x75, 0x6b, 0x15, 0x48, 0x73, + 0x14, 0xb9, 0x39, 0xd4, 0xc8, 0x0e, 0x07, 0x15, 0xc1, 0x70, 0xc4, 0x17, 0x73, 0xa7, 0x9d, 0x91, + 0xbb, 0x63, 0xb4, 0x3c, 0xe6, 0x15, 0xc8, 0x5f, 0x27, 0x6d, 0xb7, 0xb9, 0x27, 0x9d, 0x2e, 0x29, + 0x73, 0xcc, 0x21, 0x52, 0x36, 0xfe, 0x07, 0xf9, 0xd8, 0xe3, 0x7d, 0x5f, 0xd4, 0x56, 0xca, 0xc9, + 0xc5, 0xbc, 0x4f, 0xa8, 0xfd, 0x1d, 0x02, 0x59, 0xdd, 0xaf, 0x94, 0xbc, 0x8b, 0x30, 0x47, 0xb9, + 0x47, 0x95, 0x3c, 0xfd, 0xd2, 0xf0, 0x8d, 0x51, 0xda, 0xa4, 0xa0, 0xa3, 0x16, 0xb8, 0x0e, 0x20, + 0xee, 0xef, 0xd5, 0xd1, 0xc1, 0x0a, 0xc3, 0x41, 0x45, 0xe3, 0x3a, 0xda, 0xda, 0xfe, 0x16, 0x41, + 0xee, 0x8e, 0xeb, 0xc5, 0x17, 0x67, 0x09, 0xd2, 0x0f, 0xd8, 0x0d, 0x96, 0x37, 0x47, 0x10, 0xac, + 0x39, 0xb6, 0x48, 0xc7, 0xdd, 0xbb, 0x1c, 0x84, 0xdc, 0xe6, 0xbc, 0x13, 0xd3, 0xa3, 0x01, 0x62, + 0x4c, 0x1d, 0x20, 0xe9, 0x99, 0x5b, 0xe6, 0x35, 0x23, 0x93, 0x2c, 0xa6, 0xec, 0xaf, 0x10, 0xe4, + 0x45, 0x64, 0xf2, 0x8a, 0x5c, 0x04, 0x53, 0x04, 0x2e, 0x6b, 0xec, 0xc0, 0x8e, 0x06, 0x5a, 0x37, + 0x93, 0x2a, 0xf8, 0x43, 0x28, 0xb4, 0xc2, 0xa0, 0xd7, 0x23, 0xad, 0xdb, 0xb2, 0x2d, 0x26, 0x27, + 0xdb, 0xe2, 0x86, 0xbe, 0xef, 0x4c, 0x88, 0xdb, 0xbf, 0xb0, 0x8b, 0x28, 0x5a, 0x94, 0x84, 0x2a, + 0x3e, 0x22, 0x7a, 0xed, 0xa9, 0x90, 0x9c, 0x75, 0x2a, 0x2c, 0x83, 0xd9, 0x0e, 0x83, 0x7e, 0x8f, + 0x96, 0x52, 0xa2, 0x4d, 0x08, 0x6a, 0xc6, 0x69, 0x71, 0x0d, 0x0a, 0xea, 0x28, 0x07, 0xf4, 0xe9, + 0xf2, 0x64, 0x9f, 0xde, 0x6c, 0x91, 0x6e, 0xe4, 0x6d, 0x7b, 0x71, 0xe7, 0x95, 0xf2, 0xf6, 0xd7, + 0x08, 0x8a, 0x93, 0x22, 0xf8, 0x03, 0xad, 0xcc, 0x99, 0xb9, 0x13, 0x07, 0x9b, 0xab, 0xf3, 0x3e, + 0x48, 0x79, 0x43, 0x51, 0x57, 0xa0, 0x7c, 0x1e, 0x72, 0x1a, 0x1b, 0x17, 0x21, 0xb5, 0x4b, 0x54, + 0x49, 0xb2, 0xe5, 0xe8, 0x2e, 0x26, 0x45, 0x99, 0x72, 0xe2, 0x42, 0xf2, 0x1c, 0x62, 0x05, 0x3d, + 0x3f, 0x96, 0x49, 0x7c, 0x0e, 0x8c, 0xed, 0x30, 0xf0, 0x67, 0x4a, 0x13, 0xd7, 0xc0, 0x6f, 0x41, + 0x32, 0x0a, 0x66, 0x4a, 0x52, 0x32, 0x0a, 0x58, 0x8e, 0xe4, 0xe1, 0x53, 0x3c, 0x38, 0x49, 0xd9, + 0x3f, 0x21, 0x58, 0x60, 0x3a, 0x02, 0x81, 0x4b, 0x3b, 0xfd, 0xee, 0x2e, 0xae, 0x41, 0x91, 0x79, + 0xba, 0xef, 0xc9, 0xb1, 0x76, 0xdf, 0x6b, 0xc9, 0x63, 0x16, 0x18, 0x5f, 0x4d, 0xbb, 0xcd, 0x16, + 0x5e, 0x81, 0xb9, 0x3e, 0x15, 0x02, 0xe2, 0xcc, 0x26, 0x23, 0x37, 0x5b, 0xf8, 0x94, 0xe6, 0x8e, + 0x61, 0xad, 0xbd, 0x99, 0x38, 0x86, 0x37, 0x5d, 0x2f, 0x8c, 0x7b, 0xcb, 0x49, 0x30, 0x9b, 0xcc, + 0xb1, 0xa8, 0x13, 0x36, 0x56, 0x63, 0x61, 0x1e, 0x90, 0x23, 
0xb7, 0xed, 0xb7, 0x21, 0x1b, 0x6b, + 0x4f, 0x9d, 0xa6, 0x53, 0x33, 0x60, 0x5f, 0x84, 0x05, 0xd1, 0x33, 0xa7, 0x2b, 0xe7, 0xa7, 0x29, + 0xe7, 0x95, 0xf2, 0x11, 0x48, 0x0b, 0x54, 0x30, 0x18, 0x2d, 0x37, 0x72, 0x95, 0x0a, 0x5b, 0xdb, + 0x25, 0x58, 0xbe, 0x13, 0xba, 0x5d, 0xba, 0x4d, 0x42, 0x2e, 0x14, 0xd7, 0xae, 0x7d, 0x08, 0x16, + 0x59, 0x9f, 0x20, 0x21, 0xbd, 0x14, 0xf4, 0xbb, 0x91, 0xbc, 0x9e, 0xf6, 0x69, 0x58, 0x1a, 0x67, + 0xcb, 0x52, 0x5f, 0x82, 0x74, 0x93, 0x31, 0xb8, 0xf5, 0x79, 0x47, 0x10, 0xf6, 0x0f, 0x08, 0xf0, + 0x15, 0x12, 0x71, 0xd3, 0x9b, 0x1b, 0x54, 0x7b, 0x15, 0xfa, 0x6e, 0xd4, 0xdc, 0x21, 0x21, 0x55, + 0xaf, 0x42, 0x45, 0xff, 0x17, 0xaf, 0x42, 0xfb, 0x2c, 0x2c, 0x8e, 0x45, 0x29, 0xcf, 0x54, 0x86, + 0x4c, 0x53, 0xf2, 0xe4, 0xfb, 0x21, 0xa6, 0xff, 0x7f, 0x02, 0xb2, 0xf1, 0xdb, 0x19, 0xe7, 0x60, + 0xee, 0xf2, 0x0d, 0xe7, 0xee, 0xba, 0xb3, 0x51, 0x4c, 0xe0, 0x3c, 0x64, 0x1a, 0xeb, 0x97, 0x3e, + 0xe6, 0x14, 0x5a, 0x5b, 0x07, 0x93, 0xfd, 0x8a, 0x20, 0x21, 0x7e, 0x17, 0x0c, 0xb6, 0xc2, 0x87, + 0x46, 0xc5, 0xa1, 0xfd, 0x70, 0x29, 0x2f, 0x4f, 0xb2, 0x65, 0x1e, 0x12, 0x6b, 0x7f, 0xa5, 0x60, + 0x8e, 0xbd, 0xff, 0x58, 0x0b, 0x78, 0x0f, 0xd2, 0xfc, 0x29, 0x88, 0x35, 0x71, 0xfd, 0xc1, 0x5d, + 0x5e, 0x79, 0x89, 0xaf, 0xec, 0xbc, 0x81, 0xf0, 0xa7, 0x90, 0xe3, 0x4c, 0x39, 0x7a, 0x8f, 0x4e, + 0x4e, 0xc0, 0x31, 0x4b, 0xc7, 0x0e, 0xd8, 0xd5, 0xec, 0x5d, 0x80, 0x34, 0xaf, 0x48, 0x3d, 0x1a, + 0xfd, 0xc1, 0xa8, 0x47, 0x33, 0xf6, 0x30, 0xb3, 0x13, 0xf8, 0x3c, 0x18, 0xac, 0x90, 0x74, 0x38, + 0xb4, 0x89, 0xa9, 0xc3, 0xa1, 0x8f, 0x2b, 0xee, 0xf6, 0xfd, 0x78, 0xf0, 0xaf, 0x4c, 0x76, 0x40, + 0xa5, 0x5e, 0x7a, 0x79, 0x23, 0xf6, 0x7c, 0x43, 0x4c, 0x40, 0x55, 0xc2, 0xf8, 0xd8, 0xb8, 0xab, + 0x89, 0x8a, 0x2f, 0x5b, 0x07, 0x6d, 0xc7, 0x06, 0xaf, 0x43, 0x4e, 0x2b, 0x1f, 0x1d, 0xd6, 0x97, + 0x6b, 0x5f, 0x87, 0x75, 0x4a, 0xcd, 0xd9, 0x89, 0xb5, 0xcf, 0x20, 0xa3, 0x1a, 0x14, 0xbe, 0x05, + 0x85, 0xf1, 0xeb, 0x89, 0x0f, 0x6b, 0xd1, 0x8c, 0x77, 0xbd, 0x72, 0x55, 0xdb, 0x9a, 0x7e, 0xa7, + 0x13, 0x35, 0xd4, 0xb8, 0xf7, 0xe4, 0x99, 0x95, 0x78, 0xfa, 0xcc, 0x4a, 0xbc, 0x78, 0x66, 0xa1, + 0x2f, 0xf7, 0x2d, 0xf4, 0xe3, 0xbe, 0x85, 0x1e, 0xef, 0x5b, 0xe8, 0xc9, 0xbe, 0x85, 0xfe, 0xd8, + 0xb7, 0xd0, 0x9f, 0xfb, 0x56, 0xe2, 0xc5, 0xbe, 0x85, 0x1e, 0x3d, 0xb7, 0x12, 0x4f, 0x9e, 0x5b, + 0x89, 0xa7, 0xcf, 0xad, 0xc4, 0xbd, 0xe3, 0xfa, 0xcf, 0xf6, 0xd0, 0xdd, 0x76, 0xbb, 0xee, 0x6a, + 0x27, 0xd8, 0xf5, 0x56, 0xf5, 0xbf, 0x05, 0xb6, 0x4c, 0xfe, 0xf5, 0xe6, 0xdf, 0x01, 0x00, 0x00, + 0xff, 0xff, 0xf3, 0xe1, 0x6e, 0x60, 0x2d, 0x10, 0x00, 0x00, } func (x Direction) String() string { @@ -1924,6 +1943,9 @@ func (this *StreamAdapter) Equal(that interface{}) bool { return false } } + if this.Hash != that1.Hash { + return false + } return true } func (this *EntryAdapter) Equal(that interface{}) bool { @@ -2040,6 +2062,9 @@ func (this *Series) Equal(that interface{}) bool { return false } } + if this.StreamHash != that1.StreamHash { + return false + } return true } func (this *TailRequest) Equal(that interface{}) bool { @@ -2584,7 +2609,7 @@ func (this *StreamAdapter) GoString() string { if this == nil { return "nil" } - s := make([]string, 0, 6) + s := make([]string, 0, 7) s = append(s, "&logproto.StreamAdapter{") s = append(s, "Labels: "+fmt.Sprintf("%#v", this.Labels)+",\n") if this.Entries != nil { @@ -2594,6 +2619,7 @@ func (this *StreamAdapter) GoString() string { } s = append(s, "Entries: "+fmt.Sprintf("%#v", vs)+",\n") } + s = append(s, "Hash: "+fmt.Sprintf("%#v", this.Hash)+",\n") s = append(s, "}") return strings.Join(s, "") } 
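Editorial aside, not part of the patch: the marshal/unmarshal hunks that follow hard-code the byte `0x18` when reading and writing the new `hash`/`streamHash` fields. That constant is simply the protobuf field key `(field_number << 3) | wire_type` with field number 3 and wire type 0 (varint). A self-contained sketch, using 1234 as an assumed example value:

```go
package main

import "fmt"

// varint encodes v in protobuf base-128 varint form.
func varint(v uint64) []byte {
	var b []byte
	for v >= 0x80 {
		b = append(b, byte(v)|0x80)
		v >>= 7
	}
	return append(b, byte(v))
}

func main() {
	const fieldNum, wireVarint = 3, 0
	tag := byte(fieldNum<<3 | wireVarint)
	fmt.Printf("tag: 0x%x\n", tag) // 0x18 — the constant written by the generated Marshal code

	// Full encoding of hash/streamHash = 1234: tag byte, then the varint payload.
	frame := append([]byte{tag}, varint(1234)...)
	fmt.Printf("frame: % x\n", frame) // 18 d2 09
}
```

Note also that `StreamAdapter.hash` is tagged `json:"-"`, so it is carried on the protobuf wire but omitted from JSON encodings, whereas `Series.streamHash` keeps a `streamHash` JSON tag.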
@@ -2635,7 +2661,7 @@ func (this *Series) GoString() string { if this == nil { return "nil" } - s := make([]string, 0, 6) + s := make([]string, 0, 7) s = append(s, "&logproto.Series{") s = append(s, "Labels: "+fmt.Sprintf("%#v", this.Labels)+",\n") if this.Samples != nil { @@ -2645,6 +2671,7 @@ func (this *Series) GoString() string { } s = append(s, "Samples: "+fmt.Sprintf("%#v", vs)+",\n") } + s = append(s, "StreamHash: "+fmt.Sprintf("%#v", this.StreamHash)+",\n") s = append(s, "}") return strings.Join(s, "") } @@ -3787,6 +3814,11 @@ func (m *StreamAdapter) MarshalToSizedBuffer(dAtA []byte) (int, error) { _ = i var l int _ = l + if m.Hash != 0 { + i = encodeVarintLogproto(dAtA, i, uint64(m.Hash)) + i-- + dAtA[i] = 0x18 + } if len(m.Entries) > 0 { for iNdEx := len(m.Entries) - 1; iNdEx >= 0; iNdEx-- { { @@ -3942,6 +3974,11 @@ func (m *Series) MarshalToSizedBuffer(dAtA []byte) (int, error) { _ = i var l int _ = l + if m.StreamHash != 0 { + i = encodeVarintLogproto(dAtA, i, uint64(m.StreamHash)) + i-- + dAtA[i] = 0x18 + } if len(m.Samples) > 0 { for iNdEx := len(m.Samples) - 1; iNdEx >= 0; iNdEx-- { { @@ -4742,6 +4779,9 @@ func (m *StreamAdapter) Size() (n int) { n += 1 + l + sovLogproto(uint64(l)) } } + if m.Hash != 0 { + n += 1 + sovLogproto(uint64(m.Hash)) + } return n } @@ -4809,6 +4849,9 @@ func (m *Series) Size() (n int) { n += 1 + l + sovLogproto(uint64(l)) } } + if m.StreamHash != 0 { + n += 1 + sovLogproto(uint64(m.StreamHash)) + } return n } @@ -5174,6 +5217,7 @@ func (this *StreamAdapter) String() string { s := strings.Join([]string{`&StreamAdapter{`, `Labels:` + fmt.Sprintf("%v", this.Labels) + `,`, `Entries:` + repeatedStringForEntries + `,`, + `Hash:` + fmt.Sprintf("%v", this.Hash) + `,`, `}`, }, "") return s @@ -5224,6 +5268,7 @@ func (this *Series) String() string { s := strings.Join([]string{`&Series{`, `Labels:` + fmt.Sprintf("%v", this.Labels) + `,`, `Samples:` + repeatedStringForSamples + `,`, + `StreamHash:` + fmt.Sprintf("%v", this.StreamHash) + `,`, `}`, }, "") return s @@ -6571,6 +6616,25 @@ func (m *StreamAdapter) Unmarshal(dAtA []byte) error { return err } iNdEx = postIndex + case 3: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Hash", wireType) + } + m.Hash = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowLogproto + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.Hash |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } default: iNdEx = preIndex skippy, err := skipLogproto(dAtA[iNdEx:]) @@ -6993,6 +7057,25 @@ func (m *Series) Unmarshal(dAtA []byte) error { return err } iNdEx = postIndex + case 3: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field StreamHash", wireType) + } + m.StreamHash = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowLogproto + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.StreamHash |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } default: iNdEx = preIndex skippy, err := skipLogproto(dAtA[iNdEx:]) diff --git a/pkg/logproto/logproto.proto b/pkg/logproto/logproto.proto index 9efb2aee699c3..e3522f0e1a077 100644 --- a/pkg/logproto/logproto.proto +++ b/pkg/logproto/logproto.proto @@ -82,6 +82,8 @@ message LabelResponse { message StreamAdapter { string labels = 1 [(gogoproto.jsontag) = "labels"]; repeated EntryAdapter entries = 2 [(gogoproto.nullable) = false, (gogoproto.jsontag) = "entries"]; + // hash contains the original 
hash of the stream. + uint64 hash = 3 [(gogoproto.jsontag) = "-"]; } message EntryAdapter { @@ -103,6 +105,7 @@ message LegacySample { message Series { string labels = 1 [(gogoproto.jsontag) = "labels"]; repeated Sample samples = 2 [(gogoproto.nullable) = false, (gogoproto.jsontag) = "samples"]; + uint64 streamHash = 3 [(gogoproto.jsontag) = "streamHash"]; } message TailRequest { diff --git a/pkg/logproto/types.go b/pkg/logproto/types.go index 655fc0b83ccb3..554ca79c469fc 100644 --- a/pkg/logproto/types.go +++ b/pkg/logproto/types.go @@ -12,6 +12,7 @@ import ( type Stream struct { Labels string `protobuf:"bytes,1,opt,name=labels,proto3" json:"labels"` Entries []Entry `protobuf:"bytes,2,rep,name=entries,proto3,customtype=EntryAdapter" json:"entries"` + Hash uint64 `protobuf:"varint,3,opt,name=hash,proto3" json:"-"` } // Entry is a log entry with a timestamp. @@ -40,6 +41,11 @@ func (m *Stream) MarshalToSizedBuffer(dAtA []byte) (int, error) { _ = i var l int _ = l + if m.Hash != 0 { + i = encodeVarintLogproto(dAtA, i, m.Hash) + i-- + dAtA[i] = 0x18 + } if len(m.Entries) > 0 { for iNdEx := len(m.Entries) - 1; iNdEx >= 0; iNdEx-- { { @@ -197,6 +203,25 @@ func (m *Stream) Unmarshal(dAtA []byte) error { return err } iNdEx = postIndex + case 3: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Hash", wireType) + } + m.Hash = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowLogproto + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.Hash |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } default: iNdEx = preIndex skippy, err := skipLogproto(dAtA[iNdEx:]) @@ -357,6 +382,9 @@ func (m *Stream) Size() (n int) { n += 1 + l + sovLogproto(uint64(l)) } } + if m.Hash != 0 { + n += 1 + sovLogproto(m.Hash) + } return n } @@ -405,7 +433,7 @@ func (m *Stream) Equal(that interface{}) bool { return false } } - return true + return m.Hash == that1.Hash } func (m *Entry) Equal(that interface{}) bool { diff --git a/pkg/logproto/types_test.go b/pkg/logproto/types_test.go index 97517ce724883..0dd87f727e26a 100644 --- a/pkg/logproto/types_test.go +++ b/pkg/logproto/types_test.go @@ -12,6 +12,7 @@ var ( line = `level=info ts=2019-12-12T15:00:08.325Z caller=compact.go:441 component=tsdb msg="compact blocks" count=3 mint=1576130400000 maxt=1576152000000 ulid=01DVX9ZHNM71GRCJS7M34Q0EV7 sources="[01DVWNC6NWY1A60AZV3Z6DGS65 01DVWW7XXX75GHA6ZDTD170CSZ 01DVX33N5W86CWJJVRPAVXJRWJ]" duration=2.897213221s` stream = Stream{ Labels: `{job="foobar", cluster="foo-central1", namespace="bar", container_name="buzz"}`, + Hash: 1234*10 ^ 9, Entries: []Entry{ {now, line}, {now.Add(1 * time.Second), line}, @@ -21,6 +22,7 @@ var ( } streamAdapter = StreamAdapter{ Labels: `{job="foobar", cluster="foo-central1", namespace="bar", container_name="buzz"}`, + Hash: 1234*10 ^ 9, Entries: []EntryAdapter{ {now, line}, {now.Add(1 * time.Second), line}, @@ -91,7 +93,6 @@ func BenchmarkStream(b *testing.B) { b.Fatal(err) } } - } func BenchmarkStreamAdapter(b *testing.B) { @@ -107,5 +108,4 @@ func BenchmarkStreamAdapter(b *testing.B) { b.Fatal(err) } } - } diff --git a/pkg/logql/engine_test.go b/pkg/logql/engine_test.go index 5b6935af11fb6..67a21f15fe790 100644 --- a/pkg/logql/engine_test.go +++ b/pkg/logql/engine_test.go @@ -2365,25 +2365,33 @@ type logData struct { type generator func(i int64) logData -func newStream(n int64, f generator, labels string) logproto.Stream { +func newStream(n int64, f generator, lbsString string) logproto.Stream { + 
labels, err := ParseLabels(lbsString) + if err != nil { + panic(err) + } entries := []logproto.Entry{} for i := int64(0); i < n; i++ { entries = append(entries, f(i).Entry) } return logproto.Stream{ Entries: entries, - Labels: labels, + Labels: labels.String(), } } -func newSeries(n int64, f generator, labels string) logproto.Series { +func newSeries(n int64, f generator, lbsString string) logproto.Series { + labels, err := ParseLabels(lbsString) + if err != nil { + panic(err) + } samples := []logproto.Sample{} for i := int64(0); i < n; i++ { samples = append(samples, f(i).Sample) } return logproto.Series{ Samples: samples, - Labels: labels, + Labels: labels.String(), } } @@ -2533,6 +2541,8 @@ func (errorIterator) Error() error { return ErrMock } func (errorIterator) Labels() string { return "" } +func (errorIterator) StreamHash() uint64 { return 0 } + func (errorIterator) Entry() logproto.Entry { return logproto.Entry{} } func (errorIterator) Sample() logproto.Sample { return logproto.Sample{} } diff --git a/pkg/logql/log/labels.go b/pkg/logql/log/labels.go index 1471384d68096..633ba595ca011 100644 --- a/pkg/logql/log/labels.go +++ b/pkg/logql/log/labels.go @@ -10,7 +10,7 @@ import ( const MaxInternedStrings = 1024 -var emptyLabelsResult = NewLabelsResult(labels.Labels{}, labels.Labels{}.Hash()) +var EmptyLabelsResult = NewLabelsResult(labels.Labels{}, labels.Labels{}.Hash()) // LabelsResult is a computed labels result that contains the labels set with associated string and hash. // The is mainly used for caching and returning labels computations out of pipelines and stages. @@ -274,7 +274,7 @@ func (b *LabelsBuilder) GroupedLabels() LabelsResult { return b.LabelsResult() } if b.noLabels { - return emptyLabelsResult + return EmptyLabelsResult } // unchanged path. if len(b.del) == 0 && len(b.add) == 0 { diff --git a/pkg/logql/log/metrics_extraction.go b/pkg/logql/log/metrics_extraction.go index a34719af8c2b4..9faed48034d3a 100644 --- a/pkg/logql/log/metrics_extraction.go +++ b/pkg/logql/log/metrics_extraction.go @@ -33,6 +33,7 @@ type SampleExtractor interface { // StreamSampleExtractor extracts sample for a log line. // A StreamSampleExtractor never mutate the received line. type StreamSampleExtractor interface { + BaseLabels() LabelsResult Process(line []byte) (float64, LabelsResult, bool) ProcessString(line string) (float64, LabelsResult, bool) } @@ -97,6 +98,8 @@ func (l *streamLineSampleExtractor) ProcessString(line string) (float64, LabelsR return l.Process(unsafeGetBytes(line)) } +func (l *streamLineSampleExtractor) BaseLabels() LabelsResult { return l.builder.currentResult } + type convertionFn func(value string) (float64, error) type labelSampleExtractor struct { @@ -196,6 +199,8 @@ func (l *streamLabelSampleExtractor) ProcessString(line string) (float64, Labels return l.Process(unsafeGetBytes(line)) } +func (l *streamLabelSampleExtractor) BaseLabels() LabelsResult { return l.builder.currentResult } + func convertFloat(v string) (float64, error) { return strconv.ParseFloat(v, 64) } diff --git a/pkg/logql/log/pipeline.go b/pkg/logql/log/pipeline.go index 52dd8ab2a6542..8430bac68a01f 100644 --- a/pkg/logql/log/pipeline.go +++ b/pkg/logql/log/pipeline.go @@ -18,6 +18,7 @@ type Pipeline interface { // StreamPipeline transform and filter log lines and labels. // A StreamPipeline never mutate the received line. 
type StreamPipeline interface { + BaseLabels() LabelsResult Process(line []byte) (resultLine []byte, resultLabels LabelsResult, skip bool) ProcessString(line string) (resultLine string, resultLabels LabelsResult, skip bool) } @@ -59,6 +60,8 @@ func (n noopStreamPipeline) ProcessString(line string) (string, LabelsResult, bo return line, n.LabelsResult, true } +func (n noopStreamPipeline) BaseLabels() LabelsResult { return n.LabelsResult } + func (n *noopPipeline) ForStream(labels labels.Labels) StreamPipeline { h := labels.Hash() if cached, ok := n.cache[h]; ok { @@ -153,6 +156,8 @@ func (p *streamPipeline) ProcessString(line string) (string, LabelsResult, bool) return unsafeGetString(lb), lr, ok } +func (p *streamPipeline) BaseLabels() LabelsResult { return p.builder.currentResult } + // ReduceStages reduces multiple stages into one. func ReduceStages(stages []Stage) Stage { if len(stages) == 0 { diff --git a/pkg/logql/range_vector_test.go b/pkg/logql/range_vector_test.go index 74583781c9a48..35372034dbb82 100644 --- a/pkg/logql/range_vector_test.go +++ b/pkg/logql/range_vector_test.go @@ -7,7 +7,6 @@ import ( "time" "github.com/prometheus/prometheus/promql" - promql_parser "github.com/prometheus/prometheus/promql/parser" "github.com/stretchr/testify/require" "github.com/grafana/loki/pkg/iter" @@ -29,19 +28,21 @@ var samples = []logproto.Sample{ } var ( - labelFoo, _ = promql_parser.ParseMetric("{app=\"foo\"}") - labelBar, _ = promql_parser.ParseMetric("{app=\"bar\"}") + labelFoo, _ = ParseLabels("{app=\"foo\"}") + labelBar, _ = ParseLabels("{app=\"bar\"}") ) func newSampleIterator() iter.SampleIterator { return iter.NewSortSampleIterator([]iter.SampleIterator{ iter.NewSeriesIterator(logproto.Series{ - Labels: labelFoo.String(), - Samples: samples, + Labels: labelFoo.String(), + Samples: samples, + StreamHash: labelFoo.Hash(), }), iter.NewSeriesIterator(logproto.Series{ - Labels: labelBar.String(), - Samples: samples, + Labels: labelBar.String(), + Samples: samples, + StreamHash: labelBar.Hash(), }), }) } diff --git a/pkg/logql/test_utils.go b/pkg/logql/test_utils.go index 6f0cd94f13603..fe943888f3d30 100644 --- a/pkg/logql/test_utils.go +++ b/pkg/logql/test_utils.go @@ -104,7 +104,7 @@ func processStream(in []logproto.Stream, pipeline log.Pipeline) []logproto.Strea var found bool s, found = resByStream[out.String()] if !found { - s = &logproto.Stream{Labels: out.String()} + s = &logproto.Stream{Labels: out.String(), Hash: sp.BaseLabels().Hash()} resByStream[out.String()] = s } s.Entries = append(s.Entries, logproto.Entry{ @@ -132,7 +132,7 @@ func processSeries(in []logproto.Stream, ex log.SampleExtractor) []logproto.Seri var found bool s, found = resBySeries[lbs.String()] if !found { - s = &logproto.Series{Labels: lbs.String()} + s = &logproto.Series{Labels: lbs.String(), StreamHash: exs.BaseLabels().Hash()} resBySeries[lbs.String()] = s } s.Samples = append(s.Samples, logproto.Sample{ @@ -264,6 +264,7 @@ func randomStreams(nStreams, nEntries, nShards int, labelNames []string) (stream } stream.Labels = ls.String() + stream.Hash = ls.Hash() streams = append(streams, stream) } return streams diff --git a/pkg/storage/batch.go b/pkg/storage/batch.go index ddc8f5fa80405..1ad86a038b5af 100644 --- a/pkg/storage/batch.go +++ b/pkg/storage/batch.go @@ -338,6 +338,10 @@ func (it *logBatchIterator) Labels() string { return it.curr.Labels() } +func (it *logBatchIterator) StreamHash() uint64 { + return it.curr.StreamHash() +} + func (it *logBatchIterator) Error() error { if it.err != nil { return 
it.err @@ -439,7 +443,7 @@ func (it *logBatchIterator) buildHeapIterator(chks [][]*LazyChunk, from, through iterators[i], iterators[j] = iterators[j], iterators[i] } } - result = append(result, iter.NewNonOverlappingIterator(iterators, "")) + result = append(result, iter.NewNonOverlappingIterator(iterators)) } return iter.NewMergeEntryIterator(it.ctx, result, it.direction), nil @@ -479,6 +483,10 @@ func (it *sampleBatchIterator) Labels() string { return it.curr.Labels() } +func (it *sampleBatchIterator) StreamHash() uint64 { + return it.curr.StreamHash() +} + func (it *sampleBatchIterator) Error() error { if it.err != nil { return it.err @@ -573,7 +581,7 @@ func (it *sampleBatchIterator) buildHeapIterator(chks [][]*LazyChunk, from, thro } iterators = append(iterators, iterator) } - result = append(result, iter.NewNonOverlappingSampleIterator(iterators, "")) + result = append(result, iter.NewNonOverlappingSampleIterator(iterators)) } return iter.NewMergeSampleIterator(it.ctx, result), nil diff --git a/pkg/storage/batch_test.go b/pkg/storage/batch_test.go index 4d1a620a931d9..b1d9efc3eb380 100644 --- a/pkg/storage/batch_test.go +++ b/pkg/storage/batch_test.go @@ -26,7 +26,7 @@ var NilMetrics = NewChunkMetrics(nil, 0) func Test_batchIterSafeStart(t *testing.T) { stream := logproto.Stream{ - Labels: fooLabelsWithName, + Labels: fooLabelsWithName.String(), Entries: []logproto.Entry{ { Timestamp: from, @@ -76,7 +76,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) { "forward with overlap": { []*LazyChunk{ newLazyChunk(logproto.Stream{ - Labels: fooLabelsWithName, + Labels: fooLabelsWithName.String(), Entries: []logproto.Entry{ { Timestamp: from, @@ -89,7 +89,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) { }, }), newLazyChunk(logproto.Stream{ - Labels: fooLabelsWithName, + Labels: fooLabelsWithName.String(), Entries: []logproto.Entry{ { Timestamp: from.Add(time.Millisecond), @@ -102,7 +102,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) { }, }), newLazyChunk(logproto.Stream{ - Labels: fooLabelsWithName, + Labels: fooLabelsWithName.String(), Entries: []logproto.Entry{ { Timestamp: from.Add(time.Millisecond), @@ -115,7 +115,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) { }, }), newLazyChunk(logproto.Stream{ - Labels: fooLabelsWithName, + Labels: fooLabelsWithName.String(), Entries: []logproto.Entry{ { Timestamp: from.Add(2 * time.Millisecond), @@ -128,7 +128,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) { }, }), newLazyChunk(logproto.Stream{ - Labels: fooLabelsWithName, + Labels: fooLabelsWithName.String(), Entries: []logproto.Entry{ { Timestamp: from.Add(2 * time.Millisecond), @@ -141,7 +141,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) { }, }), newLazyChunk(logproto.Stream{ - Labels: fooLabelsWithName, + Labels: fooLabelsWithName.String(), Entries: []logproto.Entry{ { Timestamp: from.Add(3 * time.Millisecond), @@ -156,7 +156,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) { }, []logproto.Stream{ { - Labels: fooLabels, + Labels: fooLabels.String(), Entries: []logproto.Entry{ { Timestamp: from, @@ -177,7 +177,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) { }, }, }, - fooLabelsWithName, + fooLabelsWithName.String(), from, from.Add(4 * time.Millisecond), logproto.FORWARD, 2, @@ -185,7 +185,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) { "forward all overlap and all chunks have a from time less than query from time": { []*LazyChunk{ newLazyChunk(logproto.Stream{ - Labels: fooLabelsWithName, + Labels: fooLabelsWithName.String(), Entries: 
[]logproto.Entry{ { Timestamp: from, @@ -198,7 +198,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) { }, }), newLazyChunk(logproto.Stream{ - Labels: fooLabelsWithName, + Labels: fooLabelsWithName.String(), Entries: []logproto.Entry{ { Timestamp: from, @@ -215,7 +215,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) { }, }), newLazyChunk(logproto.Stream{ - Labels: fooLabelsWithName, + Labels: fooLabelsWithName.String(), Entries: []logproto.Entry{ { Timestamp: from, @@ -232,7 +232,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) { }, }), newLazyChunk(logproto.Stream{ - Labels: fooLabelsWithName, + Labels: fooLabelsWithName.String(), Entries: []logproto.Entry{ { Timestamp: from, @@ -249,7 +249,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) { }, }), newLazyChunk(logproto.Stream{ - Labels: fooLabelsWithName, + Labels: fooLabelsWithName.String(), Entries: []logproto.Entry{ { Timestamp: from, @@ -266,7 +266,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) { }, }), newLazyChunk(logproto.Stream{ - Labels: fooLabelsWithName, + Labels: fooLabelsWithName.String(), Entries: []logproto.Entry{ { Timestamp: from, @@ -285,7 +285,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) { }, []logproto.Stream{ { - Labels: fooLabels, + Labels: fooLabels.String(), Entries: []logproto.Entry{ { Timestamp: from.Add(time.Millisecond), @@ -306,7 +306,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) { }, }, }, - fooLabelsWithName, + fooLabelsWithName.String(), from.Add(1 * time.Millisecond), from.Add(5 * time.Millisecond), logproto.FORWARD, 2, @@ -314,7 +314,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) { "forward with overlapping non-continuous entries": { []*LazyChunk{ newLazyChunk(logproto.Stream{ - Labels: fooLabelsWithName, + Labels: fooLabelsWithName.String(), Entries: []logproto.Entry{ { Timestamp: from, @@ -331,7 +331,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) { }, }), newLazyChunk(logproto.Stream{ - Labels: fooLabelsWithName, + Labels: fooLabelsWithName.String(), Entries: []logproto.Entry{ { Timestamp: from.Add(time.Millisecond), @@ -344,7 +344,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) { }, }), newLazyChunk(logproto.Stream{ - Labels: fooLabelsWithName, + Labels: fooLabelsWithName.String(), Entries: []logproto.Entry{ { Timestamp: from.Add(time.Millisecond), @@ -357,7 +357,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) { }, }), newLazyChunk(logproto.Stream{ - Labels: fooLabelsWithName, + Labels: fooLabelsWithName.String(), Entries: []logproto.Entry{ { Timestamp: from.Add(2 * time.Millisecond), @@ -372,7 +372,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) { }, []logproto.Stream{ { - Labels: fooLabels, + Labels: fooLabels.String(), Entries: []logproto.Entry{ { Timestamp: from, @@ -389,7 +389,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) { }, }, }, - fooLabelsWithName, + fooLabelsWithName.String(), from, from.Add(3 * time.Millisecond), logproto.FORWARD, 2, @@ -397,7 +397,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) { "backward with overlap": { []*LazyChunk{ newLazyChunk(logproto.Stream{ - Labels: fooLabelsWithName, + Labels: fooLabelsWithName.String(), Entries: []logproto.Entry{ { Timestamp: from, @@ -410,7 +410,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) { }, }), newLazyChunk(logproto.Stream{ - Labels: fooLabelsWithName, + Labels: fooLabelsWithName.String(), Entries: []logproto.Entry{ { Timestamp: from.Add(time.Millisecond), @@ -423,7 +423,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) { }, }), 
newLazyChunk(logproto.Stream{ - Labels: fooLabelsWithName, + Labels: fooLabelsWithName.String(), Entries: []logproto.Entry{ { Timestamp: from.Add(time.Millisecond), @@ -436,7 +436,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) { }, }), newLazyChunk(logproto.Stream{ - Labels: fooLabelsWithName, + Labels: fooLabelsWithName.String(), Entries: []logproto.Entry{ { Timestamp: from.Add(2 * time.Millisecond), @@ -449,7 +449,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) { }, }), newLazyChunk(logproto.Stream{ - Labels: fooLabelsWithName, + Labels: fooLabelsWithName.String(), Entries: []logproto.Entry{ { Timestamp: from.Add(2 * time.Millisecond), @@ -462,7 +462,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) { }, }), newLazyChunk(logproto.Stream{ - Labels: fooLabelsWithName, + Labels: fooLabelsWithName.String(), Entries: []logproto.Entry{ { Timestamp: from.Add(3 * time.Millisecond), @@ -477,7 +477,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) { }, []logproto.Stream{ { - Labels: fooLabels, + Labels: fooLabels.String(), Entries: []logproto.Entry{ { Timestamp: from.Add(3 * time.Millisecond), @@ -498,7 +498,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) { }, }, }, - fooLabelsWithName, + fooLabelsWithName.String(), from, from.Add(4 * time.Millisecond), logproto.BACKWARD, 2, @@ -506,7 +506,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) { "backward all overlap and all chunks have a through time greater than query through time": { []*LazyChunk{ newLazyChunk(logproto.Stream{ - Labels: fooLabelsWithName, + Labels: fooLabelsWithName.String(), Entries: []logproto.Entry{ { Timestamp: from, @@ -523,7 +523,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) { }, }), newLazyChunk(logproto.Stream{ - Labels: fooLabelsWithName, + Labels: fooLabelsWithName.String(), Entries: []logproto.Entry{ { Timestamp: from.Add(time.Millisecond), @@ -540,7 +540,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) { }, }), newLazyChunk(logproto.Stream{ - Labels: fooLabelsWithName, + Labels: fooLabelsWithName.String(), Entries: []logproto.Entry{ { Timestamp: from.Add(time.Millisecond), @@ -557,7 +557,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) { }, }), newLazyChunk(logproto.Stream{ - Labels: fooLabelsWithName, + Labels: fooLabelsWithName.String(), Entries: []logproto.Entry{ { Timestamp: from.Add(2 * time.Millisecond), @@ -574,7 +574,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) { }, }), newLazyChunk(logproto.Stream{ - Labels: fooLabelsWithName, + Labels: fooLabelsWithName.String(), Entries: []logproto.Entry{ { Timestamp: from.Add(2 * time.Millisecond), @@ -591,7 +591,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) { }, }), newLazyChunk(logproto.Stream{ - Labels: fooLabelsWithName, + Labels: fooLabelsWithName.String(), Entries: []logproto.Entry{ { Timestamp: from.Add(3 * time.Millisecond), @@ -606,7 +606,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) { }, []logproto.Stream{ { - Labels: fooLabels, + Labels: fooLabels.String(), Entries: []logproto.Entry{ { Timestamp: from.Add(3 * time.Millisecond), @@ -627,7 +627,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) { }, }, }, - fooLabelsWithName, + fooLabelsWithName.String(), from, from.Add(4 * time.Millisecond), logproto.BACKWARD, 2, @@ -635,7 +635,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) { "backward with overlapping non-continuous entries": { []*LazyChunk{ newLazyChunk(logproto.Stream{ - Labels: fooLabelsWithName, + Labels: fooLabelsWithName.String(), Entries: []logproto.Entry{ { Timestamp: 
from.Add(0 * time.Millisecond), @@ -648,7 +648,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) { }, }), newLazyChunk(logproto.Stream{ - Labels: fooLabelsWithName, + Labels: fooLabelsWithName.String(), Entries: []logproto.Entry{ { Timestamp: from.Add(1 * time.Millisecond), @@ -661,7 +661,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) { }, }), newLazyChunk(logproto.Stream{ - Labels: fooLabelsWithName, + Labels: fooLabelsWithName.String(), Entries: []logproto.Entry{ { Timestamp: from.Add(2 * time.Millisecond), @@ -674,7 +674,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) { }, }), newLazyChunk(logproto.Stream{ - Labels: fooLabelsWithName, + Labels: fooLabelsWithName.String(), Entries: []logproto.Entry{ { Timestamp: from.Add(4 * time.Millisecond), @@ -689,7 +689,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) { }, []logproto.Stream{ { - Labels: fooLabels, + Labels: fooLabels.String(), Entries: []logproto.Entry{ { Timestamp: from.Add(7 * time.Millisecond), @@ -726,7 +726,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) { }, }, }, - fooLabelsWithName, + fooLabelsWithName.String(), from, from.Add(8 * time.Millisecond), logproto.BACKWARD, 2, @@ -734,7 +734,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) { "forward without overlap": { []*LazyChunk{ newLazyChunk(logproto.Stream{ - Labels: fooLabelsWithName, + Labels: fooLabelsWithName.String(), Entries: []logproto.Entry{ { Timestamp: from, @@ -747,7 +747,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) { }, }), newLazyChunk(logproto.Stream{ - Labels: fooLabelsWithName, + Labels: fooLabelsWithName.String(), Entries: []logproto.Entry{ { Timestamp: from.Add(2 * time.Millisecond), @@ -756,7 +756,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) { }, }), newLazyChunk(logproto.Stream{ - Labels: fooLabelsWithName, + Labels: fooLabelsWithName.String(), Entries: []logproto.Entry{ { Timestamp: from.Add(3 * time.Millisecond), @@ -767,7 +767,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) { }, []logproto.Stream{ { - Labels: fooLabels, + Labels: fooLabels.String(), Entries: []logproto.Entry{ { Timestamp: from, @@ -784,7 +784,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) { }, }, }, - fooLabelsWithName, + fooLabelsWithName.String(), from, from.Add(3 * time.Millisecond), logproto.FORWARD, 2, @@ -792,7 +792,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) { "backward without overlap": { []*LazyChunk{ newLazyChunk(logproto.Stream{ - Labels: fooLabelsWithName, + Labels: fooLabelsWithName.String(), Entries: []logproto.Entry{ { Timestamp: from, @@ -805,7 +805,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) { }, }), newLazyChunk(logproto.Stream{ - Labels: fooLabelsWithName, + Labels: fooLabelsWithName.String(), Entries: []logproto.Entry{ { Timestamp: from.Add(2 * time.Millisecond), @@ -814,7 +814,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) { }, }), newLazyChunk(logproto.Stream{ - Labels: fooLabelsWithName, + Labels: fooLabelsWithName.String(), Entries: []logproto.Entry{ { Timestamp: from.Add(3 * time.Millisecond), @@ -825,7 +825,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) { }, []logproto.Stream{ { - Labels: fooLabels, + Labels: fooLabels.String(), Entries: []logproto.Entry{ { Timestamp: from.Add(2 * time.Millisecond), @@ -842,7 +842,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) { }, }, }, - fooLabelsWithName, + fooLabelsWithName.String(), from, from.Add(3 * time.Millisecond), logproto.BACKWARD, 2, @@ -858,7 +858,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) { 
"forward identicals": { []*LazyChunk{ newLazyChunk(logproto.Stream{ - Labels: fooLabelsWithName, + Labels: fooLabelsWithName.String(), Entries: []logproto.Entry{ { Timestamp: from, @@ -867,7 +867,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) { }, }), newLazyChunk(logproto.Stream{ - Labels: fooLabelsWithName, + Labels: fooLabelsWithName.String(), Entries: []logproto.Entry{ { Timestamp: from, @@ -876,7 +876,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) { }, }), newLazyChunk(logproto.Stream{ - Labels: fooLabelsWithName, + Labels: fooLabelsWithName.String(), Entries: []logproto.Entry{ { Timestamp: from, @@ -889,7 +889,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) { }, }), newLazyChunk(logproto.Stream{ - Labels: fooLabelsWithName, + Labels: fooLabelsWithName.String(), Entries: []logproto.Entry{ { Timestamp: from.Add(time.Millisecond), @@ -898,7 +898,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) { }, }), newLazyChunk(logproto.Stream{ - Labels: fooLabelsWithName, + Labels: fooLabelsWithName.String(), Entries: []logproto.Entry{ { Timestamp: from.Add(time.Millisecond), @@ -907,7 +907,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) { }, }), newLazyChunk(logproto.Stream{ - Labels: fooLabelsWithName, + Labels: fooLabelsWithName.String(), Entries: []logproto.Entry{ { Timestamp: from.Add(time.Millisecond), @@ -916,7 +916,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) { }, }), newLazyChunk(logproto.Stream{ - Labels: fooLabelsWithName, + Labels: fooLabelsWithName.String(), Entries: []logproto.Entry{ { Timestamp: from.Add(3 * time.Millisecond), @@ -927,7 +927,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) { }, []logproto.Stream{ { - Labels: fooLabels, + Labels: fooLabels.String(), Entries: []logproto.Entry{ { Timestamp: from, @@ -944,7 +944,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) { }, }, }, - fooLabelsWithName, + fooLabelsWithName.String(), from, from.Add(4 * time.Millisecond), logproto.FORWARD, 1, @@ -988,7 +988,7 @@ func Test_newSampleBatchChunkIterator(t *testing.T) { "forward with overlap": { []*LazyChunk{ newLazyChunk(logproto.Stream{ - Labels: fooLabelsWithName, + Labels: fooLabelsWithName.String(), Entries: []logproto.Entry{ { Timestamp: from, @@ -1001,7 +1001,7 @@ func Test_newSampleBatchChunkIterator(t *testing.T) { }, }), newLazyChunk(logproto.Stream{ - Labels: fooLabelsWithName, + Labels: fooLabelsWithName.String(), Entries: []logproto.Entry{ { Timestamp: from.Add(time.Millisecond), @@ -1014,7 +1014,7 @@ func Test_newSampleBatchChunkIterator(t *testing.T) { }, }), newLazyChunk(logproto.Stream{ - Labels: fooLabelsWithName, + Labels: fooLabelsWithName.String(), Entries: []logproto.Entry{ { Timestamp: from.Add(time.Millisecond), @@ -1027,7 +1027,7 @@ func Test_newSampleBatchChunkIterator(t *testing.T) { }, }), newLazyChunk(logproto.Stream{ - Labels: fooLabelsWithName, + Labels: fooLabelsWithName.String(), Entries: []logproto.Entry{ { Timestamp: from.Add(2 * time.Millisecond), @@ -1040,7 +1040,7 @@ func Test_newSampleBatchChunkIterator(t *testing.T) { }, }), newLazyChunk(logproto.Stream{ - Labels: fooLabelsWithName, + Labels: fooLabelsWithName.String(), Entries: []logproto.Entry{ { Timestamp: from.Add(2 * time.Millisecond), @@ -1053,7 +1053,7 @@ func Test_newSampleBatchChunkIterator(t *testing.T) { }, }), newLazyChunk(logproto.Stream{ - Labels: fooLabelsWithName, + Labels: fooLabelsWithName.String(), Entries: []logproto.Entry{ { Timestamp: from.Add(3 * time.Millisecond), @@ -1068,7 +1068,7 @@ func Test_newSampleBatchChunkIterator(t 
*testing.T) { }, []logproto.Series{ { - Labels: fooLabels, + Labels: fooLabels.String(), Samples: []logproto.Sample{ { Timestamp: from.UnixNano(), @@ -1093,14 +1093,14 @@ func Test_newSampleBatchChunkIterator(t *testing.T) { }, }, }, - fooLabelsWithName, + fooLabelsWithName.String(), from, from.Add(4 * time.Millisecond), 2, }, "forward with overlapping non-continuous entries": { []*LazyChunk{ newLazyChunk(logproto.Stream{ - Labels: fooLabelsWithName, + Labels: fooLabelsWithName.String(), Entries: []logproto.Entry{ { Timestamp: from, @@ -1117,7 +1117,7 @@ func Test_newSampleBatchChunkIterator(t *testing.T) { }, }), newLazyChunk(logproto.Stream{ - Labels: fooLabelsWithName, + Labels: fooLabelsWithName.String(), Entries: []logproto.Entry{ { Timestamp: from.Add(time.Millisecond), @@ -1130,7 +1130,7 @@ func Test_newSampleBatchChunkIterator(t *testing.T) { }, }), newLazyChunk(logproto.Stream{ - Labels: fooLabelsWithName, + Labels: fooLabelsWithName.String(), Entries: []logproto.Entry{ { Timestamp: from.Add(time.Millisecond), @@ -1143,7 +1143,7 @@ func Test_newSampleBatchChunkIterator(t *testing.T) { }, }), newLazyChunk(logproto.Stream{ - Labels: fooLabelsWithName, + Labels: fooLabelsWithName.String(), Entries: []logproto.Entry{ { Timestamp: from.Add(2 * time.Millisecond), @@ -1158,7 +1158,7 @@ func Test_newSampleBatchChunkIterator(t *testing.T) { }, []logproto.Series{ { - Labels: fooLabels, + Labels: fooLabels.String(), Samples: []logproto.Sample{ { Timestamp: from.UnixNano(), @@ -1178,14 +1178,14 @@ func Test_newSampleBatchChunkIterator(t *testing.T) { }, }, }, - fooLabelsWithName, + fooLabelsWithName.String(), from, from.Add(3 * time.Millisecond), 2, }, "forward last chunk boundaries equal to end": { []*LazyChunk{ newLazyChunk(logproto.Stream{ - Labels: fooLabelsWithName, + Labels: fooLabelsWithName.String(), Entries: []logproto.Entry{ { Timestamp: time.Unix(1, 0), @@ -1198,7 +1198,7 @@ func Test_newSampleBatchChunkIterator(t *testing.T) { }, }), newLazyChunk(logproto.Stream{ - Labels: fooLabelsWithName, + Labels: fooLabelsWithName.String(), Entries: []logproto.Entry{ { @@ -1212,7 +1212,7 @@ func Test_newSampleBatchChunkIterator(t *testing.T) { }, }), newLazyChunk(logproto.Stream{ - Labels: fooLabelsWithName, + Labels: fooLabelsWithName.String(), Entries: []logproto.Entry{ { Timestamp: time.Unix(3, 0), @@ -1227,7 +1227,7 @@ func Test_newSampleBatchChunkIterator(t *testing.T) { }, []logproto.Series{ { - Labels: fooLabels, + Labels: fooLabels.String(), Samples: []logproto.Sample{ { Timestamp: time.Unix(1, 0).UnixNano(), @@ -1242,14 +1242,14 @@ func Test_newSampleBatchChunkIterator(t *testing.T) { }, }, }, - fooLabelsWithName, + fooLabelsWithName.String(), time.Unix(1, 0), time.Unix(3, 0), 2, }, "forward last chunk boundaries equal to end and start": { []*LazyChunk{ newLazyChunk(logproto.Stream{ - Labels: fooLabelsWithName, + Labels: fooLabelsWithName.String(), Entries: []logproto.Entry{ { Timestamp: time.Unix(1, 0), @@ -1262,7 +1262,7 @@ func Test_newSampleBatchChunkIterator(t *testing.T) { }, }), newLazyChunk(logproto.Stream{ - Labels: fooLabelsWithName, + Labels: fooLabelsWithName.String(), Entries: []logproto.Entry{ { @@ -1278,7 +1278,7 @@ func Test_newSampleBatchChunkIterator(t *testing.T) { }, []logproto.Series{ { - Labels: fooLabels, + Labels: fooLabels.String(), Samples: []logproto.Sample{ { Timestamp: time.Unix(1, 0).UnixNano(), @@ -1293,14 +1293,14 @@ func Test_newSampleBatchChunkIterator(t *testing.T) { }, }, }, - fooLabelsWithName, + fooLabelsWithName.String(), time.Unix(1, 0), 
time.Unix(1, 0), 2, }, "forward without overlap": { []*LazyChunk{ newLazyChunk(logproto.Stream{ - Labels: fooLabelsWithName, + Labels: fooLabelsWithName.String(), Entries: []logproto.Entry{ { Timestamp: from, @@ -1313,7 +1313,7 @@ func Test_newSampleBatchChunkIterator(t *testing.T) { }, }), newLazyChunk(logproto.Stream{ - Labels: fooLabelsWithName, + Labels: fooLabelsWithName.String(), Entries: []logproto.Entry{ { Timestamp: from.Add(2 * time.Millisecond), @@ -1322,7 +1322,7 @@ func Test_newSampleBatchChunkIterator(t *testing.T) { }, }), newLazyChunk(logproto.Stream{ - Labels: fooLabelsWithName, + Labels: fooLabelsWithName.String(), Entries: []logproto.Entry{ { Timestamp: from.Add(3 * time.Millisecond), @@ -1333,7 +1333,7 @@ func Test_newSampleBatchChunkIterator(t *testing.T) { }, []logproto.Series{ { - Labels: fooLabels, + Labels: fooLabels.String(), Samples: []logproto.Sample{ { Timestamp: from.UnixNano(), @@ -1353,7 +1353,7 @@ func Test_newSampleBatchChunkIterator(t *testing.T) { }, }, }, - fooLabelsWithName, + fooLabelsWithName.String(), from, from.Add(3 * time.Millisecond), 2, }, @@ -1391,7 +1391,7 @@ func Test_newSampleBatchChunkIterator(t *testing.T) { func TestPartitionOverlappingchunks(t *testing.T) { var ( oneThroughFour = newLazyChunk(logproto.Stream{ - Labels: fooLabelsWithName, + Labels: fooLabelsWithName.String(), Entries: []logproto.Entry{ { Timestamp: from, @@ -1404,7 +1404,7 @@ func TestPartitionOverlappingchunks(t *testing.T) { }, }) two = newLazyChunk(logproto.Stream{ - Labels: fooLabelsWithName, + Labels: fooLabelsWithName.String(), Entries: []logproto.Entry{ { Timestamp: from.Add(1 * time.Millisecond), @@ -1413,7 +1413,7 @@ func TestPartitionOverlappingchunks(t *testing.T) { }, }) three = newLazyChunk(logproto.Stream{ - Labels: fooLabelsWithName, + Labels: fooLabelsWithName.String(), Entries: []logproto.Entry{ { Timestamp: from.Add(2 * time.Millisecond), @@ -1636,7 +1636,7 @@ func Test_IsInvalidChunkError(t *testing.T) { func TestBatchCancel(t *testing.T) { createChunk := func(from time.Time) *LazyChunk { return newLazyChunk(logproto.Stream{ - Labels: fooLabelsWithName, + Labels: fooLabelsWithName.String(), Entries: []logproto.Entry{ { Timestamp: from, @@ -1665,7 +1665,7 @@ func TestBatchCancel(t *testing.T) { }, } - it, err := newLogBatchIterator(ctx, s, NilMetrics, chunks, 1, newMatchers(fooLabels), log.NewNoopPipeline(), logproto.FORWARD, from, time.Now(), nil) + it, err := newLogBatchIterator(ctx, s, NilMetrics, chunks, 1, newMatchers(fooLabels.String()), log.NewNoopPipeline(), logproto.FORWARD, from, time.Now(), nil) require.NoError(t, err) defer require.NoError(t, it.Close()) for it.Next() { diff --git a/pkg/storage/lazy_chunk.go b/pkg/storage/lazy_chunk.go index 436e0274dfe10..8757cc9c136f3 100644 --- a/pkg/storage/lazy_chunk.go +++ b/pkg/storage/lazy_chunk.go @@ -85,7 +85,7 @@ func (c *LazyChunk) Iterator( if direction == logproto.FORWARD { return iter.NewTimeRangedIterator( - iter.NewNonOverlappingIterator(its, ""), + iter.NewNonOverlappingIterator(its), from, through, ), nil @@ -106,7 +106,7 @@ func (c *LazyChunk) Iterator( its[i], its[j] = its[j], its[i] } - return iter.NewNonOverlappingIterator(its, ""), nil + return iter.NewNonOverlappingIterator(its), nil } // SampleIterator returns an sample iterator. @@ -166,7 +166,7 @@ func (c *LazyChunk) SampleIterator( // build the final iterator bound to the requested time range. 
return iter.NewTimeRangedSampleIterator( - iter.NewNonOverlappingSampleIterator(its, ""), + iter.NewNonOverlappingSampleIterator(its), from.UnixNano(), through.UnixNano(), ), nil diff --git a/pkg/storage/lazy_chunk_test.go b/pkg/storage/lazy_chunk_test.go index 8d8fc18ab0004..c29e37ef6f8ce 100644 --- a/pkg/storage/lazy_chunk_test.go +++ b/pkg/storage/lazy_chunk_test.go @@ -24,7 +24,8 @@ func TestLazyChunkIterator(t *testing.T) { }{ { newLazyChunk(logproto.Stream{ - Labels: fooLabelsWithName, + Labels: fooLabelsWithName.String(), + Hash: fooLabelsWithName.Hash(), Entries: []logproto.Entry{ { Timestamp: from, @@ -34,7 +35,8 @@ func TestLazyChunkIterator(t *testing.T) { }), []logproto.Stream{ { - Labels: fooLabels, + Labels: fooLabels.String(), + Hash: fooLabels.Hash(), Entries: []logproto.Entry{ { Timestamp: from, diff --git a/pkg/storage/store_test.go b/pkg/storage/store_test.go index bbd0d3cd5d7e1..b2ce46e2168fc 100644 --- a/pkg/storage/store_test.go +++ b/pkg/storage/store_test.go @@ -903,7 +903,7 @@ func TestStore_MultipleBoltDBShippersInConfig(t *testing.T) { defer store.Stop() // get all the chunks from both the stores - chunks, err := store.Get(ctx, "fake", timeToModelTime(firstStoreDate), timeToModelTime(secondStoreDate.Add(24*time.Hour)), newMatchers(fooLabelsWithName)...) + chunks, err := store.Get(ctx, "fake", timeToModelTime(firstStoreDate), timeToModelTime(secondStoreDate.Add(24*time.Hour)), newMatchers(fooLabelsWithName.String())...) require.NoError(t, err) // we get common chunk twice because it is indexed in both the stores @@ -933,9 +933,10 @@ func parseDate(in string) time.Time { return t } -func buildTestStreams(labels string, tr timeRange) logproto.Stream { +func buildTestStreams(labels labels.Labels, tr timeRange) logproto.Stream { stream := logproto.Stream{ - Labels: labels, + Labels: labels.String(), + Hash: labels.Hash(), Entries: []logproto.Entry{}, } diff --git a/pkg/storage/util_test.go b/pkg/storage/util_test.go index cb63fe9c18092..37c231abe86c9 100644 --- a/pkg/storage/util_test.go +++ b/pkg/storage/util_test.go @@ -23,8 +23,8 @@ import ( ) var ( - fooLabelsWithName = "{foo=\"bar\", __name__=\"logs\"}" - fooLabels = "{foo=\"bar\"}" + fooLabelsWithName = labels.Labels{{Name: "foo", Value: "bar"}, {Name: "__name__", Value: "logs"}} + fooLabels = labels.Labels{{Name: "foo", Value: "bar"}} ) var from = time.Unix(0, time.Millisecond.Nanoseconds())
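The proto and pb.go changes above thread the original, pre-pipeline stream hash through Stream, StreamAdapter and Series, and the iterator changes expose it via StreamHash(). A minimal sketch of why that matters for the deduplication fix follows; it is not taken from this diff, and the entry type, dedupe function and field names are illustrative assumptions only. The idea is that a querier can key deduplication on the original stream hash plus timestamp and line, so stages that mutate labels no longer produce differing dedup keys for the same underlying stream.

package main

import "fmt"

// entry mirrors only the fields relevant for deduplication.
type entry struct {
	streamHash uint64 // the pre-pipeline hash carried by Hash/StreamHash in this change set
	timestamp  int64
	line       string
}

// dedupe drops entries already seen for the same original stream at the
// same timestamp with the same line, regardless of how the pipeline
// rewrote the rendered label string.
func dedupe(entries []entry) []entry {
	type key struct {
		hash uint64
		ts   int64
		line string
	}
	seen := make(map[key]struct{})
	var out []entry
	for _, e := range entries {
		k := key{e.streamHash, e.timestamp, e.line}
		if _, ok := seen[k]; ok {
			continue
		}
		seen[k] = struct{}{}
		out = append(out, e)
	}
	return out
}

func main() {
	// Two replicas return the same underlying stream; even if a stage has
	// rewritten the labels differently on each path, the original hash matches.
	in := []entry{
		{streamHash: 42, timestamp: 1, line: "msg=a"},
		{streamHash: 42, timestamp: 1, line: "msg=a"}, // duplicate, dropped
		{streamHash: 42, timestamp: 2, line: "msg=b"},
	}
	fmt.Println(len(dedupe(in))) // prints 2
}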