From 2f9d6a0f5c5c452cb2c1e8df5ddeabbb3649df18 Mon Sep 17 00:00:00 2001 From: Jimmiehan Date: Sat, 30 Oct 2021 23:41:56 +0800 Subject: [PATCH] Query: Fix metadataServer/rulesServer/targetsServer data race (#4811) Signed-off-by: Jimmiehan --- pkg/metadata/metadata.go | 6 ++++ pkg/metadata/proxy_test.go | 72 ++++++++++++++++++++++++++++++++++++++ pkg/rules/proxy_test.go | 24 +++++++++++++ pkg/rules/rules.go | 6 ++++ pkg/targets/proxy_test.go | 72 ++++++++++++++++++++++++++++++++++++++ pkg/targets/targets.go | 6 ++++ 6 files changed, 186 insertions(+) create mode 100644 pkg/metadata/proxy_test.go create mode 100644 pkg/targets/proxy_test.go diff --git a/pkg/metadata/metadata.go b/pkg/metadata/metadata.go index 4074301f7d8..f3aae015a87 100644 --- a/pkg/metadata/metadata.go +++ b/pkg/metadata/metadata.go @@ -5,6 +5,7 @@ package metadata import ( "context" + "sync" "github.com/pkg/errors" "github.com/prometheus/prometheus/storage" @@ -67,10 +68,13 @@ type metadataServer struct { warnings []error metadataMap map[string][]metadatapb.Meta + mu sync.Mutex } func (srv *metadataServer) Send(res *metadatapb.MetricMetadataResponse) error { if res.GetWarning() != "" { + srv.mu.Lock() + defer srv.mu.Unlock() srv.warnings = append(srv.warnings, errors.New(res.GetWarning())) return nil } @@ -84,6 +88,8 @@ func (srv *metadataServer) Send(res *metadatapb.MetricMetadataResponse) error { return nil } + srv.mu.Lock() + defer srv.mu.Unlock() for k, v := range res.GetMetadata().Metadata { if metadata, ok := srv.metadataMap[k]; !ok { // If limit is set and it is positive, we limit the size of the map. diff --git a/pkg/metadata/proxy_test.go b/pkg/metadata/proxy_test.go new file mode 100644 index 00000000000..0026edeb2d7 --- /dev/null +++ b/pkg/metadata/proxy_test.go @@ -0,0 +1,72 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package metadata + +import ( + "context" + "io" + "os" + "testing" + + "github.com/go-kit/kit/log" + "github.com/pkg/errors" + "github.com/thanos-io/thanos/pkg/metadata/metadatapb" + "github.com/thanos-io/thanos/pkg/store/storepb" + "go.uber.org/atomic" + "google.golang.org/grpc" +) + +type testMetadataClient struct { + grpc.ClientStream + metadataErr, recvErr error + response *metadatapb.MetricMetadataResponse + sentResponse atomic.Bool +} + +func (t *testMetadataClient) String() string { + return "test" +} + +func (t *testMetadataClient) Recv() (*metadatapb.MetricMetadataResponse, error) { + if t.recvErr != nil { + return nil, t.recvErr + } + + if t.sentResponse.Load() { + return nil, io.EOF + } + t.sentResponse.Store(true) + + return t.response, nil +} + +func (t *testMetadataClient) MetricMetadata(ctx context.Context, in *metadatapb.MetricMetadataRequest, opts ...grpc.CallOption) (metadatapb.Metadata_MetricMetadataClient, error) { + return t, t.metadataErr +} + +var _ metadatapb.MetadataClient = &testMetadataClient{} + +// TestProxyDataRace find the concurrent data race bug ( go test -race -run TestProxyDataRace -v ). +func TestProxyDataRace(t *testing.T) { + logger := log.NewLogfmtLogger(os.Stderr) + p := NewProxy(logger, func() []metadatapb.MetadataClient { + es := &testMetadataClient{ + recvErr: errors.New("err"), + } + size := 100 + endpoints := make([]metadatapb.MetadataClient, 0, size) + for i := 0; i < size; i++ { + endpoints = append(endpoints, es) + } + return endpoints + }) + req := &metadatapb.MetricMetadataRequest{ + Metric: `http_request_duration_bucket`, + PartialResponseStrategy: storepb.PartialResponseStrategy_WARN, + } + s := &metadataServer{ + ctx: context.Background(), + } + _ = p.MetricMetadata(req, s) +} diff --git a/pkg/rules/proxy_test.go b/pkg/rules/proxy_test.go index 54e112ef000..ae902e94736 100644 --- a/pkg/rules/proxy_test.go +++ b/pkg/rules/proxy_test.go @@ -251,3 +251,27 @@ func TestProxy(t *testing.T) { }) } } + +// TestProxyDataRace find the concurrent data race bug ( go test -race -run TestProxyDataRace -v ). +func TestProxyDataRace(t *testing.T) { + logger := log.NewLogfmtLogger(os.Stderr) + p := NewProxy(logger, func() []rulespb.RulesClient { + es := &testRulesClient{ + recvErr: errors.New("err"), + } + size := 100 + endpoints := make([]rulespb.RulesClient, 0, size) + for i := 0; i < size; i++ { + endpoints = append(endpoints, es) + } + return endpoints + }) + req := &rulespb.RulesRequest{ + Type: rulespb.RulesRequest_ALL, + PartialResponseStrategy: storepb.PartialResponseStrategy_WARN, + } + s := &rulesServer{ + ctx: context.Background(), + } + _ = p.Rules(req, s) +} diff --git a/pkg/rules/rules.go b/pkg/rules/rules.go index 82d68f1ed4f..1aa646c0c1c 100644 --- a/pkg/rules/rules.go +++ b/pkg/rules/rules.go @@ -6,6 +6,7 @@ package rules import ( "context" "sort" + "sync" "github.com/pkg/errors" "github.com/prometheus/prometheus/pkg/labels" @@ -153,10 +154,13 @@ type rulesServer struct { warnings []error groups []*rulespb.RuleGroup + mu sync.Mutex } func (srv *rulesServer) Send(res *rulespb.RulesResponse) error { if res.GetWarning() != "" { + srv.mu.Lock() + defer srv.mu.Unlock() srv.warnings = append(srv.warnings, errors.New(res.GetWarning())) return nil } @@ -165,6 +169,8 @@ func (srv *rulesServer) Send(res *rulespb.RulesResponse) error { return errors.New("no group") } + srv.mu.Lock() + defer srv.mu.Unlock() srv.groups = append(srv.groups, res.GetGroup()) return nil } diff --git a/pkg/targets/proxy_test.go b/pkg/targets/proxy_test.go new file mode 100644 index 00000000000..56f0fc4d9d4 --- /dev/null +++ b/pkg/targets/proxy_test.go @@ -0,0 +1,72 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package targets + +import ( + "context" + "io" + "os" + "testing" + + "github.com/go-kit/kit/log" + "github.com/pkg/errors" + "github.com/thanos-io/thanos/pkg/store/storepb" + "github.com/thanos-io/thanos/pkg/targets/targetspb" + "google.golang.org/grpc" +) + +type testTargetsClient struct { + grpc.ClientStream + targetsErr, recvErr error + response *targetspb.TargetsResponse + sentResponse bool +} + +func (t *testTargetsClient) String() string { + return "test" +} + +func (t *testTargetsClient) Recv() (*targetspb.TargetsResponse, error) { + // A simulation of underlying grpc Recv behavior as per https://github.com/grpc/grpc-go/blob/7f2581f910fc21497091c4109b56d310276fc943/stream.go#L117-L125. + if t.recvErr != nil { + return nil, t.recvErr + } + + if t.sentResponse { + return nil, io.EOF + } + t.sentResponse = true + + return t.response, nil +} + +func (t *testTargetsClient) Targets(ctx context.Context, in *targetspb.TargetsRequest, opts ...grpc.CallOption) (targetspb.Targets_TargetsClient, error) { + return t, t.targetsErr +} + +var _ targetspb.TargetsClient = &testTargetsClient{} + +// TestProxyDataRace find the concurrent data race bug ( go test -race -run TestProxyDataRace -v ). +func TestProxyDataRace(t *testing.T) { + logger := log.NewLogfmtLogger(os.Stderr) + p := NewProxy(logger, func() []targetspb.TargetsClient { + es := &testTargetsClient{ + recvErr: errors.New("err"), + } + size := 100 + endpoints := make([]targetspb.TargetsClient, 0, size) + for i := 0; i < size; i++ { + endpoints = append(endpoints, es) + } + return endpoints + }) + req := &targetspb.TargetsRequest{ + State: targetspb.TargetsRequest_ANY, + PartialResponseStrategy: storepb.PartialResponseStrategy_WARN, + } + s := &targetsServer{ + ctx: context.Background(), + } + _ = p.Targets(req, s) +} diff --git a/pkg/targets/targets.go b/pkg/targets/targets.go index c62b74a7f36..9a1b4016c31 100644 --- a/pkg/targets/targets.go +++ b/pkg/targets/targets.go @@ -6,6 +6,7 @@ package targets import ( "context" "sort" + "sync" "github.com/pkg/errors" "github.com/prometheus/prometheus/storage" @@ -185,10 +186,13 @@ type targetsServer struct { warnings []error targets *targetspb.TargetDiscovery + mu sync.Mutex } func (srv *targetsServer) Send(res *targetspb.TargetsResponse) error { if res.GetWarning() != "" { + srv.mu.Lock() + defer srv.mu.Unlock() srv.warnings = append(srv.warnings, errors.New(res.GetWarning())) return nil } @@ -196,6 +200,8 @@ func (srv *targetsServer) Send(res *targetspb.TargetsResponse) error { if res.GetTargets() == nil { return errors.New("no targets") } + srv.mu.Lock() + defer srv.mu.Unlock() srv.targets.ActiveTargets = append(srv.targets.ActiveTargets, res.GetTargets().ActiveTargets...) srv.targets.DroppedTargets = append(srv.targets.DroppedTargets, res.GetTargets().DroppedTargets...)