Skip to content

Commit

Permalink
Fix coroutine leak (#7821)
Browse files Browse the repository at this point in the history
* Fix coroutine leak

The in-process client uses a pull-based iterator, which needs
to be closed; otherwise, it will leak the underlying coroutine.
When this happens, the tsdb reader will remain open which blocks head
compaction indefinitely.

Signed-off-by: Filip Petkovski <filip.petkovsky@gmail.com>

* Fix race condition

Signed-off-by: Filip Petkovski <filip.petkovsky@gmail.com>

* Fix CHANGELOG

Signed-off-by: Filip Petkovski <filip.petkovsky@gmail.com>

* Improve tests

Signed-off-by: Filip Petkovski <filip.petkovsky@gmail.com>

* Fix blockSeriesClient

Signed-off-by: Filip Petkovski <filip.petkovsky@gmail.com>

* Fix unit test

Signed-off-by: Filip Petkovski <filip.petkovsky@gmail.com>

* Fix another unit test

Signed-off-by: Filip Petkovski <filip.petkovsky@gmail.com>

---------

Signed-off-by: Filip Petkovski <filip.petkovsky@gmail.com>
  • Loading branch information
fpetkovski authored Oct 15, 2024
1 parent 175baf1 commit 9c925bf
Show file tree
Hide file tree
Showing 7 changed files with 164 additions and 68 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#7643](https://github.com/thanos-io/thanos/pull/7643) Receive: fix thanos_receive_write_{timeseries,samples} stats
- [#7644](https://github.com/thanos-io/thanos/pull/7644) fix(ui): add null check to find overlapping blocks logic
- [#7679](https://github.com/thanos-io/thanos/pull/7679) Query: respect store.limit.* flags when evaluating queries
- [#7821](https://github.com/thanos-io/thanos/pull/7821) Query/Receive: Fix coroutine leak introduced in https://github.com/thanos-io/thanos/pull/7796.

### Added
- [#7763](https://github.com/thanos-io/thanos/pull/7763) Ruler: use native histograms for client latency metrics.
Expand Down
7 changes: 5 additions & 2 deletions pkg/store/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -1574,8 +1574,6 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store
tenant,
)

defer blockClient.Close()

g.Go(func() error {

span, _ := tracing.StartSpan(gctx, "bucket_store_block_series", tracing.Tags{
Expand Down Expand Up @@ -3356,6 +3354,11 @@ func (r *bucketIndexReader) Close() error {
return nil
}

// CloseSend calls b.Close and always returns nil.
// NOTE(review): presumably this exists so a blockSeriesClient can be shut down
// through the CloseSend method of storepb.Store_SeriesClient — confirm against callers.
func (b *blockSeriesClient) CloseSend() error {
b.Close()
return nil
}

// LookupLabelsSymbols populates label set strings from a symbolized label set.
func (r *bucketIndexReader) LookupLabelsSymbols(ctx context.Context, symbolized []symbolizedLabel, b *labels.Builder) error {
b.Reset(labels.EmptyLabels())
Expand Down
20 changes: 14 additions & 6 deletions pkg/store/proxy_merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,7 @@ func (l *lazyRespSet) StoreLabels() map[string]struct{} {
type lazyRespSet struct {
// Generic parameters.
span opentracing.Span
cl storepb.Store_SeriesClient
closeSeries context.CancelFunc
storeName string
storeLabelSets []labels.Labels
Expand Down Expand Up @@ -318,6 +319,7 @@ func newLazyRespSet(
frameTimeout: frameTimeout,
storeName: storeName,
storeLabelSets: storeLabelSets,
cl: cl,
closeSeries: closeSeries,
span: span,
dataOrFinishEvent: dataAvailable,
Expand Down Expand Up @@ -446,8 +448,10 @@ func newAsyncRespSet(
emptyStreamResponses prometheus.Counter,
) (respSet, error) {

var span opentracing.Span
var closeSeries context.CancelFunc
var (
span opentracing.Span
cancel context.CancelFunc
)

storeID, storeAddr, isLocalStore := storeInfo(st)
seriesCtx := grpc_opentracing.ClientAddContextTags(ctx, opentracing.Tags{
Expand All @@ -459,7 +463,7 @@ func newAsyncRespSet(
"store.addr": storeAddr,
})

seriesCtx, closeSeries = context.WithCancel(seriesCtx)
seriesCtx, cancel = context.WithCancel(seriesCtx)

shardMatcher := shardInfo.Matcher(buffers)

Expand All @@ -474,7 +478,7 @@ func newAsyncRespSet(

span.SetTag("err", err.Error())
span.Finish()
closeSeries()
cancel()
return nil, err
}

Expand All @@ -497,7 +501,7 @@ func newAsyncRespSet(
frameTimeout,
st.String(),
st.LabelSets(),
closeSeries,
cancel,
cl,
shardMatcher,
applySharding,
Expand All @@ -509,7 +513,7 @@ func newAsyncRespSet(
frameTimeout,
st.String(),
st.LabelSets(),
closeSeries,
cancel,
cl,
shardMatcher,
applySharding,
Expand All @@ -530,6 +534,7 @@ func (l *lazyRespSet) Close() {
l.dataOrFinishEvent.Signal()

l.shardMatcher.Close()
_ = l.cl.CloseSend()
}

// eagerRespSet is a SeriesSet that blocks until all data is retrieved from
Expand All @@ -539,6 +544,7 @@ type eagerRespSet struct {
// Generic parameters.
span opentracing.Span

cl storepb.Store_SeriesClient
closeSeries context.CancelFunc
frameTimeout time.Duration

Expand Down Expand Up @@ -569,6 +575,7 @@ func newEagerRespSet(
) respSet {
ret := &eagerRespSet{
span: span,
cl: cl,
closeSeries: closeSeries,
frameTimeout: frameTimeout,
bufferedResponses: []*storepb.SeriesResponse{},
Expand Down Expand Up @@ -717,6 +724,7 @@ func (l *eagerRespSet) Close() {
l.closeSeries()
}
l.shardMatcher.Close()
_ = l.cl.CloseSend()
}

func (l *eagerRespSet) At() *storepb.SeriesResponse {
Expand Down
19 changes: 19 additions & 0 deletions pkg/store/proxy_merge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ func TestProxyResponseTreeSort(t *testing.T) {
input: []respSet{
&eagerRespSet{
closeSeries: func() {},
cl: nopClientSendCloser{},
wg: &sync.WaitGroup{},
bufferedResponses: []*storepb.SeriesResponse{
storeSeriesResponse(t, labelsFromStrings("a", "1", "c", "3")),
Expand All @@ -43,6 +44,7 @@ func TestProxyResponseTreeSort(t *testing.T) {
},
&eagerRespSet{
closeSeries: func() {},
cl: nopClientSendCloser{},
wg: &sync.WaitGroup{},
bufferedResponses: []*storepb.SeriesResponse{
storeSeriesResponse(t, labelsFromStrings("a", "1", "c", "4", "e", "5")),
Expand All @@ -62,6 +64,7 @@ func TestProxyResponseTreeSort(t *testing.T) {
input: []respSet{
&eagerRespSet{
closeSeries: func() {},
cl: nopClientSendCloser{},
wg: &sync.WaitGroup{},
bufferedResponses: []*storepb.SeriesResponse{
storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "c", "3")),
Expand All @@ -71,6 +74,7 @@ func TestProxyResponseTreeSort(t *testing.T) {
},
&eagerRespSet{
closeSeries: func() {},
cl: nopClientSendCloser{},
wg: &sync.WaitGroup{},
bufferedResponses: []*storepb.SeriesResponse{
storeSeriesResponse(t, labelsFromStrings("d", "4", "e", "5")),
Expand All @@ -91,6 +95,7 @@ func TestProxyResponseTreeSort(t *testing.T) {
input: []respSet{
&eagerRespSet{
closeSeries: func() {},
cl: nopClientSendCloser{},
wg: &sync.WaitGroup{},
bufferedResponses: []*storepb.SeriesResponse{
storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "ext2", "9")),
Expand All @@ -100,6 +105,7 @@ func TestProxyResponseTreeSort(t *testing.T) {
},
&eagerRespSet{
closeSeries: func() {},
cl: nopClientSendCloser{},
wg: &sync.WaitGroup{},
bufferedResponses: []*storepb.SeriesResponse{
storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "ext1", "5", "ext2", "9")),
Expand All @@ -120,6 +126,7 @@ func TestProxyResponseTreeSort(t *testing.T) {
input: []respSet{
&eagerRespSet{
closeSeries: func() {},
cl: nopClientSendCloser{},
wg: &sync.WaitGroup{},
bufferedResponses: []*storepb.SeriesResponse{
storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "c", "3")),
Expand All @@ -129,6 +136,7 @@ func TestProxyResponseTreeSort(t *testing.T) {
},
&eagerRespSet{
closeSeries: func() {},
cl: nopClientSendCloser{},
wg: &sync.WaitGroup{},
bufferedResponses: []*storepb.SeriesResponse{
storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "1", "c", "3")),
Expand All @@ -149,6 +157,7 @@ func TestProxyResponseTreeSort(t *testing.T) {
input: []respSet{
&eagerRespSet{
closeSeries: func() {},
cl: nopClientSendCloser{},
wg: &sync.WaitGroup{},
bufferedResponses: []*storepb.SeriesResponse{
storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "ext2", "9")),
Expand All @@ -158,6 +167,7 @@ func TestProxyResponseTreeSort(t *testing.T) {
},
&eagerRespSet{
closeSeries: func() {},
cl: nopClientSendCloser{},
wg: &sync.WaitGroup{},
bufferedResponses: []*storepb.SeriesResponse{
storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "ext1", "5", "ext2", "9")),
Expand All @@ -178,6 +188,7 @@ func TestProxyResponseTreeSort(t *testing.T) {
input: []respSet{
&eagerRespSet{
closeSeries: func() {},
cl: nopClientSendCloser{},
wg: &sync.WaitGroup{},
bufferedResponses: []*storepb.SeriesResponse{
storeSeriesResponse(t, labelsFromStrings("cluster", "beam-platform", "instance", "10.70.13.3:15692", "prometheus", "telemetry/observe-prometheus", "receive", "true", "tenant_id", "default-tenant")),
Expand All @@ -188,6 +199,7 @@ func TestProxyResponseTreeSort(t *testing.T) {
},
&eagerRespSet{
closeSeries: func() {},
cl: nopClientSendCloser{},
wg: &sync.WaitGroup{},
bufferedResponses: []*storepb.SeriesResponse{
storeSeriesResponse(t, labelsFromStrings("cluster", "beam-platform", "instance", "10.70.13.3:15692", "prometheus", "telemetry/observe-prometheus", "receive", "true", "tenant_id", "default-tenant")),
Expand Down Expand Up @@ -219,6 +231,12 @@ func TestProxyResponseTreeSort(t *testing.T) {
}
}

// nopClientSendCloser is a test stub used as the cl field of the response sets
// built in these tests. It embeds storepb.Store_SeriesClient so the type
// satisfies that interface; the tests construct it with its zero value, which
// leaves the embedded interface nil, so only methods defined directly on
// nopClientSendCloser are safe to call.
type nopClientSendCloser struct {
storepb.Store_SeriesClient
}

// CloseSend is a no-op that always returns nil.
func (c nopClientSendCloser) CloseSend() error { return nil }

func TestSortWithoutLabels(t *testing.T) {
for _, tcase := range []struct {
input []*storepb.SeriesResponse
Expand Down Expand Up @@ -341,6 +359,7 @@ func BenchmarkKWayMerge(b *testing.B) {
for j := 0; j < 1000; j++ {
respSets = append(respSets, &eagerRespSet{
closeSeries: func() {},
cl: nopClientSendCloser{},
wg: &sync.WaitGroup{},
bufferedResponses: []*storepb.SeriesResponse{
storeSeriesResponse(b, labelsFromStrings("a", "1", "b", fmt.Sprintf("replica-%d", j), "c", "3")),
Expand Down
Loading

0 comments on commit 9c925bf

Please sign in to comment.