From 5ed48cb373d9ba27eb801899ee04326836cc20f2 Mon Sep 17 00:00:00 2001 From: Filip Petkovski Date: Mon, 14 Oct 2024 09:26:37 +0200 Subject: [PATCH] Fix coroutine leak The in-process client uses a pull based iterator which needs to be closed, otherwise it will leak the underlying coroutine. When this happens, the tsdb reader will remain open which blocks head compaction indefinitely. Signed-off-by: Filip Petkovski --- pkg/store/proxy_merge.go | 17 ++++++-- pkg/store/proxy_test.go | 40 ++++++++++++++++++- pkg/store/storepb/inprocess.go | 3 ++ .../storepb/testutil/store_series_client.go | 1 + 4 files changed, 56 insertions(+), 5 deletions(-) diff --git a/pkg/store/proxy_merge.go b/pkg/store/proxy_merge.go index 29d1e6560a..2f4f932605 100644 --- a/pkg/store/proxy_merge.go +++ b/pkg/store/proxy_merge.go @@ -446,8 +446,10 @@ func newAsyncRespSet( emptyStreamResponses prometheus.Counter, ) (respSet, error) { - var span opentracing.Span - var closeSeries context.CancelFunc + var ( + span opentracing.Span + cancel context.CancelFunc + ) storeID, storeAddr, isLocalStore := storeInfo(st) seriesCtx := grpc_opentracing.ClientAddContextTags(ctx, opentracing.Tags{ @@ -459,7 +461,7 @@ func newAsyncRespSet( "store.addr": storeAddr, }) - seriesCtx, closeSeries = context.WithCancel(seriesCtx) + seriesCtx, cancel = context.WithCancel(seriesCtx) shardMatcher := shardInfo.Matcher(buffers) @@ -474,7 +476,7 @@ func newAsyncRespSet( span.SetTag("err", err.Error()) span.Finish() - closeSeries() + cancel() return nil, err } @@ -490,6 +492,13 @@ func newAsyncRespSet( } } + closeSeries := func() { + cancel() + err := cl.CloseSend() + if err != nil { + level.Warn(logger).Log("msg", "detected close error", "err", err.Error()) + } + } switch retrievalStrategy { case LazyRetrieval: return newLazyRespSet( diff --git a/pkg/store/proxy_test.go b/pkg/store/proxy_test.go index e9e3889465..fcd7ddc5f6 100644 --- a/pkg/store/proxy_test.go +++ b/pkg/store/proxy_test.go @@ -2251,6 +2251,18 @@ func TestProxyStore_NotLeakingOnPrematureFinish(t *testing.T) { MinTime: math.MinInt64, MaxTime: math.MaxInt64, }, + &storetestutil.TestClient{ + StoreClient: storepb.ServerAsClient(&storeServerStub{ + delay: 50 * time.Millisecond, + responses: []*storepb.SeriesResponse{ + storeSeriesResponse(t, labels.FromStrings("b", "a"), []sample{{0, 0}, {2, 1}, {3, 2}}), + storeSeriesResponse(t, labels.FromStrings("b", "b"), []sample{{0, 0}, {2, 1}, {3, 2}}), + storeSeriesResponse(t, labels.FromStrings("b", "c"), []sample{{0, 0}, {2, 1}, {3, 2}}), + }, + }), + MinTime: math.MinInt64, + MaxTime: math.MaxInt64, + }, } logger := log.NewNopLogger() @@ -2258,7 +2270,7 @@ func TestProxyStore_NotLeakingOnPrematureFinish(t *testing.T) { logger: logger, stores: func() []Client { return clients }, metrics: newProxyStoreMetrics(nil), - responseTimeout: 0, + responseTimeout: 50 * time.Millisecond, retrievalStrategy: EagerRetrieval, tsdbSelector: DefaultSelector, } @@ -2275,6 +2287,32 @@ func TestProxyStore_NotLeakingOnPrematureFinish(t *testing.T) { })) testutil.NotOk(t, ctx.Err()) }) + t.Run("client timeout", func(t *testing.T) { + ctx := context.Background() + testutil.NotOk(t, p.Series(&storepb.SeriesRequest{Matchers: []storepb.LabelMatcher{{}}, PartialResponseStrategy: storepb.PartialResponseStrategy_ABORT}, &mockedSeriesServer{ + ctx: ctx, + send: func(*storepb.SeriesResponse) error { + return nil + }, + })) + }) +} + +type storeServerStub struct { + storepb.StoreServer + + delay time.Duration + responses []*storepb.SeriesResponse +} + +func (m *storeServerStub) Series(_ *storepb.SeriesRequest, server storepb.Store_SeriesServer) error { + for _, r := range m.responses { + <-time.After(m.delay) + if err := server.Send(r); err != nil { + return err + } + } + return nil } func TestProxyStore_storeMatchMetadata(t *testing.T) { diff --git a/pkg/store/storepb/inprocess.go b/pkg/store/storepb/inprocess.go index a5b792bca1..0c3e7641ba 100644 --- a/pkg/store/storepb/inprocess.go +++ b/pkg/store/storepb/inprocess.go @@ -55,6 +55,9 @@ func (c *inProcessClient) Recv() (*SeriesResponse, error) { return nil, err } if !ok { + if c.ctx.Err() != nil { + return nil, c.ctx.Err() + } return nil, io.EOF } return resp, err diff --git a/pkg/store/storepb/testutil/store_series_client.go b/pkg/store/storepb/testutil/store_series_client.go index 2580692ca2..e370f7c984 100644 --- a/pkg/store/storepb/testutil/store_series_client.go +++ b/pkg/store/storepb/testutil/store_series_client.go @@ -50,3 +50,4 @@ func (c *StoreSeriesClient) Recv() (*storepb.SeriesResponse, error) { func (c *StoreSeriesClient) Context() context.Context { return c.Ctx } +func (c *StoreSeriesClient) CloseSend() error { return nil }