Skip to content

Commit

Permalink
Fix race condition
Browse files Browse the repository at this point in the history
Signed-off-by: Filip Petkovski <filip.petkovsky@gmail.com>
  • Loading branch information
fpetkovski committed Oct 14, 2024
1 parent 5ed48cb commit 3dbc77f
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 78 deletions.
2 changes: 0 additions & 2 deletions pkg/store/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -1574,8 +1574,6 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store
tenant,
)

defer blockClient.Close()

g.Go(func() error {

span, _ := tracing.StartSpan(gctx, "bucket_store_block_series", tracing.Tags{
Expand Down
17 changes: 8 additions & 9 deletions pkg/store/proxy_merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,7 @@ func (l *lazyRespSet) StoreLabels() map[string]struct{} {
type lazyRespSet struct {
// Generic parameters.
span opentracing.Span
cl storepb.Store_SeriesClient
closeSeries context.CancelFunc
storeName string
storeLabelSets []labels.Labels
Expand Down Expand Up @@ -318,6 +319,7 @@ func newLazyRespSet(
frameTimeout: frameTimeout,
storeName: storeName,
storeLabelSets: storeLabelSets,
cl: cl,
closeSeries: closeSeries,
span: span,
dataOrFinishEvent: dataAvailable,
Expand Down Expand Up @@ -492,21 +494,14 @@ func newAsyncRespSet(
}
}

closeSeries := func() {
cancel()
err := cl.CloseSend()
if err != nil {
level.Warn(logger).Log("msg", "detected close error", "err", err.Error())
}
}
switch retrievalStrategy {
case LazyRetrieval:
return newLazyRespSet(
span,
frameTimeout,
st.String(),
st.LabelSets(),
closeSeries,
cancel,
cl,
shardMatcher,
applySharding,
Expand All @@ -518,7 +513,7 @@ func newAsyncRespSet(
frameTimeout,
st.String(),
st.LabelSets(),
closeSeries,
cancel,
cl,
shardMatcher,
applySharding,
Expand All @@ -539,6 +534,7 @@ func (l *lazyRespSet) Close() {
l.dataOrFinishEvent.Signal()

l.shardMatcher.Close()
_ = l.cl.CloseSend()
}

// eagerRespSet is a SeriesSet that blocks until all data is retrieved from
Expand All @@ -548,6 +544,7 @@ type eagerRespSet struct {
// Generic parameters.
span opentracing.Span

cl storepb.Store_SeriesClient
closeSeries context.CancelFunc
frameTimeout time.Duration

Expand Down Expand Up @@ -578,6 +575,7 @@ func newEagerRespSet(
) respSet {
ret := &eagerRespSet{
span: span,
cl: cl,
closeSeries: closeSeries,
frameTimeout: frameTimeout,
bufferedResponses: []*storepb.SeriesResponse{},
Expand Down Expand Up @@ -726,6 +724,7 @@ func (l *eagerRespSet) Close() {
l.closeSeries()
}
l.shardMatcher.Close()
_ = l.cl.CloseSend()
}

func (l *eagerRespSet) At() *storepb.SeriesResponse {
Expand Down
156 changes: 89 additions & 67 deletions pkg/store/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ package store
import (
"context"
"fmt"
"strings"

"github.com/pkg/errors"

"math"
"math/rand"
Expand All @@ -19,7 +22,6 @@ import (
"github.com/go-kit/log"
"github.com/gogo/protobuf/proto"
"github.com/gogo/protobuf/types"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/timestamp"
Expand Down Expand Up @@ -2213,69 +2215,58 @@ func benchProxySeries(t testutil.TB, totalSamples, totalSeries int) {
func TestProxyStore_NotLeakingOnPrematureFinish(t *testing.T) {
defer custom.TolerantVerifyLeak(t)

clients := []Client{
&storetestutil.TestClient{
StoreClient: &mockedStoreAPI{
RespSeries: []*storepb.SeriesResponse{
// Ensure more than 10 (internal respCh channel).
storeSeriesResponse(t, labels.FromStrings("a", "a"), []sample{{0, 0}, {2, 1}, {3, 2}}),
storeSeriesResponse(t, labels.FromStrings("a", "b"), []sample{{0, 0}, {2, 1}, {3, 2}}),
storeSeriesResponse(t, labels.FromStrings("a", "c"), []sample{{0, 0}, {2, 1}, {3, 2}}),
storeSeriesResponse(t, labels.FromStrings("a", "d"), []sample{{0, 0}, {2, 1}, {3, 2}}),
storeSeriesResponse(t, labels.FromStrings("a", "e"), []sample{{0, 0}, {2, 1}, {3, 2}}),
storeSeriesResponse(t, labels.FromStrings("a", "f"), []sample{{0, 0}, {2, 1}, {3, 2}}),
storeSeriesResponse(t, labels.FromStrings("a", "g"), []sample{{0, 0}, {2, 1}, {3, 2}}),
storeSeriesResponse(t, labels.FromStrings("a", "h"), []sample{{0, 0}, {2, 1}, {3, 2}}),
storeSeriesResponse(t, labels.FromStrings("a", "i"), []sample{{0, 0}, {2, 1}, {3, 2}}),
storeSeriesResponse(t, labels.FromStrings("a", "j"), []sample{{0, 0}, {2, 1}, {3, 2}}),
},
},
MinTime: math.MinInt64,
MaxTime: math.MaxInt64,
},
&storetestutil.TestClient{
StoreClient: &mockedStoreAPI{
RespSeries: []*storepb.SeriesResponse{
storeSeriesResponse(t, labels.FromStrings("b", "a"), []sample{{0, 0}, {2, 1}, {3, 2}}),
storeSeriesResponse(t, labels.FromStrings("b", "b"), []sample{{0, 0}, {2, 1}, {3, 2}}),
storeSeriesResponse(t, labels.FromStrings("b", "c"), []sample{{0, 0}, {2, 1}, {3, 2}}),
storeSeriesResponse(t, labels.FromStrings("b", "d"), []sample{{0, 0}, {2, 1}, {3, 2}}),
storeSeriesResponse(t, labels.FromStrings("b", "e"), []sample{{0, 0}, {2, 1}, {3, 2}}),
storeSeriesResponse(t, labels.FromStrings("b", "f"), []sample{{0, 0}, {2, 1}, {3, 2}}),
storeSeriesResponse(t, labels.FromStrings("b", "g"), []sample{{0, 0}, {2, 1}, {3, 2}}),
storeSeriesResponse(t, labels.FromStrings("b", "h"), []sample{{0, 0}, {2, 1}, {3, 2}}),
storeSeriesResponse(t, labels.FromStrings("b", "i"), []sample{{0, 0}, {2, 1}, {3, 2}}),
storeSeriesResponse(t, labels.FromStrings("b", "j"), []sample{{0, 0}, {2, 1}, {3, 2}}),
},
},
MinTime: math.MinInt64,
MaxTime: math.MaxInt64,
},
&storetestutil.TestClient{
StoreClient: storepb.ServerAsClient(&storeServerStub{
delay: 50 * time.Millisecond,
responses: []*storepb.SeriesResponse{
storeSeriesResponse(t, labels.FromStrings("b", "a"), []sample{{0, 0}, {2, 1}, {3, 2}}),
storeSeriesResponse(t, labels.FromStrings("b", "b"), []sample{{0, 0}, {2, 1}, {3, 2}}),
storeSeriesResponse(t, labels.FromStrings("b", "c"), []sample{{0, 0}, {2, 1}, {3, 2}}),
},
}),
MinTime: math.MinInt64,
MaxTime: math.MaxInt64,
},
}

logger := log.NewNopLogger()
p := &ProxyStore{
logger: logger,
stores: func() []Client { return clients },
metrics: newProxyStoreMetrics(nil),
responseTimeout: 50 * time.Millisecond,
retrievalStrategy: EagerRetrieval,
tsdbSelector: DefaultSelector,
}

t.Run("failing send", func(t *testing.T) {
clients := []Client{
&storetestutil.TestClient{
StoreClient: &mockedStoreAPI{
RespSeries: []*storepb.SeriesResponse{
// Ensure more than 10 (internal respCh channel).
storeSeriesResponse(t, labels.FromStrings("a", "a"), []sample{{0, 0}, {2, 1}, {3, 2}}),
storeSeriesResponse(t, labels.FromStrings("a", "b"), []sample{{0, 0}, {2, 1}, {3, 2}}),
storeSeriesResponse(t, labels.FromStrings("a", "c"), []sample{{0, 0}, {2, 1}, {3, 2}}),
storeSeriesResponse(t, labels.FromStrings("a", "d"), []sample{{0, 0}, {2, 1}, {3, 2}}),
storeSeriesResponse(t, labels.FromStrings("a", "e"), []sample{{0, 0}, {2, 1}, {3, 2}}),
storeSeriesResponse(t, labels.FromStrings("a", "f"), []sample{{0, 0}, {2, 1}, {3, 2}}),
storeSeriesResponse(t, labels.FromStrings("a", "g"), []sample{{0, 0}, {2, 1}, {3, 2}}),
storeSeriesResponse(t, labels.FromStrings("a", "h"), []sample{{0, 0}, {2, 1}, {3, 2}}),
storeSeriesResponse(t, labels.FromStrings("a", "i"), []sample{{0, 0}, {2, 1}, {3, 2}}),
storeSeriesResponse(t, labels.FromStrings("a", "j"), []sample{{0, 0}, {2, 1}, {3, 2}}),
},
},
MinTime: math.MinInt64,
MaxTime: math.MaxInt64,
},
&storetestutil.TestClient{
StoreClient: &mockedStoreAPI{
RespSeries: []*storepb.SeriesResponse{
storeSeriesResponse(t, labels.FromStrings("b", "a"), []sample{{0, 0}, {2, 1}, {3, 2}}),
storeSeriesResponse(t, labels.FromStrings("b", "b"), []sample{{0, 0}, {2, 1}, {3, 2}}),
storeSeriesResponse(t, labels.FromStrings("b", "c"), []sample{{0, 0}, {2, 1}, {3, 2}}),
storeSeriesResponse(t, labels.FromStrings("b", "d"), []sample{{0, 0}, {2, 1}, {3, 2}}),
storeSeriesResponse(t, labels.FromStrings("b", "e"), []sample{{0, 0}, {2, 1}, {3, 2}}),
storeSeriesResponse(t, labels.FromStrings("b", "f"), []sample{{0, 0}, {2, 1}, {3, 2}}),
storeSeriesResponse(t, labels.FromStrings("b", "g"), []sample{{0, 0}, {2, 1}, {3, 2}}),
storeSeriesResponse(t, labels.FromStrings("b", "h"), []sample{{0, 0}, {2, 1}, {3, 2}}),
storeSeriesResponse(t, labels.FromStrings("b", "i"), []sample{{0, 0}, {2, 1}, {3, 2}}),
storeSeriesResponse(t, labels.FromStrings("b", "j"), []sample{{0, 0}, {2, 1}, {3, 2}}),
},
},
MinTime: math.MinInt64,
MaxTime: math.MaxInt64,
},
}

p := &ProxyStore{
logger: logger,
stores: func() []Client { return clients },
metrics: newProxyStoreMetrics(nil),
responseTimeout: 50 * time.Millisecond,
retrievalStrategy: EagerRetrieval,
tsdbSelector: DefaultSelector,
}

ctx, cancel := context.WithCancel(context.Background())
// We mimic failing series server, but practically context cancel will do the same.
testutil.NotOk(t, p.Series(&storepb.SeriesRequest{Matchers: []storepb.LabelMatcher{{}}, PartialResponseStrategy: storepb.PartialResponseStrategy_ABORT}, &mockedSeriesServer{
Expand All @@ -2287,14 +2278,45 @@ func TestProxyStore_NotLeakingOnPrematureFinish(t *testing.T) {
}))
testutil.NotOk(t, ctx.Err())
})

t.Run("client timeout", func(t *testing.T) {
ctx := context.Background()
testutil.NotOk(t, p.Series(&storepb.SeriesRequest{Matchers: []storepb.LabelMatcher{{}}, PartialResponseStrategy: storepb.PartialResponseStrategy_ABORT}, &mockedSeriesServer{
ctx: ctx,
send: func(*storepb.SeriesResponse) error {
return nil
clients := []Client{
&storetestutil.TestClient{
StoreClient: storepb.ServerAsClient(&storeServerStub{
delay: 50 * time.Millisecond,
responses: []*storepb.SeriesResponse{
storeSeriesResponse(t, labels.FromStrings("b", "a"), []sample{{0, 0}, {2, 1}, {3, 2}}),
storeSeriesResponse(t, labels.FromStrings("b", "b"), []sample{{0, 0}, {2, 1}, {3, 2}}),
storeSeriesResponse(t, labels.FromStrings("b", "c"), []sample{{0, 0}, {2, 1}, {3, 2}}),
},
}),
MinTime: math.MinInt64,
MaxTime: math.MaxInt64,
},
}))
}

for _, respStrategy := range []RetrievalStrategy{EagerRetrieval, LazyRetrieval} {
t.Run(fmt.Sprintf("strategy=%v", respStrategy), func(t *testing.T) {
p := &ProxyStore{
logger: logger,
stores: func() []Client { return clients },
metrics: newProxyStoreMetrics(nil),
responseTimeout: 50 * time.Millisecond,
retrievalStrategy: respStrategy,
tsdbSelector: DefaultSelector,
}

ctx := context.Background()
err := p.Series(&storepb.SeriesRequest{Matchers: []storepb.LabelMatcher{{}}, PartialResponseStrategy: storepb.PartialResponseStrategy_ABORT}, &mockedSeriesServer{
ctx: ctx,
send: func(*storepb.SeriesResponse) error {
return nil
},
})

testutil.Assert(t, strings.Contains(err.Error(), context.Canceled.Error()))
})
}
})
}

Expand Down

0 comments on commit 3dbc77f

Please sign in to comment.