diff --git a/pkg/chunkenc/dumb_chunk.go b/pkg/chunkenc/dumb_chunk.go index 344d8e9be7d2d..bd286b624fc3e 100644 --- a/pkg/chunkenc/dumb_chunk.go +++ b/pkg/chunkenc/dumb_chunk.go @@ -97,7 +97,11 @@ func (c *dumbChunk) Bytes() ([]byte, error) { return nil, nil } -func (c *dumbChunk) Blocks() int { +func (c *dumbChunk) Blocks(_ time.Time, _ time.Time) []Block { + return nil +} + +func (c *dumbChunk) BlockCount() int { return 0 } diff --git a/pkg/chunkenc/interface.go b/pkg/chunkenc/interface.go index e5e4069d2573c..8fbbac065ca32 100644 --- a/pkg/chunkenc/interface.go +++ b/pkg/chunkenc/interface.go @@ -98,11 +98,27 @@ type Chunk interface { SpaceFor(*logproto.Entry) bool Append(*logproto.Entry) error Iterator(ctx context.Context, from, through time.Time, direction logproto.Direction, filter logql.LineFilter) (iter.EntryIterator, error) + // Returns the list of blocks in the chunks. + Blocks(mintT, maxtT time.Time) []Block Size() int Bytes() ([]byte, error) - Blocks() int + BlockCount() int Utilization() float64 UncompressedSize() int CompressedSize() int Close() error } + +// Block is a chunk block. +type Block interface { + // MinTime is the minimum time of entries in the block + MinTime() int64 + // MaxTime is the maximum time of entries in the block + MaxTime() int64 + // Offset is the offset/position of the block in the chunk. Offset is unique for a given block per chunk. + Offset() int + // Entries is the amount of entries in the block. + Entries() int + // Iterator returns an entry iterator for the block. + Iterator(context.Context, logql.LineFilter) iter.EntryIterator +} diff --git a/pkg/chunkenc/lazy_chunk.go b/pkg/chunkenc/lazy_chunk.go deleted file mode 100644 index 2e14c48a6f631..0000000000000 --- a/pkg/chunkenc/lazy_chunk.go +++ /dev/null @@ -1,31 +0,0 @@ -package chunkenc - -import ( - "context" - "errors" - "time" - - "github.com/cortexproject/cortex/pkg/chunk" - - "github.com/grafana/loki/pkg/iter" - "github.com/grafana/loki/pkg/logproto" - "github.com/grafana/loki/pkg/logql" -) - -// LazyChunk loads the chunk when it is accessed. -type LazyChunk struct { - Chunk chunk.Chunk - IsValid bool - Fetcher *chunk.Fetcher -} - -// Iterator returns an entry iterator. -func (c *LazyChunk) Iterator(ctx context.Context, from, through time.Time, direction logproto.Direction, filter logql.LineFilter) (iter.EntryIterator, error) { - // If the chunk is already loaded, then use that. - if c.Chunk.Data != nil { - lokiChunk := c.Chunk.Data.(*Facade).LokiChunk() - return lokiChunk.Iterator(ctx, from, through, direction, filter) - } - - return nil, errors.New("chunk is not loaded") -} diff --git a/pkg/chunkenc/memchunk.go b/pkg/chunkenc/memchunk.go index 309dedb0b99b9..71e50080f0204 100644 --- a/pkg/chunkenc/memchunk.go +++ b/pkg/chunkenc/memchunk.go @@ -80,6 +80,8 @@ type block struct { offset int // The offset of the block in the chunk. uncompressedSize int // Total uncompressed size in bytes when the chunk is cut. + + readers ReaderPool } // This block holds the un-compressed entries. Once it has enough data, this is @@ -212,7 +214,9 @@ func NewByteChunk(b []byte, blockSize, targetSize int) (*MemChunk, error) { bc.blocks = make([]block, 0, num) for i := 0; i < num; i++ { - blk := block{} + blk := block{ + readers: bc.readers, + } // Read #entries. blk.numEntries = db.uvarint() @@ -339,8 +343,8 @@ func (c *MemChunk) Size() int { return ne } -// Blocks implements Chunk. -func (c *MemChunk) Blocks() int { +// BlockCount implements Chunk. +func (c *MemChunk) BlockCount() int { return len(c.blocks) } @@ -431,6 +435,7 @@ func (c *MemChunk) cut() error { } c.blocks = append(c.blocks, block{ + readers: c.readers, b: b, numEntries: len(c.head.entries), mint: c.head.mint, @@ -477,7 +482,7 @@ func (c *MemChunk) Iterator(ctx context.Context, mintT, maxtT time.Time, directi if maxt < b.mint || b.maxt < mint { continue } - its = append(its, b.iterator(ctx, c.readers, filter)) + its = append(its, b.Iterator(ctx, filter)) } if !c.head.isEmpty() { @@ -497,11 +502,38 @@ func (c *MemChunk) Iterator(ctx context.Context, mintT, maxtT time.Time, directi return iter.NewEntryReversedIter(iterForward) } -func (b block) iterator(ctx context.Context, pool ReaderPool, filter logql.LineFilter) iter.EntryIterator { +// Blocks implements Chunk +func (c *MemChunk) Blocks(mintT, maxtT time.Time) []Block { + mint, maxt := mintT.UnixNano(), maxtT.UnixNano() + blocks := make([]Block, 0, len(c.blocks)) + + for _, b := range c.blocks { + if maxt > b.mint && b.maxt > mint { + blocks = append(blocks, b) + } + } + return blocks +} + +func (b block) Iterator(ctx context.Context, filter logql.LineFilter) iter.EntryIterator { if len(b.b) == 0 { return emptyIterator } - return newBufferedIterator(ctx, pool, b.b, filter) + return newBufferedIterator(ctx, b.readers, b.b, filter) +} + +func (b block) Offset() int { + return b.offset +} + +func (b block) Entries() int { + return b.numEntries +} +func (b block) MinTime() int64 { + return b.mint +} +func (b block) MaxTime() int64 { + return b.maxt } func (hb *headBlock) iterator(ctx context.Context, mint, maxt int64, filter logql.LineFilter) iter.EntryIterator { diff --git a/pkg/ingester/stream.go b/pkg/ingester/stream.go index 1f307dae8a70f..296cb88cdf256 100644 --- a/pkg/ingester/stream.go +++ b/pkg/ingester/stream.go @@ -152,7 +152,7 @@ func (s *stream) Push(ctx context.Context, entries []logproto.Entry, synchronize chunk.closed = true samplesPerChunk.Observe(float64(chunk.chunk.Size())) - blocksPerChunk.Observe(float64(chunk.chunk.Blocks())) + blocksPerChunk.Observe(float64(chunk.chunk.BlockCount())) chunksCreatedTotal.Inc() s.chunks = append(s.chunks, chunkDesc{ diff --git a/pkg/storage/iterator.go b/pkg/storage/batch.go similarity index 57% rename from pkg/storage/iterator.go rename to pkg/storage/batch.go index b3ce7e24592ef..327bee5fcbaa3 100644 --- a/pkg/storage/iterator.go +++ b/pkg/storage/batch.go @@ -22,48 +22,6 @@ import ( "github.com/grafana/loki/pkg/logql/stats" ) -// lazyChunks is a slice of lazy chunks that can ordered by chunk boundaries -// in ascending or descending depending on the direction -type lazyChunks struct { - chunks []*chunkenc.LazyChunk - direction logproto.Direction -} - -func (l lazyChunks) Len() int { return len(l.chunks) } -func (l lazyChunks) Swap(i, j int) { l.chunks[i], l.chunks[j] = l.chunks[j], l.chunks[i] } -func (l lazyChunks) Peek() *chunkenc.LazyChunk { return l.chunks[0] } -func (l lazyChunks) Less(i, j int) bool { - if l.direction == logproto.FORWARD { - t1, t2 := l.chunks[i].Chunk.From, l.chunks[j].Chunk.From - if !t1.Equal(t2) { - return t1.Before(t2) - } - return l.chunks[i].Chunk.Fingerprint < l.chunks[j].Chunk.Fingerprint - } - t1, t2 := l.chunks[i].Chunk.Through, l.chunks[j].Chunk.Through - if !t1.Equal(t2) { - return t1.After(t2) - } - return l.chunks[i].Chunk.Fingerprint > l.chunks[j].Chunk.Fingerprint -} - -// pop returns the top `count` lazychunks, the original slice is splitted an copied -// to avoid retaining chunks in the slice backing array. -func (l *lazyChunks) pop(count int) []*chunkenc.LazyChunk { - if len(l.chunks) <= count { - old := l.chunks - l.chunks = nil - return old - } - // split slices into two new ones and copy parts to each so we don't keep old reference - res := make([]*chunkenc.LazyChunk, count) - copy(res, l.chunks[0:count]) - new := make([]*chunkenc.LazyChunk, len(l.chunks)-count) - copy(new, l.chunks[count:len(l.chunks)]) - l.chunks = new - return res -} - // batchChunkIterator is an EntryIterator that iterates through chunks by batch of `batchSize`. // Since chunks can overlap across batches for each iteration the iterator will keep all overlapping // chunks with the next chunk from the next batch and added it to the next iteration. In this case the boundaries of the batch @@ -73,7 +31,8 @@ type batchChunkIterator struct { batchSize int err error curr iter.EntryIterator - lastOverlapping []*chunkenc.LazyChunk + lastOverlapping []*LazyChunk + labels map[model.Fingerprint]string ctx context.Context cancel context.CancelFunc @@ -87,7 +46,7 @@ type batchChunkIterator struct { } // newBatchChunkIterator creates a new batch iterator with the given batchSize. -func newBatchChunkIterator(ctx context.Context, chunks []*chunkenc.LazyChunk, batchSize int, matchers []*labels.Matcher, filter logql.LineFilter, req *logproto.QueryRequest) *batchChunkIterator { +func newBatchChunkIterator(ctx context.Context, chunks []*LazyChunk, batchSize int, matchers []*labels.Matcher, filter logql.LineFilter, req *logproto.QueryRequest) *batchChunkIterator { // __name__ is not something we filter by because it's a constant in loki // and only used for upstream compatibility; therefore remove it. // The same applies to the sharding label which is injected by the cortex storage code. @@ -109,6 +68,7 @@ func newBatchChunkIterator(ctx context.Context, chunks []*chunkenc.LazyChunk, ba ctx: ctx, cancel: cancel, chunks: lazyChunks{direction: req.Direction, chunks: chunks}, + labels: map[model.Fingerprint]string{}, next: make(chan *struct { iter iter.EntryIterator err error @@ -144,6 +104,7 @@ func newBatchChunkIterator(ctx context.Context, chunks []*chunkenc.LazyChunk, ba }() return res } + func (it *batchChunkIterator) Next() bool { var err error // for loop to avoid recursion @@ -170,93 +131,100 @@ func (it *batchChunkIterator) Next() bool { func (it *batchChunkIterator) nextBatch() (iter.EntryIterator, error) { // the first chunk of the batch headChunk := it.chunks.Peek() + from, through := it.req.Start, it.req.End + batch := make([]*LazyChunk, 0, it.batchSize+len(it.lastOverlapping)) + var nextChunk *LazyChunk - // pop the next batch of chunks and append/prepend previous overlapping chunks - // so we can merge/de-dupe overlapping entries. - batch := make([]*chunkenc.LazyChunk, 0, it.batchSize+len(it.lastOverlapping)) - if it.req.Direction == logproto.FORWARD { - batch = append(batch, it.lastOverlapping...) - } - batch = append(batch, it.chunks.pop(it.batchSize)...) - if it.req.Direction == logproto.BACKWARD { - batch = append(batch, it.lastOverlapping...) - } + for it.chunks.Len() > 0 { - from, through := it.req.Start, it.req.End - if it.chunks.Len() > 0 { - nextChunk := it.chunks.Peek() - // we max out our iterator boundaries to the next chunks in the queue - // so that overlapping chunks are together + // pop the next batch of chunks and append/prepend previous overlapping chunks + // so we can merge/de-dupe overlapping entries. + if it.req.Direction == logproto.FORWARD { + batch = append(batch, it.lastOverlapping...) + } + batch = append(batch, it.chunks.pop(it.batchSize)...) if it.req.Direction == logproto.BACKWARD { - from = time.Unix(0, nextChunk.Chunk.Through.UnixNano()) - - // we have to reverse the inclusivity of the chunk iterator from - // [from, through) to (from, through] for backward queries, except when - // the batch's `from` is equal to the query's Start. This can be achieved - // by shifting `from` by one nanosecond. - if !from.Equal(it.req.Start) { - from = from.Add(time.Nanosecond) - } - } else { - through = time.Unix(0, nextChunk.Chunk.From.UnixNano()) + batch = append(batch, it.lastOverlapping...) } - // we save all overlapping chunks as they are also needed in the next batch to properly order entries. - // If we have chunks like below: - // ┌──────────────┐ - // │ # 47 │ - // └──────────────┘ - // ┌──────────────────────────┐ - // │ # 48 | - // └──────────────────────────┘ - // ┌──────────────┐ - // │ # 49 │ - // └──────────────┘ - // ┌────────────────────┐ - // │ # 50 │ - // └────────────────────┘ - // - // And nextChunk is # 49, we need to keep references to #47 and #48 as they won't be - // iterated over completely (we're clipping through to #49's from) and then add them to the next batch. - it.lastOverlapping = it.lastOverlapping[:0] - for _, c := range batch { + + if it.chunks.Len() > 0 { + nextChunk = it.chunks.Peek() + // we max out our iterator boundaries to the next chunks in the queue + // so that overlapping chunks are together if it.req.Direction == logproto.BACKWARD { - if c.Chunk.From.Before(nextChunk.Chunk.Through) || c.Chunk.From == nextChunk.Chunk.Through { - it.lastOverlapping = append(it.lastOverlapping, c) + from = time.Unix(0, nextChunk.Chunk.Through.UnixNano()) + + // we have to reverse the inclusivity of the chunk iterator from + // [from, through) to (from, through] for backward queries, except when + // the batch's `from` is equal to the query's Start. This can be achieved + // by shifting `from` by one nanosecond. + if !from.Equal(it.req.Start) { + from = from.Add(time.Nanosecond) } } else { - if !c.Chunk.Through.Before(nextChunk.Chunk.From) { - it.lastOverlapping = append(it.lastOverlapping, c) - } + through = time.Unix(0, nextChunk.Chunk.From.UnixNano()) } + // we save all overlapping chunks as they are also needed in the next batch to properly order entries. + // If we have chunks like below: + // ┌──────────────┐ + // │ # 47 │ + // └──────────────┘ + // ┌──────────────────────────┐ + // │ # 48 | + // └──────────────────────────┘ + // ┌──────────────┐ + // │ # 49 │ + // └──────────────┘ + // ┌────────────────────┐ + // │ # 50 │ + // └────────────────────┘ + // + // And nextChunk is # 49, we need to keep references to #47 and #48 as they won't be + // iterated over completely (we're clipping through to #49's from) and then add them to the next batch. + } - } - if it.req.Direction == logproto.BACKWARD { - through = time.Unix(0, headChunk.Chunk.Through.UnixNano()) + if it.req.Direction == logproto.BACKWARD { + through = time.Unix(0, headChunk.Chunk.Through.UnixNano()) + + if through.After(it.req.End) { + through = it.req.End + } - if through.After(it.req.End) { - through = it.req.End - } + // we have to reverse the inclusivity of the chunk iterator from + // [from, through) to (from, through] for backward queries, except when + // the batch's `through` is equal to the query's End. This can be achieved + // by shifting `through` by one nanosecond. + if !through.Equal(it.req.End) { + through = through.Add(time.Nanosecond) + } + } else { + from = time.Unix(0, headChunk.Chunk.From.UnixNano()) - // we have to reverse the inclusivity of the chunk iterator from - // [from, through) to (from, through] for backward queries, except when - // the batch's `through` is equal to the query's End. This can be achieved - // by shifting `through` by one nanosecond. - if !through.Equal(it.req.End) { - through = through.Add(time.Nanosecond) + // when clipping the from it should never be before the start or equal to the end. + // Doing so would include entries not requested. + if from.Before(it.req.Start) || from.Equal(it.req.End) { + from = it.req.Start + } } - } else { - from = time.Unix(0, headChunk.Chunk.From.UnixNano()) - // when clipping the from it should never be before the start or equal to the end. - // Doing so would include entries not requested. - if from.Before(it.req.Start) || from.Equal(it.req.End) { - from = it.req.Start + // it's possible that the current batch and the next batch are fully overlapping in which case + // we should keep adding more items until the batch boundaries difference is positive. + if through.Sub(from) > 0 { + break } } + if it.chunks.Len() > 0 { + it.lastOverlapping = it.lastOverlapping[:0] + for _, c := range batch { + if c.IsOverlapping(nextChunk, it.req.Direction) { + it.lastOverlapping = append(it.lastOverlapping, c) + } + } + } // create the new chunks iterator from the current batch. - return newChunksIterator(it.ctx, batch, it.matchers, it.filter, it.req.Direction, from, through) + return it.newChunksIterator(batch, from, through, nextChunk) } func (it *batchChunkIterator) Entry() logproto.Entry { @@ -286,20 +254,20 @@ func (it *batchChunkIterator) Close() error { } // newChunksIterator creates an iterator over a set of lazychunks. -func newChunksIterator(ctx context.Context, chunks []*chunkenc.LazyChunk, matchers []*labels.Matcher, filter logql.LineFilter, direction logproto.Direction, from, through time.Time) (iter.EntryIterator, error) { +func (it *batchChunkIterator) newChunksIterator(chunks []*LazyChunk, from, through time.Time, nextChunk *LazyChunk) (iter.EntryIterator, error) { chksBySeries := partitionBySeriesChunks(chunks) // Make sure the initial chunks are loaded. This is not one chunk // per series, but rather a chunk per non-overlapping iterator. - if err := loadFirstChunks(ctx, chksBySeries); err != nil { + if err := loadFirstChunks(it.ctx, chksBySeries); err != nil { return nil, err } // Now that we have the first chunk for each series loaded, // we can proceed to filter the series that don't match. - chksBySeries = filterSeriesByMatchers(chksBySeries, matchers) + chksBySeries = filterSeriesByMatchers(chksBySeries, it.matchers) - var allChunks []*chunkenc.LazyChunk + var allChunks []*LazyChunk for _, series := range chksBySeries { for _, chunks := range series { allChunks = append(allChunks, chunks...) @@ -307,22 +275,22 @@ func newChunksIterator(ctx context.Context, chunks []*chunkenc.LazyChunk, matche } // Finally we load all chunks not already loaded - if err := fetchLazyChunks(ctx, allChunks); err != nil { + if err := fetchLazyChunks(it.ctx, allChunks); err != nil { return nil, err } - iters, err := buildIterators(ctx, chksBySeries, filter, direction, from, through) + iters, err := it.buildIterators(chksBySeries, from, through, nextChunk) if err != nil { return nil, err } - return iter.NewHeapIterator(ctx, iters, direction), nil + return iter.NewHeapIterator(it.ctx, iters, it.req.Direction), nil } -func buildIterators(ctx context.Context, chks map[model.Fingerprint][][]*chunkenc.LazyChunk, filter logql.LineFilter, direction logproto.Direction, from, through time.Time) ([]iter.EntryIterator, error) { +func (it *batchChunkIterator) buildIterators(chks map[model.Fingerprint][][]*LazyChunk, from, through time.Time, nextChunk *LazyChunk) ([]iter.EntryIterator, error) { result := make([]iter.EntryIterator, 0, len(chks)) for _, chunks := range chks { - iterator, err := buildHeapIterator(ctx, chunks, filter, direction, from, through) + iterator, err := it.buildHeapIterator(chunks, from, through, nextChunk) if err != nil { return nil, err } @@ -332,24 +300,34 @@ func buildIterators(ctx context.Context, chks map[model.Fingerprint][][]*chunken return result, nil } -func buildHeapIterator(ctx context.Context, chks [][]*chunkenc.LazyChunk, filter logql.LineFilter, direction logproto.Direction, from, through time.Time) (iter.EntryIterator, error) { +// computeLabels compute the labels string representation, uses a map to cache result per fingerprint. +func (it *batchChunkIterator) computeLabels(c *LazyChunk) string { + if lbs, ok := it.labels[c.Chunk.Fingerprint]; ok { + return lbs + } + lbs := dropLabels(c.Chunk.Metric, labels.MetricName).String() + it.labels[c.Chunk.Fingerprint] = lbs + return lbs +} + +func (it *batchChunkIterator) buildHeapIterator(chks [][]*LazyChunk, from, through time.Time, nextChunk *LazyChunk) (iter.EntryIterator, error) { result := make([]iter.EntryIterator, 0, len(chks)) // __name__ is only used for upstream compatibility and is hardcoded within loki. Strip it from the return label set. - labels := dropLabels(chks[0][0].Chunk.Metric, labels.MetricName).String() + labels := it.computeLabels(chks[0][0]) for i := range chks { iterators := make([]iter.EntryIterator, 0, len(chks[i])) for j := range chks[i] { if !chks[i][j].IsValid { continue } - iterator, err := chks[i][j].Iterator(ctx, from, through, direction, filter) + iterator, err := chks[i][j].Iterator(it.ctx, from, through, it.req.Direction, it.filter, nextChunk) if err != nil { return nil, err } iterators = append(iterators, iterator) } - if direction == logproto.BACKWARD { + if it.req.Direction == logproto.BACKWARD { for i, j := 0, len(iterators)-1; i < j; i, j = i+1, j-1 { iterators[i], iterators[j] = iterators[j], iterators[i] } @@ -357,10 +335,10 @@ func buildHeapIterator(ctx context.Context, chks [][]*chunkenc.LazyChunk, filter result = append(result, iter.NewNonOverlappingIterator(iterators, labels)) } - return iter.NewHeapIterator(ctx, result, direction), nil + return iter.NewHeapIterator(it.ctx, result, it.req.Direction), nil } -func filterSeriesByMatchers(chks map[model.Fingerprint][][]*chunkenc.LazyChunk, matchers []*labels.Matcher) map[model.Fingerprint][][]*chunkenc.LazyChunk { +func filterSeriesByMatchers(chks map[model.Fingerprint][][]*LazyChunk, matchers []*labels.Matcher) map[model.Fingerprint][][]*LazyChunk { outer: for fp, chunks := range chks { for _, matcher := range matchers { @@ -373,7 +351,7 @@ outer: return chks } -func fetchLazyChunks(ctx context.Context, chunks []*chunkenc.LazyChunk) error { +func fetchLazyChunks(ctx context.Context, chunks []*LazyChunk) error { log, ctx := spanlogger.New(ctx, "LokiStore.fetchLazyChunks") defer log.Finish() start := time.Now() @@ -384,7 +362,7 @@ func fetchLazyChunks(ctx context.Context, chunks []*chunkenc.LazyChunk) error { storeStats.TotalChunksDownloaded += totalChunks }() - chksByFetcher := map[*chunk.Fetcher][]*chunkenc.LazyChunk{} + chksByFetcher := map[*chunk.Fetcher][]*LazyChunk{} for _, c := range chunks { if c.Chunk.Data == nil { chksByFetcher[c.Fetcher] = append(chksByFetcher[c.Fetcher], c) @@ -398,10 +376,10 @@ func fetchLazyChunks(ctx context.Context, chunks []*chunkenc.LazyChunk) error { errChan := make(chan error) for fetcher, chunks := range chksByFetcher { - go func(fetcher *chunk.Fetcher, chunks []*chunkenc.LazyChunk) { + go func(fetcher *chunk.Fetcher, chunks []*LazyChunk) { keys := make([]string, 0, len(chunks)) chks := make([]chunk.Chunk, 0, len(chunks)) - index := make(map[string]*chunkenc.LazyChunk, len(chunks)) + index := make(map[string]*LazyChunk, len(chunks)) // FetchChunks requires chunks to be ordered by external key. sort.Slice(chunks, func(i, j int) bool { return chunks[i].Chunk.ExternalKey() < chunks[j].Chunk.ExternalKey() }) @@ -458,8 +436,8 @@ func isInvalidChunkError(err error) bool { return false } -func loadFirstChunks(ctx context.Context, chks map[model.Fingerprint][][]*chunkenc.LazyChunk) error { - var toLoad []*chunkenc.LazyChunk +func loadFirstChunks(ctx context.Context, chks map[model.Fingerprint][][]*LazyChunk) error { + var toLoad []*LazyChunk for _, lchks := range chks { for _, lchk := range lchks { if len(lchk) == 0 { @@ -471,13 +449,13 @@ func loadFirstChunks(ctx context.Context, chks map[model.Fingerprint][][]*chunke return fetchLazyChunks(ctx, toLoad) } -func partitionBySeriesChunks(chunks []*chunkenc.LazyChunk) map[model.Fingerprint][][]*chunkenc.LazyChunk { - chunksByFp := map[model.Fingerprint][]*chunkenc.LazyChunk{} +func partitionBySeriesChunks(chunks []*LazyChunk) map[model.Fingerprint][][]*LazyChunk { + chunksByFp := map[model.Fingerprint][]*LazyChunk{} for _, c := range chunks { fp := c.Chunk.Fingerprint chunksByFp[fp] = append(chunksByFp[fp], c) } - result := make(map[model.Fingerprint][][]*chunkenc.LazyChunk, len(chunksByFp)) + result := make(map[model.Fingerprint][][]*LazyChunk, len(chunksByFp)) for fp, chks := range chunksByFp { result[fp] = partitionOverlappingChunks(chks) @@ -488,12 +466,12 @@ func partitionBySeriesChunks(chunks []*chunkenc.LazyChunk) map[model.Fingerprint // partitionOverlappingChunks splits the list of chunks into different non-overlapping lists. // todo this might reverse the order. -func partitionOverlappingChunks(chunks []*chunkenc.LazyChunk) [][]*chunkenc.LazyChunk { +func partitionOverlappingChunks(chunks []*LazyChunk) [][]*LazyChunk { sort.Slice(chunks, func(i, j int) bool { return chunks[i].Chunk.From < chunks[j].Chunk.From }) - css := [][]*chunkenc.LazyChunk{} + css := [][]*LazyChunk{} outer: for _, c := range chunks { for i, cs := range css { @@ -504,7 +482,7 @@ outer: } } // If the chunk overlaps with every existing list, then create a new list. - cs := make([]*chunkenc.LazyChunk, 0, len(chunks)/(len(css)+1)) + cs := make([]*LazyChunk, 0, len(chunks)/(len(css)+1)) cs = append(cs, c) css = append(css, cs) } diff --git a/pkg/storage/iterator_test.go b/pkg/storage/batch_test.go similarity index 88% rename from pkg/storage/iterator_test.go rename to pkg/storage/batch_test.go index fd599558c1722..dc1c008a4c277 100644 --- a/pkg/storage/iterator_test.go +++ b/pkg/storage/batch_test.go @@ -8,6 +8,7 @@ import ( "github.com/cortexproject/cortex/pkg/chunk" "github.com/pkg/errors" + "github.com/prometheus/common/model" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/promql" "github.com/stretchr/testify/require" @@ -16,12 +17,14 @@ import ( "github.com/grafana/loki/pkg/chunkenc" "github.com/grafana/loki/pkg/iter" "github.com/grafana/loki/pkg/logproto" + "github.com/grafana/loki/pkg/logql" + "github.com/grafana/loki/pkg/logql/stats" ) func Test_newBatchChunkIterator(t *testing.T) { tests := map[string]struct { - chunks []*chunkenc.LazyChunk + chunks []*LazyChunk expected []logproto.Stream matchers string start, end time.Time @@ -29,7 +32,7 @@ func Test_newBatchChunkIterator(t *testing.T) { batchSize int }{ "forward with overlap": { - []*chunkenc.LazyChunk{ + []*LazyChunk{ newLazyChunk(logproto.Stream{ Labels: fooLabelsWithName, Entries: []logproto.Entry{ @@ -138,7 +141,7 @@ func Test_newBatchChunkIterator(t *testing.T) { 2, }, "forward with overlapping non-continuous entries": { - []*chunkenc.LazyChunk{ + []*LazyChunk{ newLazyChunk(logproto.Stream{ Labels: fooLabelsWithName, Entries: []logproto.Entry{ @@ -221,7 +224,7 @@ func Test_newBatchChunkIterator(t *testing.T) { 2, }, "backward with overlap": { - []*chunkenc.LazyChunk{ + []*LazyChunk{ newLazyChunk(logproto.Stream{ Labels: fooLabelsWithName, Entries: []logproto.Entry{ @@ -330,7 +333,7 @@ func Test_newBatchChunkIterator(t *testing.T) { 2, }, "backward with overlapping non-continuous entries": { - []*chunkenc.LazyChunk{ + []*LazyChunk{ newLazyChunk(logproto.Stream{ Labels: fooLabelsWithName, Entries: []logproto.Entry{ @@ -429,7 +432,7 @@ func Test_newBatchChunkIterator(t *testing.T) { 2, }, "forward without overlap": { - []*chunkenc.LazyChunk{ + []*LazyChunk{ newLazyChunk(logproto.Stream{ Labels: fooLabelsWithName, Entries: []logproto.Entry{ @@ -487,7 +490,7 @@ func Test_newBatchChunkIterator(t *testing.T) { 2, }, "backward without overlap": { - []*chunkenc.LazyChunk{ + []*LazyChunk{ newLazyChunk(logproto.Stream{ Labels: fooLabelsWithName, Entries: []logproto.Entry{ @@ -598,39 +601,39 @@ func TestPartitionOverlappingchunks(t *testing.T) { ) for i, tc := range []struct { - input []*chunkenc.LazyChunk - expected [][]*chunkenc.LazyChunk + input []*LazyChunk + expected [][]*LazyChunk }{ { - input: []*chunkenc.LazyChunk{ + input: []*LazyChunk{ oneThroughFour, two, three, }, - expected: [][]*chunkenc.LazyChunk{ + expected: [][]*LazyChunk{ {oneThroughFour}, {two, three}, }, }, { - input: []*chunkenc.LazyChunk{ + input: []*LazyChunk{ two, oneThroughFour, three, }, - expected: [][]*chunkenc.LazyChunk{ + expected: [][]*LazyChunk{ {oneThroughFour}, {two, three}, }, }, { - input: []*chunkenc.LazyChunk{ + input: []*LazyChunk{ two, two, three, three, }, - expected: [][]*chunkenc.LazyChunk{ + expected: [][]*LazyChunk{ {two, three}, {two, three}, }, @@ -687,11 +690,11 @@ func TestBuildHeapIterator(t *testing.T) { ) for i, tc := range []struct { - input [][]*chunkenc.LazyChunk + input [][]*LazyChunk expected []logproto.Stream }{ { - [][]*chunkenc.LazyChunk{ + [][]*LazyChunk{ {firstChunk}, {thirdChunk}, }, @@ -720,7 +723,7 @@ func TestBuildHeapIterator(t *testing.T) { }, }, { - [][]*chunkenc.LazyChunk{ + [][]*LazyChunk{ {secondChunk}, {firstChunk, thirdChunk}, }, @@ -751,7 +754,12 @@ func TestBuildHeapIterator(t *testing.T) { } { t.Run(fmt.Sprintf("%d", i), func(t *testing.T) { ctx = user.InjectOrgID(context.Background(), "test-user") - it, err := buildHeapIterator(ctx, tc.input, nil, logproto.FORWARD, from, from.Add(6*time.Millisecond)) + b := &batchChunkIterator{ + ctx: ctx, + req: &logproto.QueryRequest{Direction: logproto.FORWARD}, + labels: map[model.Fingerprint]string{}, + } + it, err := b.buildHeapIterator(tc.input, from, from.Add(6*time.Millisecond), nil) if err != nil { t.Errorf("buildHeapIterator error = %v", err) return @@ -841,3 +849,57 @@ func Test_IsInvalidChunkError(t *testing.T) { require.Equal(t, tc.expectedResult, result) } } + +var entry logproto.Entry + +func Benchmark_store_OverlappingChunks(b *testing.B) { + b.ReportAllocs() + st := &store{ + cfg: Config{ + MaxChunkBatchSize: 50, + }, + Store: newMockChunkStore(newOverlappingStreams(200, 200)), + } + b.ResetTimer() + ctx := user.InjectOrgID(stats.NewContext(context.Background()), "fake") + start := time.Now() + for i := 0; i < b.N; i++ { + it, err := st.LazyQuery(ctx, logql.SelectParams{QueryRequest: &logproto.QueryRequest{ + Selector: `{foo="bar"}`, + Direction: logproto.BACKWARD, + Limit: 0, + Shards: nil, + Start: time.Unix(0, 1), + End: time.Unix(0, time.Now().UnixNano()), + }}) + if err != nil { + b.Fatal(err) + } + for it.Next() { + entry = it.Entry() + } + if err := it.Close(); err != nil { + b.Fatal(err) + } + } + r := stats.Snapshot(ctx, time.Since(start)) + b.Log("Total chunks:" + fmt.Sprintf("%d", r.Store.TotalChunksRef)) + b.Log("Total bytes decompressed:" + fmt.Sprintf("%d", r.Store.DecompressedBytes)) +} + +func newOverlappingStreams(streamCount int, entryCount int) []*logproto.Stream { + streams := make([]*logproto.Stream, streamCount) + for i := range streams { + streams[i] = &logproto.Stream{ + Labels: fmt.Sprintf(`{foo="bar",id="%d"}`, i), + Entries: make([]logproto.Entry, entryCount), + } + for j := range streams[i].Entries { + streams[i].Entries[j] = logproto.Entry{ + Timestamp: time.Unix(0, int64(1+j)), + Line: "a very compressible log line duh", + } + } + } + return streams +} diff --git a/pkg/storage/cache.go b/pkg/storage/cache.go new file mode 100644 index 0000000000000..db69e8d11f00a --- /dev/null +++ b/pkg/storage/cache.go @@ -0,0 +1,94 @@ +package storage + +import ( + "github.com/grafana/loki/pkg/iter" + "github.com/grafana/loki/pkg/logproto" +) + +// cachedIterator is an iterator that caches iteration to be replayed later on. +type cachedIterator struct { + cache []*logproto.Entry + base iter.EntryIterator + + labels string + curr int + + closeErr error + iterErr error +} + +// newCachedIterator creates an iterator that cache iteration result and can be iterated again +// after closing it without re-using the underlaying iterator `it`. +// The cache iterator should be used for entries that belongs to the same stream only. +func newCachedIterator(it iter.EntryIterator, cap int) *cachedIterator { + c := &cachedIterator{ + base: it, + cache: make([]*logproto.Entry, 0, cap), + curr: -1, + } + c.load() + return c +} + +func (it *cachedIterator) reset() { + it.curr = -1 +} + +func (it *cachedIterator) load() { + if it.base != nil { + defer func() { + it.closeErr = it.base.Close() + it.iterErr = it.base.Error() + it.base = nil + it.reset() + }() + // set labels using the first entry + if !it.base.Next() { + return + } + it.labels = it.base.Labels() + + // add all entries until the base iterator is exhausted + for { + e := it.base.Entry() + it.cache = append(it.cache, &e) + if !it.base.Next() { + break + } + } + + } +} + +func (it *cachedIterator) Next() bool { + if len(it.cache) == 0 { + it.cache = nil + return false + } + if it.curr+1 >= len(it.cache) { + return false + } + it.curr++ + return it.curr < len(it.cache) +} + +func (it *cachedIterator) Entry() logproto.Entry { + if len(it.cache) == 0 { + return logproto.Entry{} + } + if it.curr < 0 { + return *it.cache[0] + } + return *it.cache[it.curr] +} + +func (it *cachedIterator) Labels() string { + return it.labels +} + +func (it *cachedIterator) Error() error { return it.iterErr } + +func (it *cachedIterator) Close() error { + it.reset() + return it.closeErr +} diff --git a/pkg/storage/cache_test.go b/pkg/storage/cache_test.go new file mode 100644 index 0000000000000..cdc7f7d473263 --- /dev/null +++ b/pkg/storage/cache_test.go @@ -0,0 +1,86 @@ +package storage + +import ( + "errors" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/grafana/loki/pkg/iter" + "github.com/grafana/loki/pkg/logproto" +) + +func Test_CachedIterator(t *testing.T) { + stream := logproto.Stream{ + Labels: `{foo="bar"}`, + Entries: []logproto.Entry{ + {Timestamp: time.Unix(0, 1), Line: "1"}, + {Timestamp: time.Unix(0, 2), Line: "2"}, + {Timestamp: time.Unix(0, 3), Line: "3"}, + }, + } + c := newCachedIterator(iter.NewStreamIterator(stream), 3) + + assert := func() { + // we should crash for call of entry without next although that's not expected. + require.Equal(t, stream.Labels, c.Labels()) + require.Equal(t, stream.Entries[0], c.Entry()) + require.Equal(t, true, c.Next()) + require.Equal(t, stream.Entries[0], c.Entry()) + require.Equal(t, true, c.Next()) + require.Equal(t, stream.Entries[1], c.Entry()) + require.Equal(t, true, c.Next()) + require.Equal(t, stream.Entries[2], c.Entry()) + require.Equal(t, false, c.Next()) + require.Equal(t, nil, c.Error()) + require.Equal(t, stream.Entries[2], c.Entry()) + require.Equal(t, false, c.Next()) + } + + assert() + + // Close the iterator reset it to the beginning. + require.Equal(t, nil, c.Close()) + + assert() +} + +func Test_EmptyCachedIterator(t *testing.T) { + + c := newCachedIterator(iter.NoopIterator, 0) + + require.Equal(t, "", c.Labels()) + require.Equal(t, logproto.Entry{}, c.Entry()) + require.Equal(t, false, c.Next()) + require.Equal(t, "", c.Labels()) + require.Equal(t, logproto.Entry{}, c.Entry()) + + require.Equal(t, nil, c.Close()) + + require.Equal(t, "", c.Labels()) + require.Equal(t, logproto.Entry{}, c.Entry()) + require.Equal(t, false, c.Next()) + require.Equal(t, "", c.Labels()) + require.Equal(t, logproto.Entry{}, c.Entry()) + +} + +func Test_ErrorCachedIterator(t *testing.T) { + + c := newCachedIterator(&errorIter{}, 0) + + require.Equal(t, false, c.Next()) + require.Equal(t, "", c.Labels()) + require.Equal(t, logproto.Entry{}, c.Entry()) + require.Equal(t, errors.New("error"), c.Error()) + require.Equal(t, errors.New("close"), c.Close()) +} + +type errorIter struct{} + +func (errorIter) Next() bool { return false } +func (errorIter) Error() error { return errors.New("error") } +func (errorIter) Labels() string { return "" } +func (errorIter) Entry() logproto.Entry { return logproto.Entry{} } +func (errorIter) Close() error { return errors.New("close") } diff --git a/pkg/storage/lazy_chunk.go b/pkg/storage/lazy_chunk.go new file mode 100644 index 0000000000000..366eefe2f5185 --- /dev/null +++ b/pkg/storage/lazy_chunk.go @@ -0,0 +1,157 @@ +package storage + +import ( + "context" + "errors" + "time" + + "github.com/cortexproject/cortex/pkg/chunk" + + "github.com/grafana/loki/pkg/chunkenc" + "github.com/grafana/loki/pkg/iter" + "github.com/grafana/loki/pkg/logproto" + "github.com/grafana/loki/pkg/logql" +) + +// LazyChunk loads the chunk when it is accessed. +type LazyChunk struct { + Chunk chunk.Chunk + IsValid bool + Fetcher *chunk.Fetcher + + // cache of overlapping block. + // We use the offset of the block as key since it's unique per chunk. + overlappingBlocks map[int]*cachedIterator +} + +// Iterator returns an entry iterator. +// The iterator returned will cache overlapping block's entries with the next chunk if passed. +// This way when we re-use them for ordering across batches we don't re-decompress the data again. +func (c *LazyChunk) Iterator( + ctx context.Context, + from, through time.Time, + direction logproto.Direction, + filter logql.LineFilter, + nextChunk *LazyChunk, +) (iter.EntryIterator, error) { + + // If the chunk is not already loaded, then error out. + if c.Chunk.Data == nil { + return nil, errors.New("chunk is not loaded") + } + + lokiChunk := c.Chunk.Data.(*chunkenc.Facade).LokiChunk() + blocks := lokiChunk.Blocks(from, through) + if len(blocks) == 0 { + return iter.NoopIterator, nil + } + its := make([]iter.EntryIterator, 0, len(blocks)) + + for _, b := range blocks { + // if we have already processed and cache block let's use it. + if cache, ok := c.overlappingBlocks[b.Offset()]; ok { + clone := *cache + clone.reset() + its = append(its, &clone) + continue + } + // if the block is overlapping cache it with the next chunk boundaries. + if nextChunk != nil && IsBlockOverlapping(b, nextChunk, direction) { + it := newCachedIterator(b.Iterator(ctx, filter), b.Entries()) + its = append(its, it) + if c.overlappingBlocks == nil { + c.overlappingBlocks = make(map[int]*cachedIterator) + } + c.overlappingBlocks[b.Offset()] = it + continue + } + if nextChunk != nil { + delete(c.overlappingBlocks, b.Offset()) + } + // non-overlapping block with the next chunk are not cached. + its = append(its, b.Iterator(ctx, filter)) + } + + // build the final iterator bound to the requested time range. + iterForward := iter.NewTimeRangedIterator( + iter.NewNonOverlappingIterator(its, ""), + from, + through, + ) + + if direction == logproto.FORWARD { + return iterForward, nil + } + + return iter.NewEntryReversedIter(iterForward) +} + +func IsBlockOverlapping(b chunkenc.Block, with *LazyChunk, direction logproto.Direction) bool { + if direction == logproto.BACKWARD { + through := int64(with.Chunk.Through) * int64(time.Millisecond) + if b.MinTime() <= through { + return true + } + } else { + from := int64(with.Chunk.From) * int64(time.Millisecond) + if b.MaxTime() >= from { + return true + } + } + return false +} + +func (c *LazyChunk) IsOverlapping(with *LazyChunk, direction logproto.Direction) bool { + if direction == logproto.BACKWARD { + if c.Chunk.From.Before(with.Chunk.Through) || c.Chunk.From == with.Chunk.Through { + return true + } + } else { + if !c.Chunk.Through.Before(with.Chunk.From) { + return true + } + } + return false +} + +// lazyChunks is a slice of lazy chunks that can ordered by chunk boundaries +// in ascending or descending depending on the direction +type lazyChunks struct { + chunks []*LazyChunk + direction logproto.Direction +} + +func (l lazyChunks) Len() int { return len(l.chunks) } +func (l lazyChunks) Swap(i, j int) { l.chunks[i], l.chunks[j] = l.chunks[j], l.chunks[i] } +func (l lazyChunks) Peek() *LazyChunk { return l.chunks[0] } +func (l lazyChunks) Less(i, j int) bool { + if l.direction == logproto.FORWARD { + t1, t2 := l.chunks[i].Chunk.From, l.chunks[j].Chunk.From + if !t1.Equal(t2) { + return t1.Before(t2) + } + return l.chunks[i].Chunk.Fingerprint < l.chunks[j].Chunk.Fingerprint + } + t1, t2 := l.chunks[i].Chunk.Through, l.chunks[j].Chunk.Through + if !t1.Equal(t2) { + return t1.After(t2) + } + return l.chunks[i].Chunk.Fingerprint > l.chunks[j].Chunk.Fingerprint +} + +// pop returns the top `count` lazychunks, the original slice is splitted an copied +// to avoid retaining chunks in the slice backing array. +func (l *lazyChunks) pop(count int) []*LazyChunk { + if len(l.chunks) <= count { + old := l.chunks + l.chunks = nil + return old + } + // split slices into two new ones and copy parts to each so we don't keep old reference + res := make([]*LazyChunk, count) + copy(res, l.chunks[0:count]) + new := make([]*LazyChunk, len(l.chunks)-count) + copy(new, l.chunks[count:len(l.chunks)]) + l.chunks = new + return res +} diff --git a/pkg/storage/lazy_chunk_test.go b/pkg/storage/lazy_chunk_test.go new file mode 100644 index 0000000000000..7fefd874a2e20 --- /dev/null +++ b/pkg/storage/lazy_chunk_test.go @@ -0,0 +1,115 @@ +package storage + +import ( + "context" + "testing" + "time" + + "github.com/cortexproject/cortex/pkg/chunk" + "github.com/stretchr/testify/require" + + "github.com/grafana/loki/pkg/chunkenc" + "github.com/grafana/loki/pkg/iter" + "github.com/grafana/loki/pkg/logproto" + "github.com/grafana/loki/pkg/logql" + "github.com/grafana/loki/pkg/util" +) + +func TestIsOverlapping(t *testing.T) { + tests := []struct { + name string + direction logproto.Direction + with *LazyChunk + b chunkenc.Block + want bool + }{ + { + "equal forward", + logproto.FORWARD, + lazyChunkWithBounds(time.Unix(0, 0), time.Unix(0, int64(time.Millisecond*5))), + blockWithBounds(0, int64(time.Millisecond*5)), + true, + }, + { + "equal backward", + logproto.BACKWARD, + lazyChunkWithBounds(time.Unix(0, 0), time.Unix(0, int64(time.Millisecond*5))), + blockWithBounds(0, int64(time.Millisecond*5)), + true, + }, + { + "equal through backward", + logproto.BACKWARD, + lazyChunkWithBounds(time.Unix(0, int64(time.Millisecond*5)), time.Unix(0, int64(time.Millisecond*10))), + blockWithBounds(0, int64(time.Millisecond*10)), + true, + }, + { + "< through backward", + logproto.BACKWARD, + lazyChunkWithBounds(time.Unix(0, int64(time.Millisecond*5)), time.Unix(0, int64(time.Millisecond*10))), + blockWithBounds(0, int64(time.Millisecond*5)), + true, + }, + { + "from > forward", + logproto.FORWARD, + lazyChunkWithBounds(time.Unix(0, int64(time.Millisecond*4)), time.Unix(0, int64(time.Millisecond*10))), + blockWithBounds(int64(time.Millisecond*3), int64(time.Millisecond*5)), + true, + }, + { + "from < forward", + logproto.FORWARD, + lazyChunkWithBounds(time.Unix(0, int64(time.Millisecond*5)), time.Unix(0, int64(time.Millisecond*10))), + blockWithBounds(int64(time.Millisecond*3), int64(time.Millisecond*4)), + false, + }, + { + "from = forward", + logproto.FORWARD, + lazyChunkWithBounds(time.Unix(0, int64(time.Millisecond*5)), time.Unix(0, int64(time.Millisecond*10))), + blockWithBounds(int64(time.Millisecond*3), int64(time.Millisecond*5)), + true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // testing the block one + require.Equal(t, tt.want, IsBlockOverlapping(tt.b, tt.with, tt.direction)) + // testing the chunk one + l := lazyChunkWithBounds(time.Unix(0, tt.b.MinTime()), time.Unix(0, tt.b.MaxTime())) + require.Equal(t, tt.want, l.IsOverlapping(tt.with, tt.direction)) + }) + } +} + +func lazyChunkWithBounds(from, through time.Time) *LazyChunk { + // In loki chunks are rounded when flushed fro nanoseconds to milliseconds. + fromM, throughM := util.RoundToMilliseconds(from, through) + return &LazyChunk{ + Chunk: chunk.Chunk{ + From: fromM, + Through: throughM, + }, + } +} + +type fakeBlock struct { + mint, maxt int64 +} + +func (fakeBlock) Entries() int { return 0 } +func (fakeBlock) Offset() int { return 0 } +func (f fakeBlock) MinTime() int64 { return f.mint } +func (f fakeBlock) MaxTime() int64 { return f.maxt } +func (fakeBlock) Iterator(context.Context, logql.LineFilter) iter.EntryIterator { + return nil +} + +func blockWithBounds(mint, maxt int64) chunkenc.Block { + return &fakeBlock{ + maxt: maxt, + mint: mint, + } +} diff --git a/pkg/storage/store.go b/pkg/storage/store.go index 66e400439fd40..323b96083fa4b 100644 --- a/pkg/storage/store.go +++ b/pkg/storage/store.go @@ -14,7 +14,6 @@ import ( "github.com/prometheus/prometheus/pkg/labels" "github.com/weaveworks/common/user" - "github.com/grafana/loki/pkg/chunkenc" "github.com/grafana/loki/pkg/iter" "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/logql" @@ -119,7 +118,7 @@ func decodeReq(req logql.SelectParams) ([]*labels.Matcher, logql.LineFilter, mod } // lazyChunks is an internal function used to resolve a set of lazy chunks from the store without actually loading them. It's used internally by `LazyQuery` and `GetSeries` -func (s *store) lazyChunks(ctx context.Context, matchers []*labels.Matcher, from, through model.Time) ([]*chunkenc.LazyChunk, error) { +func (s *store) lazyChunks(ctx context.Context, matchers []*labels.Matcher, from, through model.Time) ([]*LazyChunk, error) { userID, err := user.ExtractOrgID(ctx) if err != nil { return nil, err @@ -139,10 +138,10 @@ func (s *store) lazyChunks(ctx context.Context, matchers []*labels.Matcher, from totalChunks += len(chks[i]) } // creates lazychunks with chunks ref. - lazyChunks := make([]*chunkenc.LazyChunk, 0, totalChunks) + lazyChunks := make([]*LazyChunk, 0, totalChunks) for i := range chks { for _, c := range chks[i] { - lazyChunks = append(lazyChunks, &chunkenc.LazyChunk{Chunk: c, Fetcher: fetchers[i]}) + lazyChunks = append(lazyChunks, &LazyChunk{Chunk: c, Fetcher: fetchers[i]}) } } return lazyChunks, nil @@ -162,7 +161,7 @@ func (s *store) GetSeries(ctx context.Context, req logql.SelectParams) ([]logpro // group chunks by series chunksBySeries := partitionBySeriesChunks(lazyChunks) - firstChunksPerSeries := make([]*chunkenc.LazyChunk, 0, len(chunksBySeries)) + firstChunksPerSeries := make([]*LazyChunk, 0, len(chunksBySeries)) // discard all but one chunk per series for _, chks := range chunksBySeries { @@ -172,7 +171,7 @@ func (s *store) GetSeries(ctx context.Context, req logql.SelectParams) ([]logpro results := make(logproto.SeriesIdentifiers, 0, len(firstChunksPerSeries)) // bound concurrency - groups := make([][]*chunkenc.LazyChunk, 0, len(firstChunksPerSeries)/s.cfg.MaxChunkBatchSize+1) + groups := make([][]*LazyChunk, 0, len(firstChunksPerSeries)/s.cfg.MaxChunkBatchSize+1) split := s.cfg.MaxChunkBatchSize if len(firstChunksPerSeries) < split { @@ -226,6 +225,10 @@ func (s *store) LazyQuery(ctx context.Context, req logql.SelectParams) (iter.Ent return nil, err } + if len(lazyChunks) == 0 { + return iter.NoopIterator, nil + } + return newBatchChunkIterator(ctx, lazyChunks, s.cfg.MaxChunkBatchSize, matchers, filter, req.QueryRequest), nil } diff --git a/pkg/storage/util_test.go b/pkg/storage/util_test.go index 76b2f574105b2..3a8059e295634 100644 --- a/pkg/storage/util_test.go +++ b/pkg/storage/util_test.go @@ -47,16 +47,16 @@ func assertStream(t *testing.T, expected, actual []logproto.Stream) { } } -func newLazyChunk(stream logproto.Stream) *chunkenc.LazyChunk { - return &chunkenc.LazyChunk{ +func newLazyChunk(stream logproto.Stream) *LazyChunk { + return &LazyChunk{ Fetcher: nil, IsValid: true, Chunk: newChunk(stream), } } -func newLazyInvalidChunk(stream logproto.Stream) *chunkenc.LazyChunk { - return &chunkenc.LazyChunk{ +func newLazyInvalidChunk(stream logproto.Stream) *LazyChunk { + return &LazyChunk{ Fetcher: nil, IsValid: false, Chunk: newChunk(stream),