Skip to content

Commit

Permalink
store/bucket: wait until chunk loading ends in Close()
Browse files Browse the repository at this point in the history
Chunk reader needs to wait until the chunk loading ends in Close()
because otherwise there will be a race between appending to r.chunkBytes
and reading from it.

Signed-off-by: Giedrius Statkevičius <giedrius.statkevicius@vinted.com>
  • Loading branch information
GiedriusS committed Aug 9, 2023
1 parent 93cb319 commit 6eb3eb7
Showing 1 changed file with 29 additions and 0 deletions.
29 changes: 29 additions & 0 deletions pkg/store/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -3160,6 +3160,10 @@ type bucketChunkReader struct {
mtx sync.Mutex
stats *queryStats
chunkBytes []*[]byte // Byte slice to return to the chunk pool on close.

loadingChunksMtx sync.Mutex
loadingChunks bool
finishLoadingChks chan struct{}
}

func newBucketChunkReader(block *bucketBlock) *bucketChunkReader {
Expand All @@ -3174,9 +3178,22 @@ func (r *bucketChunkReader) reset() {
for i := range r.toLoad {
r.toLoad[i] = r.toLoad[i][:0]
}
r.loadingChunksMtx.Lock()
r.loadingChunks = false
r.finishLoadingChks = make(chan struct{})
r.loadingChunksMtx.Unlock()
}

func (r *bucketChunkReader) Close() error {
// NOTE(GiedriusS): we need to wait until loading chunks because loading
// chunks modifies r.block.chunkPool.
r.loadingChunksMtx.Lock()
loadingChks := r.loadingChunks
r.loadingChunksMtx.Unlock()

if loadingChks {
<-r.finishLoadingChks
}
r.block.pendingReaders.Done()

for _, b := range r.chunkBytes {
Expand All @@ -3201,6 +3218,18 @@ func (r *bucketChunkReader) addLoad(id chunks.ChunkRef, seriesEntry, chunk int)

// load loads all added chunks and saves resulting aggrs to refs.
func (r *bucketChunkReader) load(ctx context.Context, res []seriesEntry, aggrs []storepb.Aggr, calculateChunkChecksum bool, bytesLimiter BytesLimiter) error {
r.loadingChunksMtx.Lock()
r.loadingChunks = true
r.loadingChunksMtx.Unlock()

defer func() {
r.loadingChunksMtx.Lock()
r.loadingChunks = false
r.loadingChunksMtx.Unlock()

close(r.finishLoadingChks)
}()

g, ctx := errgroup.WithContext(ctx)

for seq, pIdxs := range r.toLoad {
Expand Down

0 comments on commit 6eb3eb7

Please sign in to comment.