From 991c3bd26bf872c1803a68b9fecb8f9762adf053 Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Tue, 5 Nov 2024 19:29:52 +0000 Subject: [PATCH] Revert "Fix coroutine leak (#7821)" This reverts commit 9c925bfae108019dd9aabfb554e98653b1a892e1. --- CHANGELOG.md | 1 - pkg/store/bucket.go | 7 +- pkg/store/proxy_merge.go | 20 +- pkg/store/proxy_merge_test.go | 19 -- pkg/store/proxy_test.go | 181 ++++++------------ pkg/store/storepb/inprocess.go | 3 - .../storepb/testutil/store_series_client.go | 1 - 7 files changed, 68 insertions(+), 164 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 167fa435044..94906f4e206 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -21,7 +21,6 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re - [#7644](https://github.com/thanos-io/thanos/pull/7644) fix(ui): add null check to find overlapping blocks logic - [#7814](https://github.com/thanos-io/thanos/pull/7814) Store: label_values: if matchers contain **name**=="something", do not add != "" to fetch less postings. - [#7679](https://github.com/thanos-io/thanos/pull/7679) Query: respect store.limit.* flags when evaluating queries -- [#7821](https://github.com/thanos-io/thanos/pull/7679) Query/Receive: Fix coroutine leak introduced in https://github.com/thanos-io/thanos/pull/7796. - [#7843](https://github.com/thanos-io/thanos/pull/7843) Query Frontend: fix slow query logging for non-query endpoints. - [#7852](https://github.com/thanos-io/thanos/pull/7852) Query Frontend: pass "stats" parameter forward to queriers and fix Prometheus stats merging. diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index b3a4a72d2e6..6d14b262ee7 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -1574,6 +1574,8 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store tenant, ) + defer blockClient.Close() + g.Go(func() error { span, _ := tracing.StartSpan(gctx, "bucket_store_block_series", tracing.Tags{ @@ -3381,11 +3383,6 @@ func (r *bucketIndexReader) Close() error { return nil } -func (b *blockSeriesClient) CloseSend() error { - b.Close() - return nil -} - // LookupLabelsSymbols allows populates label set strings from symbolized label set. func (r *bucketIndexReader) LookupLabelsSymbols(ctx context.Context, symbolized []symbolizedLabel, b *labels.Builder) error { b.Reset(labels.EmptyLabels()) diff --git a/pkg/store/proxy_merge.go b/pkg/store/proxy_merge.go index 4442cf8fdbc..29d1e6560a3 100644 --- a/pkg/store/proxy_merge.go +++ b/pkg/store/proxy_merge.go @@ -219,7 +219,6 @@ func (l *lazyRespSet) StoreLabels() map[string]struct{} { type lazyRespSet struct { // Generic parameters. span opentracing.Span - cl storepb.Store_SeriesClient closeSeries context.CancelFunc storeName string storeLabelSets []labels.Labels @@ -319,7 +318,6 @@ func newLazyRespSet( frameTimeout: frameTimeout, storeName: storeName, storeLabelSets: storeLabelSets, - cl: cl, closeSeries: closeSeries, span: span, dataOrFinishEvent: dataAvailable, @@ -448,10 +446,8 @@ func newAsyncRespSet( emptyStreamResponses prometheus.Counter, ) (respSet, error) { - var ( - span opentracing.Span - cancel context.CancelFunc - ) + var span opentracing.Span + var closeSeries context.CancelFunc storeID, storeAddr, isLocalStore := storeInfo(st) seriesCtx := grpc_opentracing.ClientAddContextTags(ctx, opentracing.Tags{ @@ -463,7 +459,7 @@ func newAsyncRespSet( "store.addr": storeAddr, }) - seriesCtx, cancel = context.WithCancel(seriesCtx) + seriesCtx, closeSeries = context.WithCancel(seriesCtx) shardMatcher := shardInfo.Matcher(buffers) @@ -478,7 +474,7 @@ func newAsyncRespSet( span.SetTag("err", err.Error()) span.Finish() - cancel() + closeSeries() return nil, err } @@ -501,7 +497,7 @@ func newAsyncRespSet( frameTimeout, st.String(), st.LabelSets(), - cancel, + closeSeries, cl, shardMatcher, applySharding, @@ -513,7 +509,7 @@ func newAsyncRespSet( frameTimeout, st.String(), st.LabelSets(), - cancel, + closeSeries, cl, shardMatcher, applySharding, @@ -534,7 +530,6 @@ func (l *lazyRespSet) Close() { l.dataOrFinishEvent.Signal() l.shardMatcher.Close() - _ = l.cl.CloseSend() } // eagerRespSet is a SeriesSet that blocks until all data is retrieved from @@ -544,7 +539,6 @@ type eagerRespSet struct { // Generic parameters. span opentracing.Span - cl storepb.Store_SeriesClient closeSeries context.CancelFunc frameTimeout time.Duration @@ -575,7 +569,6 @@ func newEagerRespSet( ) respSet { ret := &eagerRespSet{ span: span, - cl: cl, closeSeries: closeSeries, frameTimeout: frameTimeout, bufferedResponses: []*storepb.SeriesResponse{}, @@ -724,7 +717,6 @@ func (l *eagerRespSet) Close() { l.closeSeries() } l.shardMatcher.Close() - _ = l.cl.CloseSend() } func (l *eagerRespSet) At() *storepb.SeriesResponse { diff --git a/pkg/store/proxy_merge_test.go b/pkg/store/proxy_merge_test.go index 387d5ff6a87..4885cd6aa31 100644 --- a/pkg/store/proxy_merge_test.go +++ b/pkg/store/proxy_merge_test.go @@ -39,7 +39,6 @@ func TestProxyResponseTreeSort(t *testing.T) { input: []respSet{ &eagerRespSet{ closeSeries: func() {}, - cl: nopClientSendCloser{}, wg: &sync.WaitGroup{}, bufferedResponses: []*storepb.SeriesResponse{ storeSeriesResponse(t, labelsFromStrings("a", "1", "c", "3")), @@ -48,7 +47,6 @@ func TestProxyResponseTreeSort(t *testing.T) { }, &eagerRespSet{ closeSeries: func() {}, - cl: nopClientSendCloser{}, wg: &sync.WaitGroup{}, bufferedResponses: []*storepb.SeriesResponse{ storeSeriesResponse(t, labelsFromStrings("a", "1", "c", "4", "e", "5")), @@ -68,7 +66,6 @@ func TestProxyResponseTreeSort(t *testing.T) { input: []respSet{ &eagerRespSet{ closeSeries: func() {}, - cl: nopClientSendCloser{}, wg: &sync.WaitGroup{}, bufferedResponses: []*storepb.SeriesResponse{ storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "c", "3")), @@ -78,7 +75,6 @@ func TestProxyResponseTreeSort(t *testing.T) { }, &eagerRespSet{ closeSeries: func() {}, - cl: nopClientSendCloser{}, wg: &sync.WaitGroup{}, bufferedResponses: []*storepb.SeriesResponse{ storeSeriesResponse(t, labelsFromStrings("d", "4", "e", "5")), @@ -99,7 +95,6 @@ func TestProxyResponseTreeSort(t *testing.T) { input: []respSet{ &eagerRespSet{ closeSeries: func() {}, - cl: nopClientSendCloser{}, wg: &sync.WaitGroup{}, bufferedResponses: []*storepb.SeriesResponse{ storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "ext2", "9")), @@ -109,7 +104,6 @@ func TestProxyResponseTreeSort(t *testing.T) { }, &eagerRespSet{ closeSeries: func() {}, - cl: nopClientSendCloser{}, wg: &sync.WaitGroup{}, bufferedResponses: []*storepb.SeriesResponse{ storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "ext1", "5", "ext2", "9")), @@ -130,7 +124,6 @@ func TestProxyResponseTreeSort(t *testing.T) { input: []respSet{ &eagerRespSet{ closeSeries: func() {}, - cl: nopClientSendCloser{}, wg: &sync.WaitGroup{}, bufferedResponses: []*storepb.SeriesResponse{ storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "c", "3")), @@ -140,7 +133,6 @@ func TestProxyResponseTreeSort(t *testing.T) { }, &eagerRespSet{ closeSeries: func() {}, - cl: nopClientSendCloser{}, wg: &sync.WaitGroup{}, bufferedResponses: []*storepb.SeriesResponse{ storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "1", "c", "3")), @@ -161,7 +153,6 @@ func TestProxyResponseTreeSort(t *testing.T) { input: []respSet{ &eagerRespSet{ closeSeries: func() {}, - cl: nopClientSendCloser{}, wg: &sync.WaitGroup{}, bufferedResponses: []*storepb.SeriesResponse{ storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "ext2", "9")), @@ -171,7 +162,6 @@ func TestProxyResponseTreeSort(t *testing.T) { }, &eagerRespSet{ closeSeries: func() {}, - cl: nopClientSendCloser{}, wg: &sync.WaitGroup{}, bufferedResponses: []*storepb.SeriesResponse{ storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "ext1", "5", "ext2", "9")), @@ -192,7 +182,6 @@ func TestProxyResponseTreeSort(t *testing.T) { input: []respSet{ &eagerRespSet{ closeSeries: func() {}, - cl: nopClientSendCloser{}, wg: &sync.WaitGroup{}, bufferedResponses: []*storepb.SeriesResponse{ storeSeriesResponse(t, labelsFromStrings("cluster", "beam-platform", "instance", "10.70.13.3:15692", "prometheus", "telemetry/observe-prometheus", "receive", "true", "tenant_id", "default-tenant")), @@ -203,7 +192,6 @@ func TestProxyResponseTreeSort(t *testing.T) { }, &eagerRespSet{ closeSeries: func() {}, - cl: nopClientSendCloser{}, wg: &sync.WaitGroup{}, bufferedResponses: []*storepb.SeriesResponse{ storeSeriesResponse(t, labelsFromStrings("cluster", "beam-platform", "instance", "10.70.13.3:15692", "prometheus", "telemetry/observe-prometheus", "receive", "true", "tenant_id", "default-tenant")), @@ -235,12 +223,6 @@ func TestProxyResponseTreeSort(t *testing.T) { } } -type nopClientSendCloser struct { - storepb.Store_SeriesClient -} - -func (c nopClientSendCloser) CloseSend() error { return nil } - func TestSortWithoutLabels(t *testing.T) { t.Parallel() @@ -365,7 +347,6 @@ func BenchmarkKWayMerge(b *testing.B) { for j := 0; j < 1000; j++ { respSets = append(respSets, &eagerRespSet{ closeSeries: func() {}, - cl: nopClientSendCloser{}, wg: &sync.WaitGroup{}, bufferedResponses: []*storepb.SeriesResponse{ storeSeriesResponse(b, labelsFromStrings("a", "1", "b", fmt.Sprintf("replica-%d", j), "c", "3")), diff --git a/pkg/store/proxy_test.go b/pkg/store/proxy_test.go index 1389e89b1d2..49bcd2453e4 100644 --- a/pkg/store/proxy_test.go +++ b/pkg/store/proxy_test.go @@ -6,9 +6,6 @@ package store import ( "context" "fmt" - "strings" - - "github.com/pkg/errors" "math" "math/rand" @@ -22,6 +19,7 @@ import ( "github.com/go-kit/log" "github.com/gogo/protobuf/proto" "github.com/gogo/protobuf/types" + "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/timestamp" @@ -2220,126 +2218,68 @@ func benchProxySeries(t testutil.TB, totalSamples, totalSeries int) { func TestProxyStore_NotLeakingOnPrematureFinish(t *testing.T) { t.Parallel() - logger := log.NewNopLogger() - - for _, respStrategy := range []RetrievalStrategy{EagerRetrieval, LazyRetrieval} { - t.Run(fmt.Sprintf("strategy=%v", respStrategy), func(t *testing.T) { - t.Run("failing send", func(t *testing.T) { - clients := []Client{ - &storetestutil.TestClient{ - StoreClient: &mockedStoreAPI{ - RespSeries: []*storepb.SeriesResponse{ - // Ensure more than 10 (internal respCh channel). - storeSeriesResponse(t, labels.FromStrings("a", "a"), []sample{{0, 0}, {2, 1}, {3, 2}}), - storeSeriesResponse(t, labels.FromStrings("a", "b"), []sample{{0, 0}, {2, 1}, {3, 2}}), - storeSeriesResponse(t, labels.FromStrings("a", "c"), []sample{{0, 0}, {2, 1}, {3, 2}}), - storeSeriesResponse(t, labels.FromStrings("a", "d"), []sample{{0, 0}, {2, 1}, {3, 2}}), - storeSeriesResponse(t, labels.FromStrings("a", "e"), []sample{{0, 0}, {2, 1}, {3, 2}}), - storeSeriesResponse(t, labels.FromStrings("a", "f"), []sample{{0, 0}, {2, 1}, {3, 2}}), - storeSeriesResponse(t, labels.FromStrings("a", "g"), []sample{{0, 0}, {2, 1}, {3, 2}}), - storeSeriesResponse(t, labels.FromStrings("a", "h"), []sample{{0, 0}, {2, 1}, {3, 2}}), - storeSeriesResponse(t, labels.FromStrings("a", "i"), []sample{{0, 0}, {2, 1}, {3, 2}}), - storeSeriesResponse(t, labels.FromStrings("a", "j"), []sample{{0, 0}, {2, 1}, {3, 2}}), - }, - }, - MinTime: math.MinInt64, - MaxTime: math.MaxInt64, - }, - &storetestutil.TestClient{ - StoreClient: &mockedStoreAPI{ - RespSeries: []*storepb.SeriesResponse{ - storeSeriesResponse(t, labels.FromStrings("b", "a"), []sample{{0, 0}, {2, 1}, {3, 2}}), - storeSeriesResponse(t, labels.FromStrings("b", "b"), []sample{{0, 0}, {2, 1}, {3, 2}}), - storeSeriesResponse(t, labels.FromStrings("b", "c"), []sample{{0, 0}, {2, 1}, {3, 2}}), - storeSeriesResponse(t, labels.FromStrings("b", "d"), []sample{{0, 0}, {2, 1}, {3, 2}}), - storeSeriesResponse(t, labels.FromStrings("b", "e"), []sample{{0, 0}, {2, 1}, {3, 2}}), - storeSeriesResponse(t, labels.FromStrings("b", "f"), []sample{{0, 0}, {2, 1}, {3, 2}}), - storeSeriesResponse(t, labels.FromStrings("b", "g"), []sample{{0, 0}, {2, 1}, {3, 2}}), - storeSeriesResponse(t, labels.FromStrings("b", "h"), []sample{{0, 0}, {2, 1}, {3, 2}}), - storeSeriesResponse(t, labels.FromStrings("b", "i"), []sample{{0, 0}, {2, 1}, {3, 2}}), - storeSeriesResponse(t, labels.FromStrings("b", "j"), []sample{{0, 0}, {2, 1}, {3, 2}}), - }, - }, - MinTime: math.MinInt64, - MaxTime: math.MaxInt64, - }, - } - - p := &ProxyStore{ - logger: logger, - stores: func() []Client { return clients }, - metrics: newProxyStoreMetrics(nil), - responseTimeout: 50 * time.Millisecond, - retrievalStrategy: respStrategy, - tsdbSelector: DefaultSelector, - } - - ctx, cancel := context.WithCancel(context.Background()) - // We mimic failing series server, but practically context cancel will do the same. - testutil.NotOk(t, p.Series(&storepb.SeriesRequest{Matchers: []storepb.LabelMatcher{{}}, PartialResponseStrategy: storepb.PartialResponseStrategy_ABORT}, &mockedSeriesServer{ - ctx: ctx, - send: func(*storepb.SeriesResponse) error { - cancel() - return ctx.Err() - }, - })) - testutil.NotOk(t, ctx.Err()) - }) - - t.Run("client timeout", func(t *testing.T) { - clients := []Client{ - &storetestutil.TestClient{ - StoreClient: storepb.ServerAsClient(&storeServerStub{ - delay: 50 * time.Millisecond, - responses: []*storepb.SeriesResponse{ - storeSeriesResponse(t, labels.FromStrings("b", "a"), []sample{{0, 0}, {2, 1}, {3, 2}}), - storeSeriesResponse(t, labels.FromStrings("b", "b"), []sample{{0, 0}, {2, 1}, {3, 2}}), - storeSeriesResponse(t, labels.FromStrings("b", "c"), []sample{{0, 0}, {2, 1}, {3, 2}}), - }, - }), - MinTime: math.MinInt64, - MaxTime: math.MaxInt64, - }, - } - - p := &ProxyStore{ - logger: logger, - stores: func() []Client { return clients }, - metrics: newProxyStoreMetrics(nil), - responseTimeout: 50 * time.Millisecond, - retrievalStrategy: respStrategy, - tsdbSelector: DefaultSelector, - } - - ctx := context.Background() - err := p.Series(&storepb.SeriesRequest{Matchers: []storepb.LabelMatcher{{}}, PartialResponseStrategy: storepb.PartialResponseStrategy_ABORT}, &mockedSeriesServer{ - ctx: ctx, - send: func(*storepb.SeriesResponse) error { - return nil - }, - }) - - testutil.Assert(t, strings.Contains(err.Error(), context.Canceled.Error())) - }) - }) + clients := []Client{ + &storetestutil.TestClient{ + StoreClient: &mockedStoreAPI{ + RespSeries: []*storepb.SeriesResponse{ + // Ensure more than 10 (internal respCh channel). + storeSeriesResponse(t, labels.FromStrings("a", "a"), []sample{{0, 0}, {2, 1}, {3, 2}}), + storeSeriesResponse(t, labels.FromStrings("a", "b"), []sample{{0, 0}, {2, 1}, {3, 2}}), + storeSeriesResponse(t, labels.FromStrings("a", "c"), []sample{{0, 0}, {2, 1}, {3, 2}}), + storeSeriesResponse(t, labels.FromStrings("a", "d"), []sample{{0, 0}, {2, 1}, {3, 2}}), + storeSeriesResponse(t, labels.FromStrings("a", "e"), []sample{{0, 0}, {2, 1}, {3, 2}}), + storeSeriesResponse(t, labels.FromStrings("a", "f"), []sample{{0, 0}, {2, 1}, {3, 2}}), + storeSeriesResponse(t, labels.FromStrings("a", "g"), []sample{{0, 0}, {2, 1}, {3, 2}}), + storeSeriesResponse(t, labels.FromStrings("a", "h"), []sample{{0, 0}, {2, 1}, {3, 2}}), + storeSeriesResponse(t, labels.FromStrings("a", "i"), []sample{{0, 0}, {2, 1}, {3, 2}}), + storeSeriesResponse(t, labels.FromStrings("a", "j"), []sample{{0, 0}, {2, 1}, {3, 2}}), + }, + }, + MinTime: math.MinInt64, + MaxTime: math.MaxInt64, + }, + &storetestutil.TestClient{ + StoreClient: &mockedStoreAPI{ + RespSeries: []*storepb.SeriesResponse{ + storeSeriesResponse(t, labels.FromStrings("b", "a"), []sample{{0, 0}, {2, 1}, {3, 2}}), + storeSeriesResponse(t, labels.FromStrings("b", "b"), []sample{{0, 0}, {2, 1}, {3, 2}}), + storeSeriesResponse(t, labels.FromStrings("b", "c"), []sample{{0, 0}, {2, 1}, {3, 2}}), + storeSeriesResponse(t, labels.FromStrings("b", "d"), []sample{{0, 0}, {2, 1}, {3, 2}}), + storeSeriesResponse(t, labels.FromStrings("b", "e"), []sample{{0, 0}, {2, 1}, {3, 2}}), + storeSeriesResponse(t, labels.FromStrings("b", "f"), []sample{{0, 0}, {2, 1}, {3, 2}}), + storeSeriesResponse(t, labels.FromStrings("b", "g"), []sample{{0, 0}, {2, 1}, {3, 2}}), + storeSeriesResponse(t, labels.FromStrings("b", "h"), []sample{{0, 0}, {2, 1}, {3, 2}}), + storeSeriesResponse(t, labels.FromStrings("b", "i"), []sample{{0, 0}, {2, 1}, {3, 2}}), + storeSeriesResponse(t, labels.FromStrings("b", "j"), []sample{{0, 0}, {2, 1}, {3, 2}}), + }, + }, + MinTime: math.MinInt64, + MaxTime: math.MaxInt64, + }, } -} - -type storeServerStub struct { - storepb.StoreServer - delay time.Duration - responses []*storepb.SeriesResponse -} - -func (m *storeServerStub) Series(_ *storepb.SeriesRequest, server storepb.Store_SeriesServer) error { - for _, r := range m.responses { - <-time.After(m.delay) - if err := server.Send(r); err != nil { - return err - } + logger := log.NewNopLogger() + p := &ProxyStore{ + logger: logger, + stores: func() []Client { return clients }, + metrics: newProxyStoreMetrics(nil), + responseTimeout: 0, + retrievalStrategy: EagerRetrieval, + tsdbSelector: DefaultSelector, } - return nil + + t.Run("failing send", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + // We mimic failing series server, but practically context cancel will do the same. + testutil.NotOk(t, p.Series(&storepb.SeriesRequest{Matchers: []storepb.LabelMatcher{{}}, PartialResponseStrategy: storepb.PartialResponseStrategy_ABORT}, &mockedSeriesServer{ + ctx: ctx, + send: func(*storepb.SeriesResponse) error { + cancel() + return ctx.Err() + }, + })) + testutil.NotOk(t, ctx.Err()) + }) } func TestProxyStore_storeMatchMetadata(t *testing.T) { @@ -2461,7 +2401,6 @@ func TestDedupRespHeap_Deduplication(t *testing.T) { h := NewResponseDeduplicator(NewProxyResponseLoserTree( &eagerRespSet{ closeSeries: func() {}, - cl: nopClientSendCloser{}, wg: &sync.WaitGroup{}, bufferedResponses: tcase.responses, }, diff --git a/pkg/store/storepb/inprocess.go b/pkg/store/storepb/inprocess.go index 0c3e7641baa..a5b792bca19 100644 --- a/pkg/store/storepb/inprocess.go +++ b/pkg/store/storepb/inprocess.go @@ -55,9 +55,6 @@ func (c *inProcessClient) Recv() (*SeriesResponse, error) { return nil, err } if !ok { - if c.ctx.Err() != nil { - return nil, c.ctx.Err() - } return nil, io.EOF } return resp, err diff --git a/pkg/store/storepb/testutil/store_series_client.go b/pkg/store/storepb/testutil/store_series_client.go index e370f7c984b..2580692ca2a 100644 --- a/pkg/store/storepb/testutil/store_series_client.go +++ b/pkg/store/storepb/testutil/store_series_client.go @@ -50,4 +50,3 @@ func (c *StoreSeriesClient) Recv() (*storepb.SeriesResponse, error) { func (c *StoreSeriesClient) Context() context.Context { return c.Ctx } -func (c *StoreSeriesClient) CloseSend() error { return nil }