diff --git a/pkg/exemplars/exemplars.go b/pkg/exemplars/exemplars.go index fab73d48142..36b0ce9ffbf 100644 --- a/pkg/exemplars/exemplars.go +++ b/pkg/exemplars/exemplars.go @@ -6,6 +6,7 @@ package exemplars import ( "context" "sort" + "sync" "github.com/pkg/errors" "github.com/prometheus/prometheus/storage" @@ -37,10 +38,13 @@ type exemplarsServer struct { warnings []error data []*exemplarspb.ExemplarData + mu sync.Mutex } func (srv *exemplarsServer) Send(res *exemplarspb.ExemplarsResponse) error { if res.GetWarning() != "" { + srv.mu.Lock() + defer srv.mu.Unlock() srv.warnings = append(srv.warnings, errors.New(res.GetWarning())) return nil } @@ -49,6 +53,8 @@ func (srv *exemplarsServer) Send(res *exemplarspb.ExemplarsResponse) error { return errors.New("empty exemplars data") } + srv.mu.Lock() + defer srv.mu.Unlock() srv.data = append(srv.data, res.GetData()) return nil } diff --git a/pkg/exemplars/proxy_test.go b/pkg/exemplars/proxy_test.go index ba73ec89072..7a252a7d5c1 100644 --- a/pkg/exemplars/proxy_test.go +++ b/pkg/exemplars/proxy_test.go @@ -8,6 +8,7 @@ import ( "io" "os" "reflect" + "sync" "testing" "github.com/go-kit/kit/log" @@ -17,6 +18,7 @@ import ( "github.com/thanos-io/thanos/pkg/store/labelpb" "github.com/thanos-io/thanos/pkg/store/storepb" "github.com/thanos-io/thanos/pkg/testutil" + "go.uber.org/atomic" "google.golang.org/grpc" ) @@ -24,7 +26,7 @@ type testExemplarClient struct { grpc.ClientStream exemplarErr, recvErr error response *exemplarspb.ExemplarsResponse - sentResponse bool + sentResponse atomic.Bool } func (t *testExemplarClient) String() string { @@ -37,10 +39,10 @@ func (t *testExemplarClient) Recv() (*exemplarspb.ExemplarsResponse, error) { return nil, t.recvErr } - if t.sentResponse { + if t.sentResponse.Load() { return nil, io.EOF } - t.sentResponse = true + t.sentResponse.Store(true) return t.response, nil } @@ -55,6 +57,7 @@ type testExemplarServer struct { grpc.ServerStream sendErr error responses []*exemplarspb.ExemplarsResponse + mu sync.Mutex } func (t *testExemplarServer) String() string { @@ -65,6 +68,8 @@ func (t *testExemplarServer) Send(response *exemplarspb.ExemplarsResponse) error if t.sendErr != nil { return t.sendErr } + t.mu.Lock() + defer t.mu.Unlock() t.responses = append(t.responses, response) return nil } @@ -286,3 +291,30 @@ func TestProxy(t *testing.T) { }) } } + +// TestProxyDataRace find the concurrent data race bug ( go test -race -run TestProxyDataRace -v ). +func TestProxyDataRace(t *testing.T) { + logger := log.NewLogfmtLogger(os.Stderr) + p := NewProxy(logger, func() []*exemplarspb.ExemplarStore { + es := &exemplarspb.ExemplarStore{ + ExemplarsClient: &testExemplarClient{ + recvErr: errors.New("err"), + }, + LabelSets: []labels.Labels{labels.FromMap(map[string]string{"cluster": "A"})}, + } + size := 100 + endpoints := make([]*exemplarspb.ExemplarStore, 0, size) + for i := 0; i < size; i++ { + endpoints = append(endpoints, es) + } + return endpoints + }, labels.FromMap(map[string]string{"query": "foo"})) + req := &exemplarspb.ExemplarsRequest{ + Query: `http_request_duration_bucket{query="foo"}`, + PartialResponseStrategy: storepb.PartialResponseStrategy_WARN, + } + s := &exemplarsServer{ + ctx: context.Background(), + } + _ = p.Exemplars(req, s) +}