diff --git a/pkg/storage/batch.go b/pkg/storage/batch.go index a072136fc217..9c8462ecf135 100644 --- a/pkg/storage/batch.go +++ b/pkg/storage/batch.go @@ -43,6 +43,8 @@ type batchChunkIterator struct { lastOverlapping []*LazyChunk iterFactory chunksIteratorFactory + begun bool + ctx context.Context cancel context.CancelFunc start, end time.Time direction logproto.Direction @@ -69,6 +71,7 @@ func newBatchChunkIterator( start: start, end: end, direction: direction, + ctx: ctx, cancel: cancel, iterFactory: iterFactory, chunks: lazyChunks{direction: direction, chunks: chunks}, @@ -78,10 +81,17 @@ func newBatchChunkIterator( }), } sort.Sort(res.chunks) - go res.loop(ctx) return res } +// Start is idempotent and will begin the processing thread which seeds the iterator data. +func (it *batchChunkIterator) Start() { + if !it.begun { + it.begun = true + go it.loop(it.ctx) + } +} + func (it *batchChunkIterator) loop(ctx context.Context) { for { if it.chunks.Len() == 0 { @@ -111,6 +121,8 @@ func (it *batchChunkIterator) loop(ctx context.Context) { } func (it *batchChunkIterator) Next() bool { + it.Start() // Ensure the iterator has started. + var err error // for loop to avoid recursion for { @@ -300,7 +312,10 @@ func newLogBatchIterator( } batch := newBatchChunkIterator(ctx, chunks, batchSize, direction, start, end, logbatch.newChunksIterator) + // Important: since the batchChunkIterator is bound to the LogBatchIterator, + // ensure embedded fields are present before it's started. logbatch.batchChunkIterator = batch + batch.Start() return logbatch, nil } @@ -398,7 +413,11 @@ func newSampleBatchIterator( ctx: ctx, } batch := newBatchChunkIterator(ctx, chunks, batchSize, logproto.FORWARD, start, end, samplebatch.newChunksIterator) + + // Important: since the batchChunkIterator is bound to the SampleBatchIterator, + // ensure embedded fields are present before it's started. samplebatch.batchChunkIterator = batch + batch.Start() return samplebatch, nil }