diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index 7bedebefbef..2a289fa29d9 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -1157,7 +1157,7 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie bytesLimiter = s.bytesLimiterFactory(s.metrics.queriesDropped.WithLabelValues("bytes")) ctx = srv.Context() stats = &queryStats{} - res []respSet + respSets []respSet mtx sync.Mutex g, gctx = errgroup.WithContext(ctx) resHints = &hintspb.SeriesResponseHints{} @@ -1245,7 +1245,7 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie ) mtx.Lock() - res = append(res, part) + respSets = append(respSets, part) mtx.Unlock() return nil @@ -1297,7 +1297,7 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie } return status.Error(code, err.Error()) } - stats.blocksQueried = len(res) + stats.blocksQueried = len(respSets) stats.GetAllDuration = time.Since(begin) s.metrics.seriesGetAllDuration.Observe(stats.GetAllDuration.Seconds()) s.metrics.seriesBlocksQueried.Observe(float64(stats.blocksQueried)) @@ -1305,15 +1305,20 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie // Merge the sub-results from each selected block. tracing.DoInSpan(ctx, "bucket_store_merge_all", func(ctx context.Context) { + defer func() { + for _, resp := range respSets { + resp.Close() + } + }() begin := time.Now() - set := NewDedupResponseHeap(NewProxyResponseHeap(res...)) + set := NewDedupResponseHeap(NewProxyResponseHeap(respSets...)) for set.Next() { at := set.At() warn := at.GetWarning() if warn != "" { - // TODO(fpetkovski): Consider deprecating string based warnings - // TODO(fpetkovski): in favor of a separate protobuf message containing - // TODO(fpetkovski): the grpc code and a human readable error message. + // TODO(fpetkovski): Consider deprecating string based warnings in favor of a + // separate protobuf message containing the grpc code and + // a human readable error message. err = status.Error(storepb.GRPCCodeFromWarn(warn), at.GetWarning()) return }