Skip to content

Commit

Permalink
Query: Fix metadataServer/rulesServer/targetsServer data race (thanos…
Browse files Browse the repository at this point in the history
…-io#4811)

Signed-off-by: Jimmiehan <hanjinming@outlook.com>
  • Loading branch information
hanjm authored Oct 30, 2021
1 parent 0586d1f commit 2f9d6a0
Show file tree
Hide file tree
Showing 6 changed files with 186 additions and 0 deletions.
6 changes: 6 additions & 0 deletions pkg/metadata/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package metadata

import (
"context"
"sync"

"github.com/pkg/errors"
"github.com/prometheus/prometheus/storage"
Expand Down Expand Up @@ -67,10 +68,13 @@ type metadataServer struct {

warnings []error
metadataMap map[string][]metadatapb.Meta
mu sync.Mutex
}

func (srv *metadataServer) Send(res *metadatapb.MetricMetadataResponse) error {
if res.GetWarning() != "" {
srv.mu.Lock()
defer srv.mu.Unlock()
srv.warnings = append(srv.warnings, errors.New(res.GetWarning()))
return nil
}
Expand All @@ -84,6 +88,8 @@ func (srv *metadataServer) Send(res *metadatapb.MetricMetadataResponse) error {
return nil
}

srv.mu.Lock()
defer srv.mu.Unlock()
for k, v := range res.GetMetadata().Metadata {
if metadata, ok := srv.metadataMap[k]; !ok {
// If limit is set and it is positive, we limit the size of the map.
Expand Down
72 changes: 72 additions & 0 deletions pkg/metadata/proxy_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package metadata

import (
"context"
"io"
"os"
"testing"

"github.com/go-kit/kit/log"
"github.com/pkg/errors"
"github.com/thanos-io/thanos/pkg/metadata/metadatapb"
"github.com/thanos-io/thanos/pkg/store/storepb"
"go.uber.org/atomic"
"google.golang.org/grpc"
)

// testMetadataClient is a mock that serves both as the metadata client and as
// the receive stream that client hands out.
type testMetadataClient struct {
	grpc.ClientStream

	// metadataErr is returned from MetricMetadata alongside the stream.
	metadataErr error
	// recvErr, when non-nil, is returned by every Recv call.
	recvErr error
	// response is what Recv yields on its first successful call.
	response *metadatapb.MetricMetadataResponse
	// sentResponse records that response has already been handed out.
	sentResponse atomic.Bool
}

// String returns a fixed label identifying the mock client.
func (t *testMetadataClient) String() string {
	const label = "test"
	return label
}

// Recv mimics a receive stream that yields at most one response.
//
// When recvErr is set it is returned on every call. Otherwise the first caller
// receives response and every later caller receives io.EOF.
func (t *testMetadataClient) Recv() (*metadatapb.MetricMetadataResponse, error) {
	if t.recvErr != nil {
		return nil, t.recvErr
	}

	// Swap sets the flag and reports its previous value in one atomic step, so
	// concurrent callers cannot both observe "not sent yet" the way a separate
	// Load followed by Store would allow.
	if t.sentResponse.Swap(true) {
		return nil, io.EOF
	}

	return t.response, nil
}

// MetricMetadata hands back the mock itself as the receive stream, paired with
// the configured metadataErr. The context, request and call options are unused.
func (t *testMetadataClient) MetricMetadata(ctx context.Context, in *metadatapb.MetricMetadataRequest, opts ...grpc.CallOption) (metadatapb.Metadata_MetricMetadataClient, error) {
	var stream metadatapb.Metadata_MetricMetadataClient = t
	return stream, t.metadataErr
}

// Compile-time check that the mock satisfies metadatapb.MetadataClient.
var _ metadatapb.MetadataClient = (*testMetadataClient)(nil)

// TestProxyDataRace find the concurrent data race bug ( go test -race -run TestProxyDataRace -v ).
func TestProxyDataRace(t *testing.T) {
logger := log.NewLogfmtLogger(os.Stderr)
p := NewProxy(logger, func() []metadatapb.MetadataClient {
es := &testMetadataClient{
recvErr: errors.New("err"),
}
size := 100
endpoints := make([]metadatapb.MetadataClient, 0, size)
for i := 0; i < size; i++ {
endpoints = append(endpoints, es)
}
return endpoints
})
req := &metadatapb.MetricMetadataRequest{
Metric: `http_request_duration_bucket`,
PartialResponseStrategy: storepb.PartialResponseStrategy_WARN,
}
s := &metadataServer{
ctx: context.Background(),
}
_ = p.MetricMetadata(req, s)
}
24 changes: 24 additions & 0 deletions pkg/rules/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,3 +251,27 @@ func TestProxy(t *testing.T) {
})
}
}

// TestProxyDataRace find the concurrent data race bug ( go test -race -run TestProxyDataRace -v ).
func TestProxyDataRace(t *testing.T) {
logger := log.NewLogfmtLogger(os.Stderr)
p := NewProxy(logger, func() []rulespb.RulesClient {
es := &testRulesClient{
recvErr: errors.New("err"),
}
size := 100
endpoints := make([]rulespb.RulesClient, 0, size)
for i := 0; i < size; i++ {
endpoints = append(endpoints, es)
}
return endpoints
})
req := &rulespb.RulesRequest{
Type: rulespb.RulesRequest_ALL,
PartialResponseStrategy: storepb.PartialResponseStrategy_WARN,
}
s := &rulesServer{
ctx: context.Background(),
}
_ = p.Rules(req, s)
}
6 changes: 6 additions & 0 deletions pkg/rules/rules.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package rules
import (
"context"
"sort"
"sync"

"github.com/pkg/errors"
"github.com/prometheus/prometheus/pkg/labels"
Expand Down Expand Up @@ -153,10 +154,13 @@ type rulesServer struct {

warnings []error
groups []*rulespb.RuleGroup
mu sync.Mutex
}

func (srv *rulesServer) Send(res *rulespb.RulesResponse) error {
if res.GetWarning() != "" {
srv.mu.Lock()
defer srv.mu.Unlock()
srv.warnings = append(srv.warnings, errors.New(res.GetWarning()))
return nil
}
Expand All @@ -165,6 +169,8 @@ func (srv *rulesServer) Send(res *rulespb.RulesResponse) error {
return errors.New("no group")
}

srv.mu.Lock()
defer srv.mu.Unlock()
srv.groups = append(srv.groups, res.GetGroup())
return nil
}
Expand Down
72 changes: 72 additions & 0 deletions pkg/targets/proxy_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package targets

import (
"context"
"io"
"os"
"testing"

"github.com/go-kit/kit/log"
"github.com/pkg/errors"
"github.com/thanos-io/thanos/pkg/store/storepb"
"github.com/thanos-io/thanos/pkg/targets/targetspb"
"google.golang.org/grpc"
)

// testTargetsClient is a mock that serves both as the targets client and as
// the receive stream that client hands out.
type testTargetsClient struct {
	grpc.ClientStream

	// targetsErr is returned from Targets alongside the stream.
	targetsErr error
	// recvErr, when non-nil, is returned by every Recv call.
	recvErr error
	// response is what Recv yields on its first successful call.
	response *targetspb.TargetsResponse
	// sentResponse records that response has already been handed out.
	sentResponse bool
}

// String returns a fixed label identifying the mock client.
func (t *testTargetsClient) String() string {
	const label = "test"
	return label
}

// Recv mimics a receive stream that yields at most one response: recvErr wins
// when set, the first successful call returns response, and later calls return
// io.EOF.
//
// This simulates the underlying grpc Recv behavior as per
// https://github.com/grpc/grpc-go/blob/7f2581f910fc21497091c4109b56d310276fc943/stream.go#L117-L125.
//
// NOTE(review): sentResponse is a plain bool read and written here without
// synchronization, so concurrent calls that get past the recvErr check would
// race on it.
func (t *testTargetsClient) Recv() (*targetspb.TargetsResponse, error) {
	switch {
	case t.recvErr != nil:
		return nil, t.recvErr
	case t.sentResponse:
		return nil, io.EOF
	}

	t.sentResponse = true
	return t.response, nil
}

// Targets hands back the mock itself as the receive stream, paired with the
// configured targetsErr. The context, request and call options are unused.
func (t *testTargetsClient) Targets(ctx context.Context, in *targetspb.TargetsRequest, opts ...grpc.CallOption) (targetspb.Targets_TargetsClient, error) {
	var stream targetspb.Targets_TargetsClient = t
	return stream, t.targetsErr
}

// Compile-time check that the mock satisfies targetspb.TargetsClient.
var _ targetspb.TargetsClient = (*testTargetsClient)(nil)

// TestProxyDataRace find the concurrent data race bug ( go test -race -run TestProxyDataRace -v ).
func TestProxyDataRace(t *testing.T) {
logger := log.NewLogfmtLogger(os.Stderr)
p := NewProxy(logger, func() []targetspb.TargetsClient {
es := &testTargetsClient{
recvErr: errors.New("err"),
}
size := 100
endpoints := make([]targetspb.TargetsClient, 0, size)
for i := 0; i < size; i++ {
endpoints = append(endpoints, es)
}
return endpoints
})
req := &targetspb.TargetsRequest{
State: targetspb.TargetsRequest_ANY,
PartialResponseStrategy: storepb.PartialResponseStrategy_WARN,
}
s := &targetsServer{
ctx: context.Background(),
}
_ = p.Targets(req, s)
}
6 changes: 6 additions & 0 deletions pkg/targets/targets.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package targets
import (
"context"
"sort"
"sync"

"github.com/pkg/errors"
"github.com/prometheus/prometheus/storage"
Expand Down Expand Up @@ -185,17 +186,22 @@ type targetsServer struct {

warnings []error
targets *targetspb.TargetDiscovery
mu sync.Mutex
}

func (srv *targetsServer) Send(res *targetspb.TargetsResponse) error {
if res.GetWarning() != "" {
srv.mu.Lock()
defer srv.mu.Unlock()
srv.warnings = append(srv.warnings, errors.New(res.GetWarning()))
return nil
}

if res.GetTargets() == nil {
return errors.New("no targets")
}
srv.mu.Lock()
defer srv.mu.Unlock()
srv.targets.ActiveTargets = append(srv.targets.ActiveTargets, res.GetTargets().ActiveTargets...)
srv.targets.DroppedTargets = append(srv.targets.DroppedTargets, res.GetTargets().DroppedTargets...)

Expand Down

0 comments on commit 2f9d6a0

Please sign in to comment.