diff --git a/CHANGELOG.md b/CHANGELOG.md index 5ef910c64f..1443c080ee 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re - [#7323](https://github.com/thanos-io/thanos/pull/7323) Sidecar: wait for prometheus on startup - [#7326](https://github.com/thanos-io/thanos/pull/7326) Query: fixing exemplars proxy when querying stores with multiple tenants. - [#7335](https://github.com/thanos-io/thanos/pull/7335) Dependency: Update minio-go to v7.0.70 which includes support for EKS Pod Identity. +- [#6948](https://github.com/thanos-io/thanos/pull/6948) Receive: fix goroutines leak during series requests to thanos store api. ### Added diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index 43ac0d6c1a..d5e90a209a 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -1571,7 +1571,6 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store var resp respSet if s.sortingStrategy == sortingStrategyStore { resp = newEagerRespSet( - srv.Context(), span, 10*time.Minute, blk.meta.ULID.String(), @@ -1585,7 +1584,6 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store ) } else { resp = newLazyRespSet( - srv.Context(), span, 10*time.Minute, blk.meta.ULID.String(), diff --git a/pkg/store/proxy_merge.go b/pkg/store/proxy_merge.go index 0235958b4f..62469b7959 100644 --- a/pkg/store/proxy_merge.go +++ b/pkg/store/proxy_merge.go @@ -211,13 +211,11 @@ func (l *lazyRespSet) StoreLabels() map[string]struct{} { type lazyRespSet struct { // Generic parameters. span opentracing.Span - cl storepb.Store_SeriesClient closeSeries context.CancelFunc storeName string storeLabelSets []labels.Labels storeLabels map[string]struct{} frameTimeout time.Duration - ctx context.Context // Internal bookkeeping. 
dataOrFinishEvent *sync.Cond @@ -294,7 +292,6 @@ func (l *lazyRespSet) At() *storepb.SeriesResponse { } func newLazyRespSet( - ctx context.Context, span opentracing.Span, frameTimeout time.Duration, storeName string, @@ -311,12 +308,10 @@ func newLazyRespSet( respSet := &lazyRespSet{ frameTimeout: frameTimeout, - cl: cl, storeName: storeName, storeLabelSets: storeLabelSets, closeSeries: closeSeries, span: span, - ctx: ctx, dataOrFinishEvent: dataAvailable, bufferedResponsesMtx: bufferedResponsesMtx, bufferedResponses: bufferedResponses, @@ -353,19 +348,9 @@ func newLazyRespSet( defer t.Reset(frameTimeout) } - select { - case <-l.ctx.Done(): - err := errors.Wrapf(l.ctx.Err(), "failed to receive any data from %s", st) - l.span.SetTag("err", err.Error()) + resp, err := cl.Recv() - l.bufferedResponsesMtx.Lock() - l.bufferedResponses = append(l.bufferedResponses, storepb.NewWarnSeriesResponse(err)) - l.noMoreData = true - l.dataOrFinishEvent.Signal() - l.bufferedResponsesMtx.Unlock() - return false - default: - resp, err := cl.Recv() + if err != nil { if err == io.EOF { l.bufferedResponsesMtx.Lock() l.noMoreData = true @@ -374,45 +359,43 @@ func newLazyRespSet( return false } - if err != nil { - // TODO(bwplotka): Return early on error. Don't wait of dedup, merge and sort if partial response is disabled. - var rerr error - if t != nil && !t.Stop() && errors.Is(err, context.Canceled) { - // Most likely the per-Recv timeout has been reached. - // There's a small race between canceling and the Recv() - // but this is most likely true. + var rerr error + // If timer is already stopped + if t != nil && !t.Stop() { + if errors.Is(err, context.Canceled) { + // The per-Recv timeout has been reached. 
rerr = errors.Wrapf(err, "failed to receive any data in %s from %s", l.frameTimeout, st) - } else { - rerr = errors.Wrapf(err, "receive series from %s", st) } - - l.span.SetTag("err", rerr.Error()) - - l.bufferedResponsesMtx.Lock() - l.bufferedResponses = append(l.bufferedResponses, storepb.NewWarnSeriesResponse(rerr)) - l.noMoreData = true - l.dataOrFinishEvent.Signal() - l.bufferedResponsesMtx.Unlock() - return false - } - - numResponses++ - bytesProcessed += resp.Size() - - if resp.GetSeries() != nil && applySharding && !shardMatcher.MatchesZLabels(resp.GetSeries().Labels) { - return true + } else { + rerr = errors.Wrapf(err, "receive series from %s", st) } - if resp.GetSeries() != nil { - seriesStats.Count(resp.GetSeries()) - } + l.span.SetTag("err", rerr.Error()) l.bufferedResponsesMtx.Lock() - l.bufferedResponses = append(l.bufferedResponses, resp) + l.bufferedResponses = append(l.bufferedResponses, storepb.NewWarnSeriesResponse(rerr)) + l.noMoreData = true l.dataOrFinishEvent.Signal() l.bufferedResponsesMtx.Unlock() + return false + } + + numResponses++ + bytesProcessed += resp.Size() + + if resp.GetSeries() != nil && applySharding && !shardMatcher.MatchesZLabels(resp.GetSeries().Labels) { return true } + + if resp.GetSeries() != nil { + seriesStats.Count(resp.GetSeries()) + } + + l.bufferedResponsesMtx.Lock() + l.bufferedResponses = append(l.bufferedResponses, resp) + l.dataOrFinishEvent.Signal() + l.bufferedResponsesMtx.Unlock() + return true } var t *time.Timer @@ -509,7 +492,6 @@ func newAsyncRespSet( switch retrievalStrategy { case LazyRetrieval: return newLazyRespSet( - seriesCtx, span, frameTimeout, st.String(), @@ -522,7 +504,6 @@ func newAsyncRespSet( ), nil case EagerRetrieval: return newEagerRespSet( - seriesCtx, span, frameTimeout, st.String(), @@ -556,8 +537,6 @@ func (l *lazyRespSet) Close() { type eagerRespSet struct { // Generic parameters. 
span opentracing.Span - cl storepb.Store_SeriesClient - ctx context.Context closeSeries context.CancelFunc frameTimeout time.Duration @@ -576,7 +555,6 @@ type eagerRespSet struct { } func newEagerRespSet( - ctx context.Context, span opentracing.Span, frameTimeout time.Duration, storeName string, @@ -591,9 +569,7 @@ func newEagerRespSet( ret := &eagerRespSet{ span: span, closeSeries: closeSeries, - cl: cl, frameTimeout: frameTimeout, - ctx: ctx, bufferedResponses: []*storepb.SeriesResponse{}, wg: &sync.WaitGroup{}, shardMatcher: shardMatcher, @@ -638,48 +614,47 @@ defer t.Reset(frameTimeout) } - select { - case <-l.ctx.Done(): - err := errors.Wrapf(l.ctx.Err(), "failed to receive any data from %s", storeName) - l.bufferedResponses = append(l.bufferedResponses, storepb.NewWarnSeriesResponse(err)) - l.span.SetTag("err", err.Error()) - return false - default: - resp, err := cl.Recv() + resp, err := cl.Recv() + + if err != nil { if err == io.EOF { return false } - if err != nil { - // TODO(bwplotka): Return early on error. Don't wait of dedup, merge and sort if partial response is disabled. - var rerr error - if t != nil && !t.Stop() && errors.Is(err, context.Canceled) { - // Most likely the per-Recv timeout has been reached. - // There's a small race between canceling and the Recv() - // but this is most likely true. + + var rerr error + // If timer is already stopped + if t != nil && !t.Stop() { + if t.C != nil { + <-t.C // Drain the channel if it was already stopped. AfterFunc timers have a nil C; receiving from it unguarded would block forever. + } + if errors.Is(err, context.Canceled) { + // The per-Recv timeout has been reached.
rerr = errors.Wrapf(err, "failed to receive any data in %s from %s", l.frameTimeout, storeName) - } else { - rerr = errors.Wrapf(err, "receive series from %s", storeName) } - l.bufferedResponses = append(l.bufferedResponses, storepb.NewWarnSeriesResponse(rerr)) - l.span.SetTag("err", rerr.Error()) - return false + } else { + rerr = errors.Wrapf(err, "receive series from %s", storeName) } - numResponses++ - bytesProcessed += resp.Size() - - if resp.GetSeries() != nil && applySharding && !shardMatcher.MatchesZLabels(resp.GetSeries().Labels) { - return true - } + l.bufferedResponses = append(l.bufferedResponses, storepb.NewWarnSeriesResponse(rerr)) + l.span.SetTag("err", rerr.Error()) + return false + } - if resp.GetSeries() != nil { - seriesStats.Count(resp.GetSeries()) - } + numResponses++ + bytesProcessed += resp.Size() - l.bufferedResponses = append(l.bufferedResponses, resp) + if resp.GetSeries() != nil && applySharding && !shardMatcher.MatchesZLabels(resp.GetSeries().Labels) { return true } + + if resp.GetSeries() != nil { + seriesStats.Count(resp.GetSeries()) + } + + l.bufferedResponses = append(l.bufferedResponses, resp) + return true } + var t *time.Timer if frameTimeout > 0 { t = time.AfterFunc(frameTimeout, closeSeries) diff --git a/pkg/store/storepb/inprocess.go b/pkg/store/storepb/inprocess.go index 24b90f3885..e09210d442 100644 --- a/pkg/store/storepb/inprocess.go +++ b/pkg/store/storepb/inprocess.go @@ -36,7 +36,9 @@ func (s serverAsClient) Series(ctx context.Context, in *SeriesRequest, _ ...grpc inSrv := &inProcessStream{recv: make(chan *SeriesResponse), err: make(chan error)} inSrv.ctx, inSrv.cancel = context.WithCancel(ctx) go func() { - inSrv.err <- s.srv.Series(in, inSrv) + if err := s.srv.Series(in, inSrv); err != nil { + inSrv.err <- err + } close(inSrv.err) close(inSrv.recv) }() @@ -88,15 +90,13 @@ func (s *inProcessClientStream) CloseSend() error { func (s *inProcessClientStream) Recv() (*SeriesResponse, error) { select { - case 
<-s.srv.ctx.Done(): - return nil, s.srv.ctx.Err() case r, ok := <-s.srv.recv: if !ok { return nil, io.EOF } return r, nil - case err := <-s.srv.err: - if err == nil { + case err, ok := <-s.srv.err: + if !ok { return nil, io.EOF } return nil, err