Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix coroutine leak #7821

Merged
merged 7 commits into from
Oct 15, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#7643](https://github.com/thanos-io/thanos/pull/7643) Receive: fix thanos_receive_write_{timeseries,samples} stats
- [#7644](https://github.com/thanos-io/thanos/pull/7644) fix(ui): add null check to find overlapping blocks logic
- [#7679](https://github.com/thanos-io/thanos/pull/7679) Query: respect store.limit.* flags when evaluating queries
- [#7821](https://github.com/thanos-io/thanos/pull/7821) Query/Receive: Fix goroutine leak introduced in https://github.com/thanos-io/thanos/pull/7796.

### Added
- [#7763](https://github.com/thanos-io/thanos/pull/7763) Ruler: use native histograms for client latency metrics.
Expand Down
7 changes: 5 additions & 2 deletions pkg/store/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -1574,8 +1574,6 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store
tenant,
)

defer blockClient.Close()

g.Go(func() error {

span, _ := tracing.StartSpan(gctx, "bucket_store_block_series", tracing.Tags{
Expand Down Expand Up @@ -3356,6 +3354,11 @@ func (r *bucketIndexReader) Close() error {
return nil
}

// CloseSend releases the block series client by delegating to Close.
// It always reports success.
// NOTE(review): presumably added so callers can release the client through a
// CloseSend-style client interface — confirm against the interface definition.
func (b *blockSeriesClient) CloseSend() error {
	var err error // Close reports nothing, so there is never an error to surface.
	b.Close()
	return err
}

// LookupLabelsSymbols populates label set strings from a symbolized label set.
func (r *bucketIndexReader) LookupLabelsSymbols(ctx context.Context, symbolized []symbolizedLabel, b *labels.Builder) error {
b.Reset(labels.EmptyLabels())
Expand Down
20 changes: 14 additions & 6 deletions pkg/store/proxy_merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,7 @@ func (l *lazyRespSet) StoreLabels() map[string]struct{} {
type lazyRespSet struct {
// Generic parameters.
span opentracing.Span
cl storepb.Store_SeriesClient
closeSeries context.CancelFunc
storeName string
storeLabelSets []labels.Labels
Expand Down Expand Up @@ -318,6 +319,7 @@ func newLazyRespSet(
frameTimeout: frameTimeout,
storeName: storeName,
storeLabelSets: storeLabelSets,
cl: cl,
closeSeries: closeSeries,
span: span,
dataOrFinishEvent: dataAvailable,
Expand Down Expand Up @@ -446,8 +448,10 @@ func newAsyncRespSet(
emptyStreamResponses prometheus.Counter,
) (respSet, error) {

var span opentracing.Span
var closeSeries context.CancelFunc
var (
span opentracing.Span
cancel context.CancelFunc
)

storeID, storeAddr, isLocalStore := storeInfo(st)
seriesCtx := grpc_opentracing.ClientAddContextTags(ctx, opentracing.Tags{
Expand All @@ -459,7 +463,7 @@ func newAsyncRespSet(
"store.addr": storeAddr,
})

seriesCtx, closeSeries = context.WithCancel(seriesCtx)
seriesCtx, cancel = context.WithCancel(seriesCtx)

shardMatcher := shardInfo.Matcher(buffers)

Expand All @@ -474,7 +478,7 @@ func newAsyncRespSet(

span.SetTag("err", err.Error())
span.Finish()
closeSeries()
cancel()
return nil, err
}

Expand All @@ -497,7 +501,7 @@ func newAsyncRespSet(
frameTimeout,
st.String(),
st.LabelSets(),
closeSeries,
cancel,
cl,
shardMatcher,
applySharding,
Expand All @@ -509,7 +513,7 @@ func newAsyncRespSet(
frameTimeout,
st.String(),
st.LabelSets(),
closeSeries,
cancel,
cl,
shardMatcher,
applySharding,
Expand All @@ -530,6 +534,7 @@ func (l *lazyRespSet) Close() {
l.dataOrFinishEvent.Signal()

l.shardMatcher.Close()
_ = l.cl.CloseSend()
}

// eagerRespSet is a SeriesSet that blocks until all data is retrieved from
Expand All @@ -539,6 +544,7 @@ type eagerRespSet struct {
// Generic parameters.
span opentracing.Span

cl storepb.Store_SeriesClient
closeSeries context.CancelFunc
frameTimeout time.Duration

Expand Down Expand Up @@ -569,6 +575,7 @@ func newEagerRespSet(
) respSet {
ret := &eagerRespSet{
span: span,
cl: cl,
closeSeries: closeSeries,
frameTimeout: frameTimeout,
bufferedResponses: []*storepb.SeriesResponse{},
Expand Down Expand Up @@ -717,6 +724,7 @@ func (l *eagerRespSet) Close() {
l.closeSeries()
}
l.shardMatcher.Close()
_ = l.cl.CloseSend()
}

func (l *eagerRespSet) At() *storepb.SeriesResponse {
Expand Down
180 changes: 120 additions & 60 deletions pkg/store/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ package store
import (
"context"
"fmt"
"strings"

"github.com/pkg/errors"

"math"
"math/rand"
Expand All @@ -19,7 +22,6 @@ import (
"github.com/go-kit/log"
"github.com/gogo/protobuf/proto"
"github.com/gogo/protobuf/types"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/timestamp"
Expand Down Expand Up @@ -2213,68 +2215,126 @@ func benchProxySeries(t testutil.TB, totalSamples, totalSeries int) {
func TestProxyStore_NotLeakingOnPrematureFinish(t *testing.T) {
defer custom.TolerantVerifyLeak(t)

clients := []Client{
&storetestutil.TestClient{
StoreClient: &mockedStoreAPI{
RespSeries: []*storepb.SeriesResponse{
// Ensure more than 10 (internal respCh channel).
storeSeriesResponse(t, labels.FromStrings("a", "a"), []sample{{0, 0}, {2, 1}, {3, 2}}),
storeSeriesResponse(t, labels.FromStrings("a", "b"), []sample{{0, 0}, {2, 1}, {3, 2}}),
storeSeriesResponse(t, labels.FromStrings("a", "c"), []sample{{0, 0}, {2, 1}, {3, 2}}),
storeSeriesResponse(t, labels.FromStrings("a", "d"), []sample{{0, 0}, {2, 1}, {3, 2}}),
storeSeriesResponse(t, labels.FromStrings("a", "e"), []sample{{0, 0}, {2, 1}, {3, 2}}),
storeSeriesResponse(t, labels.FromStrings("a", "f"), []sample{{0, 0}, {2, 1}, {3, 2}}),
storeSeriesResponse(t, labels.FromStrings("a", "g"), []sample{{0, 0}, {2, 1}, {3, 2}}),
storeSeriesResponse(t, labels.FromStrings("a", "h"), []sample{{0, 0}, {2, 1}, {3, 2}}),
storeSeriesResponse(t, labels.FromStrings("a", "i"), []sample{{0, 0}, {2, 1}, {3, 2}}),
storeSeriesResponse(t, labels.FromStrings("a", "j"), []sample{{0, 0}, {2, 1}, {3, 2}}),
},
},
MinTime: math.MinInt64,
MaxTime: math.MaxInt64,
},
&storetestutil.TestClient{
StoreClient: &mockedStoreAPI{
RespSeries: []*storepb.SeriesResponse{
storeSeriesResponse(t, labels.FromStrings("b", "a"), []sample{{0, 0}, {2, 1}, {3, 2}}),
storeSeriesResponse(t, labels.FromStrings("b", "b"), []sample{{0, 0}, {2, 1}, {3, 2}}),
storeSeriesResponse(t, labels.FromStrings("b", "c"), []sample{{0, 0}, {2, 1}, {3, 2}}),
storeSeriesResponse(t, labels.FromStrings("b", "d"), []sample{{0, 0}, {2, 1}, {3, 2}}),
storeSeriesResponse(t, labels.FromStrings("b", "e"), []sample{{0, 0}, {2, 1}, {3, 2}}),
storeSeriesResponse(t, labels.FromStrings("b", "f"), []sample{{0, 0}, {2, 1}, {3, 2}}),
storeSeriesResponse(t, labels.FromStrings("b", "g"), []sample{{0, 0}, {2, 1}, {3, 2}}),
storeSeriesResponse(t, labels.FromStrings("b", "h"), []sample{{0, 0}, {2, 1}, {3, 2}}),
storeSeriesResponse(t, labels.FromStrings("b", "i"), []sample{{0, 0}, {2, 1}, {3, 2}}),
storeSeriesResponse(t, labels.FromStrings("b", "j"), []sample{{0, 0}, {2, 1}, {3, 2}}),
},
},
MinTime: math.MinInt64,
MaxTime: math.MaxInt64,
},
}

logger := log.NewNopLogger()
p := &ProxyStore{
logger: logger,
stores: func() []Client { return clients },
metrics: newProxyStoreMetrics(nil),
responseTimeout: 0,
retrievalStrategy: EagerRetrieval,
tsdbSelector: DefaultSelector,

for _, respStrategy := range []RetrievalStrategy{EagerRetrieval, LazyRetrieval} {
t.Run(fmt.Sprintf("strategy=%v", respStrategy), func(t *testing.T) {
t.Run("failing send", func(t *testing.T) {
clients := []Client{
&storetestutil.TestClient{
StoreClient: &mockedStoreAPI{
RespSeries: []*storepb.SeriesResponse{
// Ensure more than 10 (internal respCh channel).
storeSeriesResponse(t, labels.FromStrings("a", "a"), []sample{{0, 0}, {2, 1}, {3, 2}}),
storeSeriesResponse(t, labels.FromStrings("a", "b"), []sample{{0, 0}, {2, 1}, {3, 2}}),
storeSeriesResponse(t, labels.FromStrings("a", "c"), []sample{{0, 0}, {2, 1}, {3, 2}}),
storeSeriesResponse(t, labels.FromStrings("a", "d"), []sample{{0, 0}, {2, 1}, {3, 2}}),
storeSeriesResponse(t, labels.FromStrings("a", "e"), []sample{{0, 0}, {2, 1}, {3, 2}}),
storeSeriesResponse(t, labels.FromStrings("a", "f"), []sample{{0, 0}, {2, 1}, {3, 2}}),
storeSeriesResponse(t, labels.FromStrings("a", "g"), []sample{{0, 0}, {2, 1}, {3, 2}}),
storeSeriesResponse(t, labels.FromStrings("a", "h"), []sample{{0, 0}, {2, 1}, {3, 2}}),
storeSeriesResponse(t, labels.FromStrings("a", "i"), []sample{{0, 0}, {2, 1}, {3, 2}}),
storeSeriesResponse(t, labels.FromStrings("a", "j"), []sample{{0, 0}, {2, 1}, {3, 2}}),
},
},
MinTime: math.MinInt64,
MaxTime: math.MaxInt64,
},
&storetestutil.TestClient{
StoreClient: &mockedStoreAPI{
RespSeries: []*storepb.SeriesResponse{
storeSeriesResponse(t, labels.FromStrings("b", "a"), []sample{{0, 0}, {2, 1}, {3, 2}}),
storeSeriesResponse(t, labels.FromStrings("b", "b"), []sample{{0, 0}, {2, 1}, {3, 2}}),
storeSeriesResponse(t, labels.FromStrings("b", "c"), []sample{{0, 0}, {2, 1}, {3, 2}}),
storeSeriesResponse(t, labels.FromStrings("b", "d"), []sample{{0, 0}, {2, 1}, {3, 2}}),
storeSeriesResponse(t, labels.FromStrings("b", "e"), []sample{{0, 0}, {2, 1}, {3, 2}}),
storeSeriesResponse(t, labels.FromStrings("b", "f"), []sample{{0, 0}, {2, 1}, {3, 2}}),
storeSeriesResponse(t, labels.FromStrings("b", "g"), []sample{{0, 0}, {2, 1}, {3, 2}}),
storeSeriesResponse(t, labels.FromStrings("b", "h"), []sample{{0, 0}, {2, 1}, {3, 2}}),
storeSeriesResponse(t, labels.FromStrings("b", "i"), []sample{{0, 0}, {2, 1}, {3, 2}}),
storeSeriesResponse(t, labels.FromStrings("b", "j"), []sample{{0, 0}, {2, 1}, {3, 2}}),
},
},
MinTime: math.MinInt64,
MaxTime: math.MaxInt64,
},
}

p := &ProxyStore{
logger: logger,
stores: func() []Client { return clients },
metrics: newProxyStoreMetrics(nil),
responseTimeout: 50 * time.Millisecond,
retrievalStrategy: respStrategy,
tsdbSelector: DefaultSelector,
}

ctx, cancel := context.WithCancel(context.Background())
// We mimic failing series server, but practically context cancel will do the same.
testutil.NotOk(t, p.Series(&storepb.SeriesRequest{Matchers: []storepb.LabelMatcher{{}}, PartialResponseStrategy: storepb.PartialResponseStrategy_ABORT}, &mockedSeriesServer{
ctx: ctx,
send: func(*storepb.SeriesResponse) error {
cancel()
return ctx.Err()
},
}))
testutil.NotOk(t, ctx.Err())
})

t.Run("client timeout", func(t *testing.T) {
clients := []Client{
&storetestutil.TestClient{
StoreClient: storepb.ServerAsClient(&storeServerStub{
delay: 50 * time.Millisecond,
responses: []*storepb.SeriesResponse{
storeSeriesResponse(t, labels.FromStrings("b", "a"), []sample{{0, 0}, {2, 1}, {3, 2}}),
storeSeriesResponse(t, labels.FromStrings("b", "b"), []sample{{0, 0}, {2, 1}, {3, 2}}),
storeSeriesResponse(t, labels.FromStrings("b", "c"), []sample{{0, 0}, {2, 1}, {3, 2}}),
},
}),
MinTime: math.MinInt64,
MaxTime: math.MaxInt64,
},
}

p := &ProxyStore{
logger: logger,
stores: func() []Client { return clients },
metrics: newProxyStoreMetrics(nil),
responseTimeout: 50 * time.Millisecond,
retrievalStrategy: respStrategy,
tsdbSelector: DefaultSelector,
}

ctx := context.Background()
err := p.Series(&storepb.SeriesRequest{Matchers: []storepb.LabelMatcher{{}}, PartialResponseStrategy: storepb.PartialResponseStrategy_ABORT}, &mockedSeriesServer{
ctx: ctx,
send: func(*storepb.SeriesResponse) error {
return nil
},
})

testutil.Assert(t, strings.Contains(err.Error(), context.Canceled.Error()))
})
})
}
}

t.Run("failing send", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
// We mimic failing series server, but practically context cancel will do the same.
testutil.NotOk(t, p.Series(&storepb.SeriesRequest{Matchers: []storepb.LabelMatcher{{}}, PartialResponseStrategy: storepb.PartialResponseStrategy_ABORT}, &mockedSeriesServer{
ctx: ctx,
send: func(*storepb.SeriesResponse) error {
cancel()
return ctx.Err()
},
}))
testutil.NotOk(t, ctx.Err())
})
// storeServerStub is a test double that embeds storepb.StoreServer and
// overrides Series to emit a fixed list of responses at a slow, fixed pace.
type storeServerStub struct {
	storepb.StoreServer

	// delay is how long Series waits before sending each response.
	delay time.Duration
	// responses are sent by Series, in order.
	responses []*storepb.SeriesResponse
}

// Series sends every configured response to server, sleeping for delay before
// each send. The request is ignored. It stops at the first failed Send and
// returns that error; otherwise it returns nil once all responses are sent.
func (m *storeServerStub) Series(_ *storepb.SeriesRequest, server storepb.Store_SeriesServer) error {
	for i := range m.responses {
		time.Sleep(m.delay)

		err := server.Send(m.responses[i])
		if err != nil {
			return err
		}
	}
	return nil
}

func TestProxyStore_storeMatchMetadata(t *testing.T) {
Expand Down
3 changes: 3 additions & 0 deletions pkg/store/storepb/inprocess.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ func (c *inProcessClient) Recv() (*SeriesResponse, error) {
return nil, err
}
if !ok {
if c.ctx.Err() != nil {
return nil, c.ctx.Err()
}
return nil, io.EOF
}
return resp, err
Expand Down
1 change: 1 addition & 0 deletions pkg/store/storepb/testutil/store_series_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,4 @@ func (c *StoreSeriesClient) Recv() (*storepb.SeriesResponse, error) {
// Context returns the context held in the client's Ctx field.
func (c *StoreSeriesClient) Context() context.Context {
	ctx := c.Ctx
	return ctx
}
// CloseSend is a no-op for this test client and always returns nil.
// NOTE(review): presumably present so the type satisfies a client interface
// that requires CloseSend — confirm against storepb.Store_SeriesClient.
func (c *StoreSeriesClient) CloseSend() error {
	return nil
}
Loading