Skip to content

Commit

Permalink
Receive: fix serverAsClient.Series goroutines leak (#6948)
Browse files Browse the repository at this point in the history
* fix serverAsClient goroutines leak

Signed-off-by: Thibault Mange <22740367+thibaultmg@users.noreply.github.com>

* fix lint

Signed-off-by: Thibault Mange <22740367+thibaultmg@users.noreply.github.com>

* update changelog

Signed-off-by: Thibault Mange <22740367+thibaultmg@users.noreply.github.com>

* delete invalid comment

Signed-off-by: Thibault Mange <22740367+thibaultmg@users.noreply.github.com>

* remove temp dev test

Signed-off-by: Thibault Mange <22740367+thibaultmg@users.noreply.github.com>

* remove timer channel drain

Signed-off-by: Thibault Mange <22740367+thibaultmg@users.noreply.github.com>

---------

Signed-off-by: Thibault Mange <22740367+thibaultmg@users.noreply.github.com>
  • Loading branch information
thibaultmg authored May 20, 2024
1 parent 9e6cbd9 commit 258154a
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 90 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#7323](https://github.com/thanos-io/thanos/pull/7323) Sidecar: wait for prometheus on startup
- [#7326](https://github.com/thanos-io/thanos/pull/7326) Query: fixing exemplars proxy when querying stores with multiple tenants.
- [#7335](https://github.com/thanos-io/thanos/pull/7335) Dependency: Update minio-go to v7.0.70 which includes support for EKS Pod Identity.
- [#6948](https://github.com/thanos-io/thanos/pull/6948) Receive: fix goroutines leak during series requests to thanos store api.

### Added

Expand Down
2 changes: 0 additions & 2 deletions pkg/store/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -1589,7 +1589,6 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store
var resp respSet
if s.sortingStrategy == sortingStrategyStore {
resp = newEagerRespSet(
srv.Context(),
span,
10*time.Minute,
blk.meta.ULID.String(),
Expand All @@ -1603,7 +1602,6 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store
)
} else {
resp = newLazyRespSet(
srv.Context(),
span,
10*time.Minute,
blk.meta.ULID.String(),
Expand Down
139 changes: 56 additions & 83 deletions pkg/store/proxy_merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,13 +211,11 @@ func (l *lazyRespSet) StoreLabels() map[string]struct{} {
type lazyRespSet struct {
// Generic parameters.
span opentracing.Span
cl storepb.Store_SeriesClient
closeSeries context.CancelFunc
storeName string
storeLabelSets []labels.Labels
storeLabels map[string]struct{}
frameTimeout time.Duration
ctx context.Context

// Internal bookkeeping.
dataOrFinishEvent *sync.Cond
Expand Down Expand Up @@ -294,7 +292,6 @@ func (l *lazyRespSet) At() *storepb.SeriesResponse {
}

func newLazyRespSet(
ctx context.Context,
span opentracing.Span,
frameTimeout time.Duration,
storeName string,
Expand All @@ -311,12 +308,10 @@ func newLazyRespSet(

respSet := &lazyRespSet{
frameTimeout: frameTimeout,
cl: cl,
storeName: storeName,
storeLabelSets: storeLabelSets,
closeSeries: closeSeries,
span: span,
ctx: ctx,
dataOrFinishEvent: dataAvailable,
bufferedResponsesMtx: bufferedResponsesMtx,
bufferedResponses: bufferedResponses,
Expand Down Expand Up @@ -353,19 +348,9 @@ func newLazyRespSet(
defer t.Reset(frameTimeout)
}

select {
case <-l.ctx.Done():
err := errors.Wrapf(l.ctx.Err(), "failed to receive any data from %s", st)
l.span.SetTag("err", err.Error())
resp, err := cl.Recv()

l.bufferedResponsesMtx.Lock()
l.bufferedResponses = append(l.bufferedResponses, storepb.NewWarnSeriesResponse(err))
l.noMoreData = true
l.dataOrFinishEvent.Signal()
l.bufferedResponsesMtx.Unlock()
return false
default:
resp, err := cl.Recv()
if err != nil {
if err == io.EOF {
l.bufferedResponsesMtx.Lock()
l.noMoreData = true
Expand All @@ -374,45 +359,43 @@ func newLazyRespSet(
return false
}

if err != nil {
// TODO(bwplotka): Return early on error. Don't wait of dedup, merge and sort if partial response is disabled.
var rerr error
if t != nil && !t.Stop() && errors.Is(err, context.Canceled) {
// Most likely the per-Recv timeout has been reached.
// There's a small race between canceling and the Recv()
// but this is most likely true.
var rerr error
// If timer is already stopped
if t != nil && !t.Stop() {
if errors.Is(err, context.Canceled) {
// The per-Recv timeout has been reached.
rerr = errors.Wrapf(err, "failed to receive any data in %s from %s", l.frameTimeout, st)
} else {
rerr = errors.Wrapf(err, "receive series from %s", st)
}

l.span.SetTag("err", rerr.Error())

l.bufferedResponsesMtx.Lock()
l.bufferedResponses = append(l.bufferedResponses, storepb.NewWarnSeriesResponse(rerr))
l.noMoreData = true
l.dataOrFinishEvent.Signal()
l.bufferedResponsesMtx.Unlock()
return false
}

numResponses++
bytesProcessed += resp.Size()

if resp.GetSeries() != nil && applySharding && !shardMatcher.MatchesZLabels(resp.GetSeries().Labels) {
return true
} else {
rerr = errors.Wrapf(err, "receive series from %s", st)
}

if resp.GetSeries() != nil {
seriesStats.Count(resp.GetSeries())
}
l.span.SetTag("err", rerr.Error())

l.bufferedResponsesMtx.Lock()
l.bufferedResponses = append(l.bufferedResponses, resp)
l.bufferedResponses = append(l.bufferedResponses, storepb.NewWarnSeriesResponse(rerr))
l.noMoreData = true
l.dataOrFinishEvent.Signal()
l.bufferedResponsesMtx.Unlock()
return false
}

numResponses++
bytesProcessed += resp.Size()

if resp.GetSeries() != nil && applySharding && !shardMatcher.MatchesZLabels(resp.GetSeries().Labels) {
return true
}

if resp.GetSeries() != nil {
seriesStats.Count(resp.GetSeries())
}

l.bufferedResponsesMtx.Lock()
l.bufferedResponses = append(l.bufferedResponses, resp)
l.dataOrFinishEvent.Signal()
l.bufferedResponsesMtx.Unlock()
return true
}

var t *time.Timer
Expand Down Expand Up @@ -509,7 +492,6 @@ func newAsyncRespSet(
switch retrievalStrategy {
case LazyRetrieval:
return newLazyRespSet(
seriesCtx,
span,
frameTimeout,
st.String(),
Expand All @@ -522,7 +504,6 @@ func newAsyncRespSet(
), nil
case EagerRetrieval:
return newEagerRespSet(
seriesCtx,
span,
frameTimeout,
st.String(),
Expand Down Expand Up @@ -556,8 +537,6 @@ func (l *lazyRespSet) Close() {
type eagerRespSet struct {
// Generic parameters.
span opentracing.Span
cl storepb.Store_SeriesClient
ctx context.Context

closeSeries context.CancelFunc
frameTimeout time.Duration
Expand All @@ -576,7 +555,6 @@ type eagerRespSet struct {
}

func newEagerRespSet(
ctx context.Context,
span opentracing.Span,
frameTimeout time.Duration,
storeName string,
Expand All @@ -591,9 +569,7 @@ func newEagerRespSet(
ret := &eagerRespSet{
span: span,
closeSeries: closeSeries,
cl: cl,
frameTimeout: frameTimeout,
ctx: ctx,
bufferedResponses: []*storepb.SeriesResponse{},
wg: &sync.WaitGroup{},
shardMatcher: shardMatcher,
Expand Down Expand Up @@ -638,48 +614,45 @@ func newEagerRespSet(
defer t.Reset(frameTimeout)
}

select {
case <-l.ctx.Done():
err := errors.Wrapf(l.ctx.Err(), "failed to receive any data from %s", storeName)
l.bufferedResponses = append(l.bufferedResponses, storepb.NewWarnSeriesResponse(err))
l.span.SetTag("err", err.Error())
return false
default:
resp, err := cl.Recv()
resp, err := cl.Recv()

if err != nil {
if err == io.EOF {
return false
}
if err != nil {
// TODO(bwplotka): Return early on error. Don't wait of dedup, merge and sort if partial response is disabled.
var rerr error
if t != nil && !t.Stop() && errors.Is(err, context.Canceled) {
// Most likely the per-Recv timeout has been reached.
// There's a small race between canceling and the Recv()
// but this is most likely true.

var rerr error
// If timer is already stopped
if t != nil && !t.Stop() {
<-t.C // Drain the channel if it was already stopped.
if errors.Is(err, context.Canceled) {
// The per-Recv timeout has been reached.
rerr = errors.Wrapf(err, "failed to receive any data in %s from %s", l.frameTimeout, storeName)
} else {
rerr = errors.Wrapf(err, "receive series from %s", storeName)
}
l.bufferedResponses = append(l.bufferedResponses, storepb.NewWarnSeriesResponse(rerr))
l.span.SetTag("err", rerr.Error())
return false
} else {
rerr = errors.Wrapf(err, "receive series from %s", storeName)
}

numResponses++
bytesProcessed += resp.Size()

if resp.GetSeries() != nil && applySharding && !shardMatcher.MatchesZLabels(resp.GetSeries().Labels) {
return true
}
l.bufferedResponses = append(l.bufferedResponses, storepb.NewWarnSeriesResponse(rerr))
l.span.SetTag("err", rerr.Error())
return false
}

if resp.GetSeries() != nil {
seriesStats.Count(resp.GetSeries())
}
numResponses++
bytesProcessed += resp.Size()

l.bufferedResponses = append(l.bufferedResponses, resp)
if resp.GetSeries() != nil && applySharding && !shardMatcher.MatchesZLabels(resp.GetSeries().Labels) {
return true
}

if resp.GetSeries() != nil {
seriesStats.Count(resp.GetSeries())
}

l.bufferedResponses = append(l.bufferedResponses, resp)
return true
}

var t *time.Timer
if frameTimeout > 0 {
t = time.AfterFunc(frameTimeout, closeSeries)
Expand Down
10 changes: 5 additions & 5 deletions pkg/store/storepb/inprocess.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@ func (s serverAsClient) Series(ctx context.Context, in *SeriesRequest, _ ...grpc
inSrv := &inProcessStream{recv: make(chan *SeriesResponse), err: make(chan error)}
inSrv.ctx, inSrv.cancel = context.WithCancel(ctx)
go func() {
inSrv.err <- s.srv.Series(in, inSrv)
if err := s.srv.Series(in, inSrv); err != nil {
inSrv.err <- err
}
close(inSrv.err)
close(inSrv.recv)
}()
Expand Down Expand Up @@ -88,15 +90,13 @@ func (s *inProcessClientStream) CloseSend() error {

func (s *inProcessClientStream) Recv() (*SeriesResponse, error) {
select {
case <-s.srv.ctx.Done():
return nil, s.srv.ctx.Err()
case r, ok := <-s.srv.recv:
if !ok {
return nil, io.EOF
}
return r, nil
case err := <-s.srv.err:
if err == nil {
case err, ok := <-s.srv.err:
if !ok {
return nil, io.EOF
}
return nil, err
Expand Down

0 comments on commit 258154a

Please sign in to comment.