Skip to content

Commit

Permalink
storage: fix missing logs with batched chunk iterator (#1299)
Browse files Browse the repository at this point in the history
* fix batched chunk iterator

* fix expected series labels in iterator_test.go

* add query range boundary check

* add __name__ label matcher to test cases

* trim __name__ without mutating chunks
  • Loading branch information
putrasattvika authored and cyriltovena committed Nov 26, 2019
1 parent 8eced44 commit c89df1a
Show file tree
Hide file tree
Showing 3 changed files with 454 additions and 32 deletions.
75 changes: 62 additions & 13 deletions pkg/storage/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,16 @@ type batchChunkIterator struct {

// newBatchChunkIterator creates a new batch iterator with the given batchSize.
func newBatchChunkIterator(ctx context.Context, chunks []*chunkenc.LazyChunk, batchSize int, matchers []*labels.Matcher, filter logql.Filter, req *logproto.QueryRequest) *batchChunkIterator {

// __name__ is not something we filter by because it's a constant in loki and only used for upstream compatibility.
// Therefore remove it
for i := range matchers {
if matchers[i].Name == labels.MetricName {
matchers = append(matchers[:i], matchers[i+1:]...)
break
}
}

res := &batchChunkIterator{
batchSize: batchSize,
matchers: matchers,
Expand Down Expand Up @@ -112,6 +122,9 @@ func (it *batchChunkIterator) Next() bool {
}

func (it *batchChunkIterator) nextBatch() (iter.EntryIterator, error) {
// the first chunk of the batch
headChunk := it.chunks.Peek()

// pop the next batch of chunks and append/preprend previous overlapping chunks
// so we can merge/de-dupe overlapping entries.
batch := make([]*chunkenc.LazyChunk, 0, it.batchSize+len(it.lastOverlapping))
Expand All @@ -130,6 +143,14 @@ func (it *batchChunkIterator) nextBatch() (iter.EntryIterator, error) {
// so that overlapping chunks are together
if it.req.Direction == logproto.BACKWARD {
from = time.Unix(0, nextChunk.Chunk.Through.UnixNano())

// we have to reverse the inclusivity of the chunk iterator from
// [from, through) to (from, through] for backward queries, except when
// the batch's `from` is equal to the query's Start. This can be achieved
// by shifting `from` by one nanosecond.
if !from.Equal(it.req.Start) {
from = from.Add(time.Nanosecond)
}
} else {
through = time.Unix(0, nextChunk.Chunk.From.UnixNano())
}
Expand All @@ -149,7 +170,7 @@ func (it *batchChunkIterator) nextBatch() (iter.EntryIterator, error) {
// └────────────────────┘
//
// And nextChunk is # 49, we need to keep references to #47 and #48 as they won't be
// iterated over completely (we're clipping through to #49's from) and then add them to the next batch.
// iterated over completely (we're clipping through to #49's from) and then add them to the next batch.
it.lastOverlapping = it.lastOverlapping[:0]
for _, c := range batch {
if it.req.Direction == logproto.BACKWARD {
Expand All @@ -162,13 +183,27 @@ func (it *batchChunkIterator) nextBatch() (iter.EntryIterator, error) {
}
}
}
}

if it.req.Direction == logproto.BACKWARD {
through = time.Unix(0, headChunk.Chunk.Through.UnixNano())

if through.After(it.req.End) {
through = it.req.End
}

// we have to reverse the inclusivity of the chunk iterator from
// [from, through) to (from, through] for backward queries, except when
// the batch's `through` is equal to the query's End. This can be achieved
// by shifting `through` by one nanosecond.
if !through.Equal(it.req.End) {
through = through.Add(time.Nanosecond)
}
} else {
if len(it.lastOverlapping) > 0 {
if it.req.Direction == logproto.BACKWARD {
through = time.Unix(0, it.lastOverlapping[0].Chunk.From.UnixNano())
} else {
from = time.Unix(0, it.lastOverlapping[0].Chunk.Through.UnixNano())
}
from = time.Unix(0, headChunk.Chunk.From.UnixNano())

if from.Before(it.req.Start) {
from = it.req.Start
}
}

Expand Down Expand Up @@ -250,12 +285,9 @@ func buildIterators(ctx context.Context, chks map[model.Fingerprint][][]*chunken

func buildHeapIterator(ctx context.Context, chks [][]*chunkenc.LazyChunk, filter logql.Filter, direction logproto.Direction, from, through time.Time) (iter.EntryIterator, error) {
result := make([]iter.EntryIterator, 0, len(chks))
if chks[0][0].Chunk.Metric.Has("__name__") {
labelsBuilder := labels.NewBuilder(chks[0][0].Chunk.Metric)
labelsBuilder.Del("__name__")
chks[0][0].Chunk.Metric = labelsBuilder.Labels()
}
labels := chks[0][0].Chunk.Metric.String()

// __name__ is only used for upstream compatibility and is hardcoded within loki. Strip it from the return label set.
labels := dropLabels(chks[0][0].Chunk.Metric, labels.MetricName).String()

for i := range chks {
iterators := make([]iter.EntryIterator, 0, len(chks[i]))
Expand Down Expand Up @@ -400,3 +432,20 @@ outer:

return css
}

// dropLabels returns a new label set with certain labels dropped
func dropLabels(ls labels.Labels, removals ...string) (dst labels.Labels) {
toDel := make(map[string]struct{})
for _, r := range removals {
toDel[r] = struct{}{}
}

for _, l := range ls {
_, remove := toDel[l.Name]
if !remove {
dst = append(dst, l)
}
}

return dst
}
Loading

0 comments on commit c89df1a

Please sign in to comment.