diff --git a/CHANGELOG.md b/CHANGELOG.md index ec80ea8aab..e24d1b488f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -20,6 +20,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re - [#7643](https://github.com/thanos-io/thanos/pull/7643) Receive: fix thanos_receive_write_{timeseries,samples} stats - [#7644](https://github.com/thanos-io/thanos/pull/7644) fix(ui): add null check to find overlapping blocks logic - [#7679](https://github.com/thanos-io/thanos/pull/7679) Query: respect store.limit.* flags when evaluating queries +- [#7821](https://github.com/thanos-io/thanos/pull/7679) Query/Receive: Fix coroutine leak introduced in https://github.com/thanos-io/thanos/pull/7796. ### Added - [#7763](https://github.com/thanos-io/thanos/pull/7763) Ruler: use native histograms for client latency metrics. diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index 865ea3878d..55ec52e71f 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -1574,8 +1574,6 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store tenant, ) - defer blockClient.Close() - g.Go(func() error { span, _ := tracing.StartSpan(gctx, "bucket_store_block_series", tracing.Tags{ @@ -3356,6 +3354,11 @@ func (r *bucketIndexReader) Close() error { return nil } +func (b *blockSeriesClient) CloseSend() error { + b.Close() + return nil +} + // LookupLabelsSymbols allows populates label set strings from symbolized label set. func (r *bucketIndexReader) LookupLabelsSymbols(ctx context.Context, symbolized []symbolizedLabel, b *labels.Builder) error { b.Reset(labels.EmptyLabels()) diff --git a/pkg/store/proxy_merge.go b/pkg/store/proxy_merge.go index 29d1e6560a..4442cf8fdb 100644 --- a/pkg/store/proxy_merge.go +++ b/pkg/store/proxy_merge.go @@ -219,6 +219,7 @@ func (l *lazyRespSet) StoreLabels() map[string]struct{} { type lazyRespSet struct { // Generic parameters. span opentracing.Span + cl storepb.Store_SeriesClient closeSeries context.CancelFunc storeName string storeLabelSets []labels.Labels @@ -318,6 +319,7 @@ func newLazyRespSet( frameTimeout: frameTimeout, storeName: storeName, storeLabelSets: storeLabelSets, + cl: cl, closeSeries: closeSeries, span: span, dataOrFinishEvent: dataAvailable, @@ -446,8 +448,10 @@ func newAsyncRespSet( emptyStreamResponses prometheus.Counter, ) (respSet, error) { - var span opentracing.Span - var closeSeries context.CancelFunc + var ( + span opentracing.Span + cancel context.CancelFunc + ) storeID, storeAddr, isLocalStore := storeInfo(st) seriesCtx := grpc_opentracing.ClientAddContextTags(ctx, opentracing.Tags{ @@ -459,7 +463,7 @@ func newAsyncRespSet( "store.addr": storeAddr, }) - seriesCtx, closeSeries = context.WithCancel(seriesCtx) + seriesCtx, cancel = context.WithCancel(seriesCtx) shardMatcher := shardInfo.Matcher(buffers) @@ -474,7 +478,7 @@ func newAsyncRespSet( span.SetTag("err", err.Error()) span.Finish() - closeSeries() + cancel() return nil, err } @@ -497,7 +501,7 @@ func newAsyncRespSet( frameTimeout, st.String(), st.LabelSets(), - closeSeries, + cancel, cl, shardMatcher, applySharding, @@ -509,7 +513,7 @@ func newAsyncRespSet( frameTimeout, st.String(), st.LabelSets(), - closeSeries, + cancel, cl, shardMatcher, applySharding, @@ -530,6 +534,7 @@ func (l *lazyRespSet) Close() { l.dataOrFinishEvent.Signal() l.shardMatcher.Close() + _ = l.cl.CloseSend() } // eagerRespSet is a SeriesSet that blocks until all data is retrieved from @@ -539,6 +544,7 @@ type eagerRespSet struct { // Generic parameters. span opentracing.Span + cl storepb.Store_SeriesClient closeSeries context.CancelFunc frameTimeout time.Duration @@ -569,6 +575,7 @@ func newEagerRespSet( ) respSet { ret := &eagerRespSet{ span: span, + cl: cl, closeSeries: closeSeries, frameTimeout: frameTimeout, bufferedResponses: []*storepb.SeriesResponse{}, @@ -717,6 +724,7 @@ func (l *eagerRespSet) Close() { l.closeSeries() } l.shardMatcher.Close() + _ = l.cl.CloseSend() } func (l *eagerRespSet) At() *storepb.SeriesResponse { diff --git a/pkg/store/proxy_merge_test.go b/pkg/store/proxy_merge_test.go index f72ba4a4ef..96c17a87a8 100644 --- a/pkg/store/proxy_merge_test.go +++ b/pkg/store/proxy_merge_test.go @@ -35,6 +35,7 @@ func TestProxyResponseTreeSort(t *testing.T) { input: []respSet{ &eagerRespSet{ closeSeries: func() {}, + cl: nopClientSendCloser{}, wg: &sync.WaitGroup{}, bufferedResponses: []*storepb.SeriesResponse{ storeSeriesResponse(t, labelsFromStrings("a", "1", "c", "3")), @@ -43,6 +44,7 @@ func TestProxyResponseTreeSort(t *testing.T) { }, &eagerRespSet{ closeSeries: func() {}, + cl: nopClientSendCloser{}, wg: &sync.WaitGroup{}, bufferedResponses: []*storepb.SeriesResponse{ storeSeriesResponse(t, labelsFromStrings("a", "1", "c", "4", "e", "5")), @@ -62,6 +64,7 @@ func TestProxyResponseTreeSort(t *testing.T) { input: []respSet{ &eagerRespSet{ closeSeries: func() {}, + cl: nopClientSendCloser{}, wg: &sync.WaitGroup{}, bufferedResponses: []*storepb.SeriesResponse{ storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "c", "3")), @@ -71,6 +74,7 @@ func TestProxyResponseTreeSort(t *testing.T) { }, &eagerRespSet{ closeSeries: func() {}, + cl: nopClientSendCloser{}, wg: &sync.WaitGroup{}, bufferedResponses: []*storepb.SeriesResponse{ storeSeriesResponse(t, labelsFromStrings("d", "4", "e", "5")), @@ -91,6 +95,7 @@ func TestProxyResponseTreeSort(t *testing.T) { input: []respSet{ &eagerRespSet{ closeSeries: func() {}, + cl: nopClientSendCloser{}, wg: &sync.WaitGroup{}, bufferedResponses: []*storepb.SeriesResponse{ storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "ext2", "9")), @@ -100,6 +105,7 @@ func TestProxyResponseTreeSort(t *testing.T) { }, &eagerRespSet{ closeSeries: func() {}, + cl: nopClientSendCloser{}, wg: &sync.WaitGroup{}, bufferedResponses: []*storepb.SeriesResponse{ storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "ext1", "5", "ext2", "9")), @@ -120,6 +126,7 @@ func TestProxyResponseTreeSort(t *testing.T) { input: []respSet{ &eagerRespSet{ closeSeries: func() {}, + cl: nopClientSendCloser{}, wg: &sync.WaitGroup{}, bufferedResponses: []*storepb.SeriesResponse{ storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "c", "3")), @@ -129,6 +136,7 @@ func TestProxyResponseTreeSort(t *testing.T) { }, &eagerRespSet{ closeSeries: func() {}, + cl: nopClientSendCloser{}, wg: &sync.WaitGroup{}, bufferedResponses: []*storepb.SeriesResponse{ storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "1", "c", "3")), @@ -149,6 +157,7 @@ func TestProxyResponseTreeSort(t *testing.T) { input: []respSet{ &eagerRespSet{ closeSeries: func() {}, + cl: nopClientSendCloser{}, wg: &sync.WaitGroup{}, bufferedResponses: []*storepb.SeriesResponse{ storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "ext2", "9")), @@ -158,6 +167,7 @@ func TestProxyResponseTreeSort(t *testing.T) { }, &eagerRespSet{ closeSeries: func() {}, + cl: nopClientSendCloser{}, wg: &sync.WaitGroup{}, bufferedResponses: []*storepb.SeriesResponse{ storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "ext1", "5", "ext2", "9")), @@ -178,6 +188,7 @@ func TestProxyResponseTreeSort(t *testing.T) { input: []respSet{ &eagerRespSet{ closeSeries: func() {}, + cl: nopClientSendCloser{}, wg: &sync.WaitGroup{}, bufferedResponses: []*storepb.SeriesResponse{ storeSeriesResponse(t, labelsFromStrings("cluster", "beam-platform", "instance", "10.70.13.3:15692", "prometheus", "telemetry/observe-prometheus", "receive", "true", "tenant_id", "default-tenant")), @@ -188,6 +199,7 @@ func TestProxyResponseTreeSort(t *testing.T) { }, &eagerRespSet{ closeSeries: func() {}, + cl: nopClientSendCloser{}, wg: &sync.WaitGroup{}, bufferedResponses: []*storepb.SeriesResponse{ storeSeriesResponse(t, labelsFromStrings("cluster", "beam-platform", "instance", "10.70.13.3:15692", "prometheus", "telemetry/observe-prometheus", "receive", "true", "tenant_id", "default-tenant")), @@ -219,6 +231,12 @@ func TestProxyResponseTreeSort(t *testing.T) { } } +type nopClientSendCloser struct { + storepb.Store_SeriesClient +} + +func (c nopClientSendCloser) CloseSend() error { return nil } + func TestSortWithoutLabels(t *testing.T) { for _, tcase := range []struct { input []*storepb.SeriesResponse @@ -341,6 +359,7 @@ func BenchmarkKWayMerge(b *testing.B) { for j := 0; j < 1000; j++ { respSets = append(respSets, &eagerRespSet{ closeSeries: func() {}, + cl: nopClientSendCloser{}, wg: &sync.WaitGroup{}, bufferedResponses: []*storepb.SeriesResponse{ storeSeriesResponse(b, labelsFromStrings("a", "1", "b", fmt.Sprintf("replica-%d", j), "c", "3")), diff --git a/pkg/store/proxy_test.go b/pkg/store/proxy_test.go index e9e3889465..215dc9515b 100644 --- a/pkg/store/proxy_test.go +++ b/pkg/store/proxy_test.go @@ -6,6 +6,9 @@ package store import ( "context" "fmt" + "strings" + + "github.com/pkg/errors" "math" "math/rand" @@ -19,7 +22,6 @@ import ( "github.com/go-kit/log" "github.com/gogo/protobuf/proto" "github.com/gogo/protobuf/types" - "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/timestamp" @@ -2213,68 +2215,126 @@ func benchProxySeries(t testutil.TB, totalSamples, totalSeries int) { func TestProxyStore_NotLeakingOnPrematureFinish(t *testing.T) { defer custom.TolerantVerifyLeak(t) - clients := []Client{ - &storetestutil.TestClient{ - StoreClient: &mockedStoreAPI{ - RespSeries: []*storepb.SeriesResponse{ - // Ensure more than 10 (internal respCh channel). - storeSeriesResponse(t, labels.FromStrings("a", "a"), []sample{{0, 0}, {2, 1}, {3, 2}}), - storeSeriesResponse(t, labels.FromStrings("a", "b"), []sample{{0, 0}, {2, 1}, {3, 2}}), - storeSeriesResponse(t, labels.FromStrings("a", "c"), []sample{{0, 0}, {2, 1}, {3, 2}}), - storeSeriesResponse(t, labels.FromStrings("a", "d"), []sample{{0, 0}, {2, 1}, {3, 2}}), - storeSeriesResponse(t, labels.FromStrings("a", "e"), []sample{{0, 0}, {2, 1}, {3, 2}}), - storeSeriesResponse(t, labels.FromStrings("a", "f"), []sample{{0, 0}, {2, 1}, {3, 2}}), - storeSeriesResponse(t, labels.FromStrings("a", "g"), []sample{{0, 0}, {2, 1}, {3, 2}}), - storeSeriesResponse(t, labels.FromStrings("a", "h"), []sample{{0, 0}, {2, 1}, {3, 2}}), - storeSeriesResponse(t, labels.FromStrings("a", "i"), []sample{{0, 0}, {2, 1}, {3, 2}}), - storeSeriesResponse(t, labels.FromStrings("a", "j"), []sample{{0, 0}, {2, 1}, {3, 2}}), - }, - }, - MinTime: math.MinInt64, - MaxTime: math.MaxInt64, - }, - &storetestutil.TestClient{ - StoreClient: &mockedStoreAPI{ - RespSeries: []*storepb.SeriesResponse{ - storeSeriesResponse(t, labels.FromStrings("b", "a"), []sample{{0, 0}, {2, 1}, {3, 2}}), - storeSeriesResponse(t, labels.FromStrings("b", "b"), []sample{{0, 0}, {2, 1}, {3, 2}}), - storeSeriesResponse(t, labels.FromStrings("b", "c"), []sample{{0, 0}, {2, 1}, {3, 2}}), - storeSeriesResponse(t, labels.FromStrings("b", "d"), []sample{{0, 0}, {2, 1}, {3, 2}}), - storeSeriesResponse(t, labels.FromStrings("b", "e"), []sample{{0, 0}, {2, 1}, {3, 2}}), - storeSeriesResponse(t, labels.FromStrings("b", "f"), []sample{{0, 0}, {2, 1}, {3, 2}}), - storeSeriesResponse(t, labels.FromStrings("b", "g"), []sample{{0, 0}, {2, 1}, {3, 2}}), - storeSeriesResponse(t, labels.FromStrings("b", "h"), []sample{{0, 0}, {2, 1}, {3, 2}}), - storeSeriesResponse(t, labels.FromStrings("b", "i"), []sample{{0, 0}, {2, 1}, {3, 2}}), - storeSeriesResponse(t, labels.FromStrings("b", "j"), []sample{{0, 0}, {2, 1}, {3, 2}}), - }, - }, - MinTime: math.MinInt64, - MaxTime: math.MaxInt64, - }, - } - logger := log.NewNopLogger() - p := &ProxyStore{ - logger: logger, - stores: func() []Client { return clients }, - metrics: newProxyStoreMetrics(nil), - responseTimeout: 0, - retrievalStrategy: EagerRetrieval, - tsdbSelector: DefaultSelector, + + for _, respStrategy := range []RetrievalStrategy{EagerRetrieval, LazyRetrieval} { + t.Run(fmt.Sprintf("strategy=%v", respStrategy), func(t *testing.T) { + t.Run("failing send", func(t *testing.T) { + clients := []Client{ + &storetestutil.TestClient{ + StoreClient: &mockedStoreAPI{ + RespSeries: []*storepb.SeriesResponse{ + // Ensure more than 10 (internal respCh channel). + storeSeriesResponse(t, labels.FromStrings("a", "a"), []sample{{0, 0}, {2, 1}, {3, 2}}), + storeSeriesResponse(t, labels.FromStrings("a", "b"), []sample{{0, 0}, {2, 1}, {3, 2}}), + storeSeriesResponse(t, labels.FromStrings("a", "c"), []sample{{0, 0}, {2, 1}, {3, 2}}), + storeSeriesResponse(t, labels.FromStrings("a", "d"), []sample{{0, 0}, {2, 1}, {3, 2}}), + storeSeriesResponse(t, labels.FromStrings("a", "e"), []sample{{0, 0}, {2, 1}, {3, 2}}), + storeSeriesResponse(t, labels.FromStrings("a", "f"), []sample{{0, 0}, {2, 1}, {3, 2}}), + storeSeriesResponse(t, labels.FromStrings("a", "g"), []sample{{0, 0}, {2, 1}, {3, 2}}), + storeSeriesResponse(t, labels.FromStrings("a", "h"), []sample{{0, 0}, {2, 1}, {3, 2}}), + storeSeriesResponse(t, labels.FromStrings("a", "i"), []sample{{0, 0}, {2, 1}, {3, 2}}), + storeSeriesResponse(t, labels.FromStrings("a", "j"), []sample{{0, 0}, {2, 1}, {3, 2}}), + }, + }, + MinTime: math.MinInt64, + MaxTime: math.MaxInt64, + }, + &storetestutil.TestClient{ + StoreClient: &mockedStoreAPI{ + RespSeries: []*storepb.SeriesResponse{ + storeSeriesResponse(t, labels.FromStrings("b", "a"), []sample{{0, 0}, {2, 1}, {3, 2}}), + storeSeriesResponse(t, labels.FromStrings("b", "b"), []sample{{0, 0}, {2, 1}, {3, 2}}), + storeSeriesResponse(t, labels.FromStrings("b", "c"), []sample{{0, 0}, {2, 1}, {3, 2}}), + storeSeriesResponse(t, labels.FromStrings("b", "d"), []sample{{0, 0}, {2, 1}, {3, 2}}), + storeSeriesResponse(t, labels.FromStrings("b", "e"), []sample{{0, 0}, {2, 1}, {3, 2}}), + storeSeriesResponse(t, labels.FromStrings("b", "f"), []sample{{0, 0}, {2, 1}, {3, 2}}), + storeSeriesResponse(t, labels.FromStrings("b", "g"), []sample{{0, 0}, {2, 1}, {3, 2}}), + storeSeriesResponse(t, labels.FromStrings("b", "h"), []sample{{0, 0}, {2, 1}, {3, 2}}), + storeSeriesResponse(t, labels.FromStrings("b", "i"), []sample{{0, 0}, {2, 1}, {3, 2}}), + storeSeriesResponse(t, labels.FromStrings("b", "j"), []sample{{0, 0}, {2, 1}, {3, 2}}), + }, + }, + MinTime: math.MinInt64, + MaxTime: math.MaxInt64, + }, + } + + p := &ProxyStore{ + logger: logger, + stores: func() []Client { return clients }, + metrics: newProxyStoreMetrics(nil), + responseTimeout: 50 * time.Millisecond, + retrievalStrategy: respStrategy, + tsdbSelector: DefaultSelector, + } + + ctx, cancel := context.WithCancel(context.Background()) + // We mimic failing series server, but practically context cancel will do the same. + testutil.NotOk(t, p.Series(&storepb.SeriesRequest{Matchers: []storepb.LabelMatcher{{}}, PartialResponseStrategy: storepb.PartialResponseStrategy_ABORT}, &mockedSeriesServer{ + ctx: ctx, + send: func(*storepb.SeriesResponse) error { + cancel() + return ctx.Err() + }, + })) + testutil.NotOk(t, ctx.Err()) + }) + + t.Run("client timeout", func(t *testing.T) { + clients := []Client{ + &storetestutil.TestClient{ + StoreClient: storepb.ServerAsClient(&storeServerStub{ + delay: 50 * time.Millisecond, + responses: []*storepb.SeriesResponse{ + storeSeriesResponse(t, labels.FromStrings("b", "a"), []sample{{0, 0}, {2, 1}, {3, 2}}), + storeSeriesResponse(t, labels.FromStrings("b", "b"), []sample{{0, 0}, {2, 1}, {3, 2}}), + storeSeriesResponse(t, labels.FromStrings("b", "c"), []sample{{0, 0}, {2, 1}, {3, 2}}), + }, + }), + MinTime: math.MinInt64, + MaxTime: math.MaxInt64, + }, + } + + p := &ProxyStore{ + logger: logger, + stores: func() []Client { return clients }, + metrics: newProxyStoreMetrics(nil), + responseTimeout: 50 * time.Millisecond, + retrievalStrategy: respStrategy, + tsdbSelector: DefaultSelector, + } + + ctx := context.Background() + err := p.Series(&storepb.SeriesRequest{Matchers: []storepb.LabelMatcher{{}}, PartialResponseStrategy: storepb.PartialResponseStrategy_ABORT}, &mockedSeriesServer{ + ctx: ctx, + send: func(*storepb.SeriesResponse) error { + return nil + }, + }) + + testutil.Assert(t, strings.Contains(err.Error(), context.Canceled.Error())) + }) + }) } +} - t.Run("failing send", func(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - // We mimic failing series server, but practically context cancel will do the same. - testutil.NotOk(t, p.Series(&storepb.SeriesRequest{Matchers: []storepb.LabelMatcher{{}}, PartialResponseStrategy: storepb.PartialResponseStrategy_ABORT}, &mockedSeriesServer{ - ctx: ctx, - send: func(*storepb.SeriesResponse) error { - cancel() - return ctx.Err() - }, - })) - testutil.NotOk(t, ctx.Err()) - }) +type storeServerStub struct { + storepb.StoreServer + + delay time.Duration + responses []*storepb.SeriesResponse +} + +func (m *storeServerStub) Series(_ *storepb.SeriesRequest, server storepb.Store_SeriesServer) error { + for _, r := range m.responses { + <-time.After(m.delay) + if err := server.Send(r); err != nil { + return err + } + } + return nil } func TestProxyStore_storeMatchMetadata(t *testing.T) { @@ -2394,6 +2454,7 @@ func TestDedupRespHeap_Deduplication(t *testing.T) { h := NewResponseDeduplicator(NewProxyResponseLoserTree( &eagerRespSet{ closeSeries: func() {}, + cl: nopClientSendCloser{}, wg: &sync.WaitGroup{}, bufferedResponses: tcase.responses, }, diff --git a/pkg/store/storepb/inprocess.go b/pkg/store/storepb/inprocess.go index a5b792bca1..0c3e7641ba 100644 --- a/pkg/store/storepb/inprocess.go +++ b/pkg/store/storepb/inprocess.go @@ -55,6 +55,9 @@ func (c *inProcessClient) Recv() (*SeriesResponse, error) { return nil, err } if !ok { + if c.ctx.Err() != nil { + return nil, c.ctx.Err() + } return nil, io.EOF } return resp, err diff --git a/pkg/store/storepb/testutil/store_series_client.go b/pkg/store/storepb/testutil/store_series_client.go index 2580692ca2..e370f7c984 100644 --- a/pkg/store/storepb/testutil/store_series_client.go +++ b/pkg/store/storepb/testutil/store_series_client.go @@ -50,3 +50,4 @@ func (c *StoreSeriesClient) Recv() (*storepb.SeriesResponse, error) { func (c *StoreSeriesClient) Context() context.Context { return c.Ctx } +func (c *StoreSeriesClient) CloseSend() error { return nil }