Fixes race conditions in the batch iterator. #2773

Merged 51 commits on Oct 20, 2020
03279b1
Adds logfmt, regexp and json logql parser
cyriltovena Sep 15, 2020
31c2f3f
hook the ast with parsers.
cyriltovena Sep 16, 2020
eaf72bd
hook parser with memchunk.
cyriltovena Sep 17, 2020
7d1dc7b
hook parser with the storage.
cyriltovena Sep 17, 2020
484afc1
hook parser with ingesters
cyriltovena Sep 17, 2020
0121a3c
fixes all tests
cyriltovena Sep 17, 2020
62f2829
Refactor to pipeline and implement ast parsing.
cyriltovena Sep 21, 2020
89c489c
Fixes the lexer for duration and range
cyriltovena Sep 22, 2020
4238173
Fixes all tests and add some for label filters
cyriltovena Sep 23, 2020
88ad104
Add label and line format.
cyriltovena Sep 23, 2020
c7791a4
Add tests for fmt label and line with validations.
cyriltovena Sep 24, 2020
0651e25
Polishing parsers and add some more test cases
cyriltovena Sep 25, 2020
4c0570d
Finish the unwrap parser, still need to add more tests
cyriltovena Sep 29, 2020
01e93c0
Indent this hell.
cyriltovena Sep 29, 2020
e455c88
Moar tests and it works.
cyriltovena Sep 29, 2020
8bc18e5
Add more tests which lead me to find a bug in the lexer
cyriltovena Sep 30, 2020
08d2cf7
Add more tests and fix all engine tests
cyriltovena Sep 30, 2020
b801417
Fixes match stage in promtail pipelines.
cyriltovena Sep 30, 2020
850b003
Hook Pipeline into ingester, tailer and storage.
cyriltovena Oct 1, 2020
31c26c0
Correctly setup sharding for logqlv2
cyriltovena Oct 1, 2020
b5e11d0
Fixes precedences issue with label filters and add moar tests :v:
cyriltovena Oct 2, 2020
0fd6018
Adds quantile_over_time, grouping for non associate range aggregation…
cyriltovena Oct 2, 2020
2ca6677
Extract with grouping
cyriltovena Oct 3, 2020
4effb67
Adds parsing duration on unwrap
cyriltovena Oct 5, 2020
832a977
Improve the lexer to support more common identifier as functions.
cyriltovena Oct 6, 2020
6563d6e
Fixes the frontend logs to include org_id.
cyriltovena Oct 6, 2020
92f7c39
Merge branch 'fix-orgid-frontend' into logql-parser
cyriltovena Oct 6, 2020
5578dbb
Support byte sizes in label filters.
jeschkies Oct 9, 2020
13132ad
Wip on error handling.
cyriltovena Oct 12, 2020
db07446
Fixes json parser with prometheus label name rules.
cyriltovena Oct 12, 2020
78973cf
fixup! Support byte sizes in label filters.
jeschkies Oct 12, 2020
bbacba7
Merge remote-tracking branch 'cyril/logql-parser' into karsten/bytes-…
jeschkies Oct 12, 2020
25dd730
Merge pull request #5 from jeschkies/karsten/bytes-filter
cyriltovena Oct 12, 2020
c054a5d
Wip error handling, commit before big refactoring.
cyriltovena Oct 13, 2020
e7d8234
Merge branch 'logql-parser' of github.com:cyriltovena/loki into logql…
cyriltovena Oct 13, 2020
5ab8b5c
Refactoring in progress.
cyriltovena Oct 13, 2020
5272d91
Work in progress.
cyriltovena Oct 13, 2020
1aa1609
Got something that builds and throw __error__ labels properly now.
cyriltovena Oct 14, 2020
1af9c14
Add error handling + fixes groupins and post filtering.
cyriltovena Oct 14, 2020
960ef5a
400 on pipeline errors.
cyriltovena Oct 14, 2020
de83465
Fixes a races in the log pipeline.
cyriltovena Oct 15, 2020
87c4f00
Unsure the key is parsable and valid.
cyriltovena Oct 15, 2020
50315b1
Cleanup and code documentation.
cyriltovena Oct 15, 2020
54dd6c2
Lint.
cyriltovena Oct 15, 2020
a50490f
Lint.
cyriltovena Oct 15, 2020
cbbc37c
Merge remote-tracking branch 'upstream/master' into logql-parser
cyriltovena Oct 15, 2020
801b721
Fixes frontend handler.
cyriltovena Oct 15, 2020
1aee415
Fixes old test.
cyriltovena Oct 15, 2020
1ea917f
Fix go1.15 local failing test.
cyriltovena Oct 15, 2020
faecc80
Fixes race conditions in the batch iterator.
cyriltovena Oct 16, 2020
e9e4578
Merge remote-tracking branch 'upstream/master' into iter-batch-race
cyriltovena Oct 20, 2020
135 changes: 59 additions & 76 deletions pkg/storage/cache.go → pkg/iter/cache.go
@@ -1,66 +1,56 @@
package storage
package iter

import (
"github.com/grafana/loki/pkg/iter"
"github.com/grafana/loki/pkg/logproto"
)

type CacheEntryIterator interface {
EntryIterator
Reset()
}

// cachedIterator is an iterator that caches iteration to be replayed later on.
type cachedIterator struct {
cache []*logproto.Entry
base iter.EntryIterator
cache []entryWithLabels
base EntryIterator // once set to nil it means we have to use the cache.

labels string
curr int
curr int

closeErr error
iterErr error
}

// newCachedIterator creates an iterator that cache iteration result and can be iterated again
// NewCachedIterator creates an iterator that cache iteration result and can be iterated again
// after closing it without re-using the underlaying iterator `it`.
// The cache iterator should be used for entries that belongs to the same stream only.
func newCachedIterator(it iter.EntryIterator, cap int) *cachedIterator {
func NewCachedIterator(it EntryIterator, cap int) CacheEntryIterator {
c := &cachedIterator{
base: it,
cache: make([]*logproto.Entry, 0, cap),
cache: make([]entryWithLabels, 0, cap),
curr: -1,
}
c.load()
return c
}

func (it *cachedIterator) reset() {
func (it *cachedIterator) Reset() {
it.curr = -1
}

func (it *cachedIterator) load() {
func (it *cachedIterator) Next() bool {
if it.base != nil {
defer func() {
ok := it.base.Next()
// we're done with the base iterator.
if !ok {
it.closeErr = it.base.Close()
it.iterErr = it.base.Error()
it.base = nil
it.reset()
}()
// set labels using the first entry
if !it.base.Next() {
return
return false
}
it.labels = it.base.Labels()

// add all entries until the base iterator is exhausted
for {
e := it.base.Entry()
it.cache = append(it.cache, &e)
if !it.base.Next() {
break
}
}

// we're caching entries
it.cache = append(it.cache, entryWithLabels{entry: it.base.Entry(), labels: it.base.Labels()})
it.curr++
return true
}
}

func (it *cachedIterator) Next() bool {
// second pass
if len(it.cache) == 0 {
it.cache = nil
return false
@@ -73,81 +63,74 @@ func (it *cachedIterator) Next() bool {
}

func (it *cachedIterator) Entry() logproto.Entry {
if len(it.cache) == 0 {
if len(it.cache) == 0 || it.curr < 0 {
return logproto.Entry{}
}
if it.curr < 0 {
return *it.cache[0]
}
return *it.cache[it.curr]

return it.cache[it.curr].entry
}

func (it *cachedIterator) Labels() string {
return it.labels
if len(it.cache) == 0 || it.curr < 0 {
return ""
}
return it.cache[it.curr].labels
}

func (it *cachedIterator) Error() error { return it.iterErr }

func (it *cachedIterator) Close() error {
it.reset()
it.Reset()
return it.closeErr
}

type CacheSampleIterator interface {
SampleIterator
Reset()
}

// cachedIterator is an iterator that caches iteration to be replayed later on.
type cachedSampleIterator struct {
cache []logproto.Sample
base iter.SampleIterator
cache []sampleWithLabels
base SampleIterator

labels string
curr int
curr int

closeErr error
iterErr error
}

// newSampleCachedIterator creates an iterator that cache iteration result and can be iterated again
// after closing it without re-using the underlaying iterator `it`.
// The cache iterator should be used for entries that belongs to the same stream only.
func newCachedSampleIterator(it iter.SampleIterator, cap int) *cachedSampleIterator {
func NewCachedSampleIterator(it SampleIterator, cap int) CacheSampleIterator {
c := &cachedSampleIterator{
base: it,
cache: make([]logproto.Sample, 0, cap),
cache: make([]sampleWithLabels, 0, cap),
curr: -1,
}
c.load()
return c
}

func (it *cachedSampleIterator) reset() {
func (it *cachedSampleIterator) Reset() {
it.curr = -1
}

func (it *cachedSampleIterator) load() {
func (it *cachedSampleIterator) Next() bool {
if it.base != nil {
defer func() {
ok := it.base.Next()
// we're done with the base iterator.
if !ok {
it.closeErr = it.base.Close()
it.iterErr = it.base.Error()
it.base = nil
it.reset()
}()
// set labels using the first entry
if !it.base.Next() {
return
return false
}
it.labels = it.base.Labels()

// add all entries until the base iterator is exhausted
for {
it.cache = append(it.cache, it.base.Sample())
if !it.base.Next() {
break
}
}

// we're caching entries
it.cache = append(it.cache, sampleWithLabels{Sample: it.base.Sample(), labels: it.base.Labels()})
it.curr++
return true
}
}

func (it *cachedSampleIterator) Next() bool {
// second pass
if len(it.cache) == 0 {
it.cache = nil
return false
@@ -160,22 +143,22 @@ func (it *cachedSampleIterator) Next() bool {
}

func (it *cachedSampleIterator) Sample() logproto.Sample {
if len(it.cache) == 0 {
if len(it.cache) == 0 || it.curr < 0 {
return logproto.Sample{}
}
if it.curr < 0 {
return it.cache[0]
}
return it.cache[it.curr]
return it.cache[it.curr].Sample
}

func (it *cachedSampleIterator) Labels() string {
return it.labels
if len(it.cache) == 0 || it.curr < 0 {
return ""
}
return it.cache[it.curr].labels
}

func (it *cachedSampleIterator) Error() error { return it.iterErr }

func (it *cachedSampleIterator) Close() error {
it.reset()
it.Reset()
return it.closeErr
}
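
To make the behavioural change in `pkg/iter/cache.go` easier to follow, here is a minimal, self-contained sketch of the same idea: entries are cached lazily on the first pass through `Next()` instead of eagerly in a `load()` call, and `Entry()`/`Labels()` return zero values until `Next()` has been called. This is not the Loki code. `Iterator`, `sliceIterator`, `cachedEntry` and the string-typed entries are hypothetical stand-ins, and the replay branch of `Next()` is an assumption, since that part of the real function is not shown in the diff above. The sketch also assumes `Reset()` is only called once the first pass is exhausted.

```go
package main

import "fmt"

// Iterator is a hypothetical, simplified stand-in for Loki's EntryIterator.
type Iterator interface {
	Next() bool
	Entry() string
	Labels() string
	Close() error
}

// sliceIterator is a hypothetical base iterator over an in-memory slice.
type sliceIterator struct {
	labels string
	lines  []string
	i      int
}

func newSliceIterator(labels string, lines ...string) *sliceIterator {
	return &sliceIterator{labels: labels, lines: lines, i: -1}
}

func (s *sliceIterator) Next() bool {
	if s.i+1 >= len(s.lines) {
		return false
	}
	s.i++
	return true
}
func (s *sliceIterator) Entry() string  { return s.lines[s.i] }
func (s *sliceIterator) Labels() string { return s.labels }
func (s *sliceIterator) Close() error   { return nil }

// cachedEntry keeps the labels next to each entry, as entryWithLabels does in the diff.
type cachedEntry struct {
	entry  string
	labels string
}

type cachedIterator struct {
	base     Iterator // nil once the base iterator is exhausted
	cache    []cachedEntry
	curr     int
	closeErr error
}

func newCachedIterator(base Iterator, capacity int) *cachedIterator {
	return &cachedIterator{base: base, cache: make([]cachedEntry, 0, capacity), curr: -1}
}

// Reset rewinds the iterator so the cached entries can be replayed.
func (it *cachedIterator) Reset() { it.curr = -1 }

func (it *cachedIterator) Next() bool {
	if it.base != nil {
		// First pass: pull from the base iterator and cache as we go.
		if !it.base.Next() {
			it.closeErr = it.base.Close()
			it.base = nil
			return false
		}
		it.cache = append(it.cache, cachedEntry{entry: it.base.Entry(), labels: it.base.Labels()})
		it.curr++
		return true
	}
	// Later passes: replay from the cache (assumed behaviour).
	if it.curr+1 >= len(it.cache) {
		return false
	}
	it.curr++
	return true
}

// Entry returns the zero value until Next has been called.
func (it *cachedIterator) Entry() string {
	if len(it.cache) == 0 || it.curr < 0 {
		return ""
	}
	return it.cache[it.curr].entry
}

// Labels returns the zero value until Next has been called.
func (it *cachedIterator) Labels() string {
	if len(it.cache) == 0 || it.curr < 0 {
		return ""
	}
	return it.cache[it.curr].labels
}

func (it *cachedIterator) Close() error {
	it.Reset()
	return it.closeErr
}

func main() {
	it := newCachedIterator(newSliceIterator(`{app="foo"}`, "a", "b", "c"), 3)
	for pass := 1; pass <= 2; pass++ {
		for it.Next() {
			fmt.Println(pass, it.Labels(), it.Entry())
		}
		it.Reset() // rewind before replaying from the cache
	}
}
```

Storing the labels with every cached entry, rather than once per iterator, means the labels are read from the same position as the entry, which is what the updated tests check when they expect `""` and an empty entry before the first `Next()`.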
25 changes: 11 additions & 14 deletions pkg/storage/cache_test.go → pkg/iter/cache_test.go
@@ -1,4 +1,4 @@
package storage
package iter

import (
"errors"
@@ -7,7 +7,6 @@ import (

"github.com/stretchr/testify/require"

"github.com/grafana/loki/pkg/iter"
"github.com/grafana/loki/pkg/logproto"
)

@@ -20,12 +19,11 @@ func Test_CachedIterator(t *testing.T) {
{Timestamp: time.Unix(0, 3), Line: "3"},
},
}
c := newCachedIterator(iter.NewStreamIterator(stream), 3)
c := NewCachedIterator(NewStreamIterator(stream), 3)

assert := func() {
// we should crash for call of entry without next although that's not expected.
require.Equal(t, stream.Labels, c.Labels())
require.Equal(t, stream.Entries[0], c.Entry())
require.Equal(t, "", c.Labels())
require.Equal(t, logproto.Entry{}, c.Entry())
require.Equal(t, true, c.Next())
require.Equal(t, stream.Entries[0], c.Entry())
require.Equal(t, true, c.Next())
@@ -48,7 +46,7 @@ func Test_CachedIterator(t *testing.T) {

func Test_EmptyCachedIterator(t *testing.T) {

c := newCachedIterator(iter.NoopIterator, 0)
c := NewCachedIterator(NoopIterator, 0)

require.Equal(t, "", c.Labels())
require.Equal(t, logproto.Entry{}, c.Entry())
@@ -68,7 +66,7 @@ func Test_EmptyCachedIterator(t *testing.T) {

func Test_ErrorCachedIterator(t *testing.T) {

c := newCachedIterator(&errorIter{}, 0)
c := NewCachedIterator(&errorIter{}, 0)

require.Equal(t, false, c.Next())
require.Equal(t, "", c.Labels())
@@ -86,12 +84,11 @@ func Test_CachedSampleIterator(t *testing.T) {
{Timestamp: time.Unix(0, 3).UnixNano(), Hash: 3, Value: 3.},
},
}
c := newCachedSampleIterator(iter.NewSeriesIterator(series), 3)
c := NewCachedSampleIterator(NewSeriesIterator(series), 3)

assert := func() {
// we should crash for call of entry without next although that's not expected.
require.Equal(t, series.Labels, c.Labels())
require.Equal(t, series.Samples[0], c.Sample())
require.Equal(t, "", c.Labels())
require.Equal(t, logproto.Sample{}, c.Sample())
require.Equal(t, true, c.Next())
require.Equal(t, series.Samples[0], c.Sample())
require.Equal(t, true, c.Next())
@@ -114,7 +111,7 @@ func Test_CachedSampleIterator(t *testing.T) {

func Test_EmptyCachedSampleIterator(t *testing.T) {

c := newCachedSampleIterator(iter.NoopIterator, 0)
c := NewCachedSampleIterator(NoopIterator, 0)

require.Equal(t, "", c.Labels())
require.Equal(t, logproto.Sample{}, c.Sample())
@@ -134,7 +131,7 @@ func Test_EmptyCachedSampleIterator(t *testing.T) {

func Test_ErrorCachedSampleIterator(t *testing.T) {

c := newCachedSampleIterator(&errorIter{}, 0)
c := NewCachedSampleIterator(&errorIter{}, 0)

require.Equal(t, false, c.Next())
require.Equal(t, "", c.Labels())