From 6eb3eb79bac5a36ae5d04eae04f903ea31778be2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Giedrius=20Statkevi=C4=8Dius?= Date: Fri, 4 Aug 2023 13:38:27 +0300 Subject: [PATCH] store/bucket: wait until chunk loading ends in Close() MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Chunk reader needs to wait until the chunk loading ends in Close() because otherwise there will be a race between appending to r.chunkBytes and reading from it. Signed-off-by: Giedrius Statkevičius --- pkg/store/bucket.go | 29 +++++++++++++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index 09bfba3206..d5c46b19c3 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -3160,6 +3160,10 @@ type bucketChunkReader struct { mtx sync.Mutex stats *queryStats chunkBytes []*[]byte // Byte slice to return to the chunk pool on close. + + loadingChunksMtx sync.Mutex + loadingChunks bool + finishLoadingChks chan struct{} } func newBucketChunkReader(block *bucketBlock) *bucketChunkReader { @@ -3174,9 +3178,22 @@ func (r *bucketChunkReader) reset() { for i := range r.toLoad { r.toLoad[i] = r.toLoad[i][:0] } + r.loadingChunksMtx.Lock() + r.loadingChunks = false + r.finishLoadingChks = make(chan struct{}) + r.loadingChunksMtx.Unlock() } func (r *bucketChunkReader) Close() error { + // NOTE(GiedriusS): we need to wait until loading chunks because loading + // chunks modifies r.block.chunkPool. + r.loadingChunksMtx.Lock() + loadingChks := r.loadingChunks + r.loadingChunksMtx.Unlock() + + if loadingChks { + <-r.finishLoadingChks + } r.block.pendingReaders.Done() for _, b := range r.chunkBytes { @@ -3201,6 +3218,18 @@ func (r *bucketChunkReader) addLoad(id chunks.ChunkRef, seriesEntry, chunk int) // load loads all added chunks and saves resulting aggrs to refs. func (r *bucketChunkReader) load(ctx context.Context, res []seriesEntry, aggrs []storepb.Aggr, calculateChunkChecksum bool, bytesLimiter BytesLimiter) error { + r.loadingChunksMtx.Lock() + r.loadingChunks = true + r.loadingChunksMtx.Unlock() + + defer func() { + r.loadingChunksMtx.Lock() + r.loadingChunks = false + r.loadingChunksMtx.Unlock() + + close(r.finishLoadingChks) + }() + g, ctx := errgroup.WithContext(ctx) for seq, pIdxs := range r.toLoad {