Skip to content

Commit

Permalink
Revert "Fix coroutine leak (#7821)"
Browse files Browse the repository at this point in the history
This reverts commit 9c925bf.
  • Loading branch information
yeya24 committed Nov 5, 2024
1 parent f9027d9 commit 991c3bd
Show file tree
Hide file tree
Showing 7 changed files with 68 additions and 164 deletions.
1 change: 0 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#7644](https://github.com/thanos-io/thanos/pull/7644) fix(ui): add null check to find overlapping blocks logic
- [#7814](https://github.com/thanos-io/thanos/pull/7814) Store: label_values: if matchers contain **name**=="something", do not add <labelname> != "" to fetch less postings.
- [#7679](https://github.com/thanos-io/thanos/pull/7679) Query: respect store.limit.* flags when evaluating queries
- [#7821](https://github.com/thanos-io/thanos/pull/7679) Query/Receive: Fix coroutine leak introduced in https://github.com/thanos-io/thanos/pull/7796.
- [#7843](https://github.com/thanos-io/thanos/pull/7843) Query Frontend: fix slow query logging for non-query endpoints.
- [#7852](https://github.com/thanos-io/thanos/pull/7852) Query Frontend: pass "stats" parameter forward to queriers and fix Prometheus stats merging.

Expand Down
7 changes: 2 additions & 5 deletions pkg/store/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -1574,6 +1574,8 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store
tenant,
)

defer blockClient.Close()

g.Go(func() error {

span, _ := tracing.StartSpan(gctx, "bucket_store_block_series", tracing.Tags{
Expand Down Expand Up @@ -3381,11 +3383,6 @@ func (r *bucketIndexReader) Close() error {
return nil
}

func (b *blockSeriesClient) CloseSend() error {
b.Close()
return nil
}

// LookupLabelsSymbols allows populates label set strings from symbolized label set.
func (r *bucketIndexReader) LookupLabelsSymbols(ctx context.Context, symbolized []symbolizedLabel, b *labels.Builder) error {
b.Reset(labels.EmptyLabels())
Expand Down
20 changes: 6 additions & 14 deletions pkg/store/proxy_merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,6 @@ func (l *lazyRespSet) StoreLabels() map[string]struct{} {
type lazyRespSet struct {
// Generic parameters.
span opentracing.Span
cl storepb.Store_SeriesClient
closeSeries context.CancelFunc
storeName string
storeLabelSets []labels.Labels
Expand Down Expand Up @@ -319,7 +318,6 @@ func newLazyRespSet(
frameTimeout: frameTimeout,
storeName: storeName,
storeLabelSets: storeLabelSets,
cl: cl,
closeSeries: closeSeries,
span: span,
dataOrFinishEvent: dataAvailable,
Expand Down Expand Up @@ -448,10 +446,8 @@ func newAsyncRespSet(
emptyStreamResponses prometheus.Counter,
) (respSet, error) {

var (
span opentracing.Span
cancel context.CancelFunc
)
var span opentracing.Span
var closeSeries context.CancelFunc

storeID, storeAddr, isLocalStore := storeInfo(st)
seriesCtx := grpc_opentracing.ClientAddContextTags(ctx, opentracing.Tags{
Expand All @@ -463,7 +459,7 @@ func newAsyncRespSet(
"store.addr": storeAddr,
})

seriesCtx, cancel = context.WithCancel(seriesCtx)
seriesCtx, closeSeries = context.WithCancel(seriesCtx)

shardMatcher := shardInfo.Matcher(buffers)

Expand All @@ -478,7 +474,7 @@ func newAsyncRespSet(

span.SetTag("err", err.Error())
span.Finish()
cancel()
closeSeries()
return nil, err
}

Expand All @@ -501,7 +497,7 @@ func newAsyncRespSet(
frameTimeout,
st.String(),
st.LabelSets(),
cancel,
closeSeries,
cl,
shardMatcher,
applySharding,
Expand All @@ -513,7 +509,7 @@ func newAsyncRespSet(
frameTimeout,
st.String(),
st.LabelSets(),
cancel,
closeSeries,
cl,
shardMatcher,
applySharding,
Expand All @@ -534,7 +530,6 @@ func (l *lazyRespSet) Close() {
l.dataOrFinishEvent.Signal()

l.shardMatcher.Close()
_ = l.cl.CloseSend()
}

// eagerRespSet is a SeriesSet that blocks until all data is retrieved from
Expand All @@ -544,7 +539,6 @@ type eagerRespSet struct {
// Generic parameters.
span opentracing.Span

cl storepb.Store_SeriesClient
closeSeries context.CancelFunc
frameTimeout time.Duration

Expand Down Expand Up @@ -575,7 +569,6 @@ func newEagerRespSet(
) respSet {
ret := &eagerRespSet{
span: span,
cl: cl,
closeSeries: closeSeries,
frameTimeout: frameTimeout,
bufferedResponses: []*storepb.SeriesResponse{},
Expand Down Expand Up @@ -724,7 +717,6 @@ func (l *eagerRespSet) Close() {
l.closeSeries()
}
l.shardMatcher.Close()
_ = l.cl.CloseSend()
}

func (l *eagerRespSet) At() *storepb.SeriesResponse {
Expand Down
19 changes: 0 additions & 19 deletions pkg/store/proxy_merge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ func TestProxyResponseTreeSort(t *testing.T) {
input: []respSet{
&eagerRespSet{
closeSeries: func() {},
cl: nopClientSendCloser{},
wg: &sync.WaitGroup{},
bufferedResponses: []*storepb.SeriesResponse{
storeSeriesResponse(t, labelsFromStrings("a", "1", "c", "3")),
Expand All @@ -48,7 +47,6 @@ func TestProxyResponseTreeSort(t *testing.T) {
},
&eagerRespSet{
closeSeries: func() {},
cl: nopClientSendCloser{},
wg: &sync.WaitGroup{},
bufferedResponses: []*storepb.SeriesResponse{
storeSeriesResponse(t, labelsFromStrings("a", "1", "c", "4", "e", "5")),
Expand All @@ -68,7 +66,6 @@ func TestProxyResponseTreeSort(t *testing.T) {
input: []respSet{
&eagerRespSet{
closeSeries: func() {},
cl: nopClientSendCloser{},
wg: &sync.WaitGroup{},
bufferedResponses: []*storepb.SeriesResponse{
storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "c", "3")),
Expand All @@ -78,7 +75,6 @@ func TestProxyResponseTreeSort(t *testing.T) {
},
&eagerRespSet{
closeSeries: func() {},
cl: nopClientSendCloser{},
wg: &sync.WaitGroup{},
bufferedResponses: []*storepb.SeriesResponse{
storeSeriesResponse(t, labelsFromStrings("d", "4", "e", "5")),
Expand All @@ -99,7 +95,6 @@ func TestProxyResponseTreeSort(t *testing.T) {
input: []respSet{
&eagerRespSet{
closeSeries: func() {},
cl: nopClientSendCloser{},
wg: &sync.WaitGroup{},
bufferedResponses: []*storepb.SeriesResponse{
storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "ext2", "9")),
Expand All @@ -109,7 +104,6 @@ func TestProxyResponseTreeSort(t *testing.T) {
},
&eagerRespSet{
closeSeries: func() {},
cl: nopClientSendCloser{},
wg: &sync.WaitGroup{},
bufferedResponses: []*storepb.SeriesResponse{
storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "ext1", "5", "ext2", "9")),
Expand All @@ -130,7 +124,6 @@ func TestProxyResponseTreeSort(t *testing.T) {
input: []respSet{
&eagerRespSet{
closeSeries: func() {},
cl: nopClientSendCloser{},
wg: &sync.WaitGroup{},
bufferedResponses: []*storepb.SeriesResponse{
storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "c", "3")),
Expand All @@ -140,7 +133,6 @@ func TestProxyResponseTreeSort(t *testing.T) {
},
&eagerRespSet{
closeSeries: func() {},
cl: nopClientSendCloser{},
wg: &sync.WaitGroup{},
bufferedResponses: []*storepb.SeriesResponse{
storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "1", "c", "3")),
Expand All @@ -161,7 +153,6 @@ func TestProxyResponseTreeSort(t *testing.T) {
input: []respSet{
&eagerRespSet{
closeSeries: func() {},
cl: nopClientSendCloser{},
wg: &sync.WaitGroup{},
bufferedResponses: []*storepb.SeriesResponse{
storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "ext2", "9")),
Expand All @@ -171,7 +162,6 @@ func TestProxyResponseTreeSort(t *testing.T) {
},
&eagerRespSet{
closeSeries: func() {},
cl: nopClientSendCloser{},
wg: &sync.WaitGroup{},
bufferedResponses: []*storepb.SeriesResponse{
storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "2", "ext1", "5", "ext2", "9")),
Expand All @@ -192,7 +182,6 @@ func TestProxyResponseTreeSort(t *testing.T) {
input: []respSet{
&eagerRespSet{
closeSeries: func() {},
cl: nopClientSendCloser{},
wg: &sync.WaitGroup{},
bufferedResponses: []*storepb.SeriesResponse{
storeSeriesResponse(t, labelsFromStrings("cluster", "beam-platform", "instance", "10.70.13.3:15692", "prometheus", "telemetry/observe-prometheus", "receive", "true", "tenant_id", "default-tenant")),
Expand All @@ -203,7 +192,6 @@ func TestProxyResponseTreeSort(t *testing.T) {
},
&eagerRespSet{
closeSeries: func() {},
cl: nopClientSendCloser{},
wg: &sync.WaitGroup{},
bufferedResponses: []*storepb.SeriesResponse{
storeSeriesResponse(t, labelsFromStrings("cluster", "beam-platform", "instance", "10.70.13.3:15692", "prometheus", "telemetry/observe-prometheus", "receive", "true", "tenant_id", "default-tenant")),
Expand Down Expand Up @@ -235,12 +223,6 @@ func TestProxyResponseTreeSort(t *testing.T) {
}
}

type nopClientSendCloser struct {
storepb.Store_SeriesClient
}

func (c nopClientSendCloser) CloseSend() error { return nil }

func TestSortWithoutLabels(t *testing.T) {
t.Parallel()

Expand Down Expand Up @@ -365,7 +347,6 @@ func BenchmarkKWayMerge(b *testing.B) {
for j := 0; j < 1000; j++ {
respSets = append(respSets, &eagerRespSet{
closeSeries: func() {},
cl: nopClientSendCloser{},
wg: &sync.WaitGroup{},
bufferedResponses: []*storepb.SeriesResponse{
storeSeriesResponse(b, labelsFromStrings("a", "1", "b", fmt.Sprintf("replica-%d", j), "c", "3")),
Expand Down
Loading

0 comments on commit 991c3bd

Please sign in to comment.