Skip to content

Commit

Permalink
Fix coroutine leak
Browse files Browse the repository at this point in the history
The in-process client uses a pull based iterator which needs
to be closed, otherwise it will leak the underlying coroutine.
When this happens, the tsdb reader will remain open which blocks head
compaction indefinitely.

Signed-off-by: Filip Petkovski <filip.petkovsky@gmail.com>
  • Loading branch information
fpetkovski committed Oct 14, 2024
1 parent 2f39d24 commit ce047e7
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 7 deletions.
7 changes: 5 additions & 2 deletions pkg/store/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -1107,6 +1107,11 @@ func (b *blockSeriesClient) Close() {
runutil.CloseWithLogOnErr(b.logger, b.indexr, "series block")
}

func (b *blockSeriesClient) CloseSend() error {
b.Close()
return nil
}

func (b *blockSeriesClient) MergeStats(stats *queryStats) *queryStats {
stats.merge(b.indexr.stats)
if !b.skipChunks {
Expand Down Expand Up @@ -1574,8 +1579,6 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store
tenant,
)

defer blockClient.Close()

g.Go(func() error {

span, _ := tracing.StartSpan(gctx, "bucket_store_block_series", tracing.Tags{
Expand Down
17 changes: 13 additions & 4 deletions pkg/store/proxy_merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -446,8 +446,10 @@ func newAsyncRespSet(
emptyStreamResponses prometheus.Counter,
) (respSet, error) {

var span opentracing.Span
var closeSeries context.CancelFunc
var (
span opentracing.Span
cancel context.CancelFunc
)

storeID, storeAddr, isLocalStore := storeInfo(st)
seriesCtx := grpc_opentracing.ClientAddContextTags(ctx, opentracing.Tags{
Expand All @@ -459,7 +461,7 @@ func newAsyncRespSet(
"store.addr": storeAddr,
})

seriesCtx, closeSeries = context.WithCancel(seriesCtx)
seriesCtx, cancel = context.WithCancel(seriesCtx)

shardMatcher := shardInfo.Matcher(buffers)

Expand All @@ -474,7 +476,7 @@ func newAsyncRespSet(

span.SetTag("err", err.Error())
span.Finish()
closeSeries()
cancel()
return nil, err
}

Expand All @@ -490,6 +492,13 @@ func newAsyncRespSet(
}
}

closeSeries := func() {
cancel()
err := cl.CloseSend()
if err != nil {
level.Warn(logger).Log("msg", "detected close error", "err", err.Error())
}
}
switch retrievalStrategy {
case LazyRetrieval:
return newLazyRespSet(
Expand Down
40 changes: 39 additions & 1 deletion pkg/store/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2251,14 +2251,26 @@ func TestProxyStore_NotLeakingOnPrematureFinish(t *testing.T) {
MinTime: math.MinInt64,
MaxTime: math.MaxInt64,
},
&storetestutil.TestClient{
StoreClient: storepb.ServerAsClient(&storeServerStub{
delay: 50 * time.Millisecond,
responses: []*storepb.SeriesResponse{
storeSeriesResponse(t, labels.FromStrings("b", "a"), []sample{{0, 0}, {2, 1}, {3, 2}}),
storeSeriesResponse(t, labels.FromStrings("b", "b"), []sample{{0, 0}, {2, 1}, {3, 2}}),
storeSeriesResponse(t, labels.FromStrings("b", "c"), []sample{{0, 0}, {2, 1}, {3, 2}}),
},
}),
MinTime: math.MinInt64,
MaxTime: math.MaxInt64,
},
}

logger := log.NewNopLogger()
p := &ProxyStore{
logger: logger,
stores: func() []Client { return clients },
metrics: newProxyStoreMetrics(nil),
responseTimeout: 0,
responseTimeout: 50 * time.Millisecond,
retrievalStrategy: EagerRetrieval,
tsdbSelector: DefaultSelector,
}
Expand All @@ -2275,6 +2287,32 @@ func TestProxyStore_NotLeakingOnPrematureFinish(t *testing.T) {
}))
testutil.NotOk(t, ctx.Err())
})
t.Run("client timeout", func(t *testing.T) {
ctx := context.Background()
testutil.NotOk(t, p.Series(&storepb.SeriesRequest{Matchers: []storepb.LabelMatcher{{}}, PartialResponseStrategy: storepb.PartialResponseStrategy_ABORT}, &mockedSeriesServer{
ctx: ctx,
send: func(*storepb.SeriesResponse) error {
return nil
},
}))
})
}

type storeServerStub struct {
storepb.StoreServer

delay time.Duration
responses []*storepb.SeriesResponse
}

func (m *storeServerStub) Series(_ *storepb.SeriesRequest, server storepb.Store_SeriesServer) error {
for _, r := range m.responses {
<-time.After(m.delay)
if err := server.Send(r); err != nil {
return err
}
}
return nil
}

func TestProxyStore_storeMatchMetadata(t *testing.T) {
Expand Down
3 changes: 3 additions & 0 deletions pkg/store/storepb/inprocess.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ func (c *inProcessClient) Recv() (*SeriesResponse, error) {
return nil, err
}
if !ok {
if c.ctx.Err() != nil {
return nil, c.ctx.Err()
}
return nil, io.EOF
}
return resp, err
Expand Down
1 change: 1 addition & 0 deletions pkg/store/storepb/testutil/store_series_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,4 @@ func (c *StoreSeriesClient) Recv() (*storepb.SeriesResponse, error) {
func (c *StoreSeriesClient) Context() context.Context {
return c.Ctx
}
func (c *StoreSeriesClient) CloseSend() error { return nil }

0 comments on commit ce047e7

Please sign in to comment.