Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

Commit

Permalink
*: limit checker error at a finer granularity and auto increase max s…
Browse files Browse the repository at this point in the history
…ize (#1624)
  • Loading branch information
lance6716 authored May 13, 2021
1 parent 1b6b2af commit cd7b5d0
Show file tree
Hide file tree
Showing 6 changed files with 146 additions and 37 deletions.
55 changes: 43 additions & 12 deletions checker/checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ func (c *Checker) Init(ctx context.Context) (err error) {
continue
}

c.checkList = append(c.checkList, check.NewShardingTablesCheck(name, dbs, shardingSet, columnMapping, checkingShardID))
c.checkList = append(c.checkList, check.NewShardingTablesChecker(name, dbs, shardingSet, columnMapping, checkingShardID))
}
}

Expand Down Expand Up @@ -262,24 +262,55 @@ func (c *Checker) Process(ctx context.Context, pr chan pb.ProcessResult) {
errs = append(errs, unit.NewProcessError(err))
} else if !result.Summary.Passed {
errs = append(errs, unit.NewProcessError(errors.New("check was failed, please see detail")))
warnLeft, errLeft := c.warnCnt, c.errCnt

// remove success result if not pass
results := result.Results[:0]
var warnCnt int64 = 0
var errCnt int64 = 0
for _, r := range result.Results {
switch r.State {
case check.StateFailure:
if errCnt < c.errCnt {
errCnt++
if r.State == check.StateSuccess {
continue
}

// handle results without r.Errors
if len(r.Errors) == 0 {
switch r.State {
case check.StateWarning:
if warnLeft == 0 {
continue
}
warnLeft--
results = append(results, r)
}
case check.StateWarning:
if warnCnt < c.warnCnt {
warnCnt++
case check.StateFailure:
if errLeft == 0 {
continue
}
errLeft--
results = append(results, r)
}
default:
continue
}

subErrors := make([]*check.Error, 0, len(r.Errors))
for _, e := range r.Errors {
switch e.Severity {
case check.StateWarning:
if warnLeft == 0 {
continue
}
warnLeft--
subErrors = append(subErrors, e)
case check.StateFailure:
if errLeft == 0 {
continue
}
errLeft--
subErrors = append(subErrors, e)
}
}
// skip display an empty Result
if len(subErrors) > 0 {
r.Errors = subErrors
results = append(results, r)
}
}
result.Results = results
Expand Down
76 changes: 55 additions & 21 deletions dm/ctl/common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,32 +19,39 @@ import (
"fmt"
"io/ioutil"
"reflect"
"regexp"
"strconv"
"strings"
"sync"
"time"

"go.etcd.io/etcd/clientv3"

"github.com/pingcap/dm/dm/config"
"github.com/pingcap/dm/dm/pb"
parserpkg "github.com/pingcap/dm/pkg/parser"
"github.com/pingcap/dm/pkg/terror"
"github.com/pingcap/dm/pkg/utils"

"github.com/golang/protobuf/jsonpb"
"github.com/golang/protobuf/proto"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/parser"
toolutils "github.com/pingcap/tidb-tools/pkg/utils"
"github.com/spf13/cobra"
"go.etcd.io/etcd/clientv3"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/pingcap/dm/dm/config"
"github.com/pingcap/dm/dm/pb"
"github.com/pingcap/dm/pkg/log"
parserpkg "github.com/pingcap/dm/pkg/parser"
"github.com/pingcap/dm/pkg/terror"
"github.com/pingcap/dm/pkg/utils"
)

var (
globalConfig = &Config{}
ctlClient = &CtlClient{}
// GlobalCtlClient is the globally used CtlClient in this package. Exposed to be used in test.
GlobalCtlClient = &CtlClient{}

re = regexp.MustCompile(`grpc: received message larger than max \((\d+) vs. (\d+)\)`)
)

// CtlClient used to get master client for dmctl.
Expand All @@ -53,7 +60,7 @@ type CtlClient struct {
tls *toolutils.TLS
etcdClient *clientv3.Client
conn *grpc.ClientConn
masterClient pb.MasterClient
MasterClient pb.MasterClient // exposed to be used in test
}

func (c *CtlClient) updateMasterClient() error {
Expand All @@ -75,19 +82,27 @@ func (c *CtlClient) updateMasterClient() error {
conn, err = grpc.Dial(utils.UnwrapScheme(endpoint), c.tls.ToGRPCDialOption(), grpc.WithBackoffMaxDelay(3*time.Second), grpc.WithBlock(), grpc.WithTimeout(3*time.Second))
if err == nil {
c.conn = conn
c.masterClient = pb.NewMasterClient(conn)
c.MasterClient = pb.NewMasterClient(conn)
return nil
}
}
return terror.ErrCtlGRPCCreateConn.AnnotateDelegate(err, "can't connect to %s", strings.Join(endpoints, ","))
}

func (c *CtlClient) sendRequest(ctx context.Context, reqName string, req interface{}, respPointer interface{}) error {
func (c *CtlClient) sendRequest(
ctx context.Context,
reqName string,
req interface{},
respPointer interface{},
opts ...interface{}) error {
c.mu.RLock()
defer c.mu.RUnlock()

params := []reflect.Value{reflect.ValueOf(ctx), reflect.ValueOf(req)}
results := reflect.ValueOf(c.masterClient).MethodByName(reqName).Call(params)
for _, o := range opts {
params = append(params, reflect.ValueOf(o))
}
results := reflect.ValueOf(c.MasterClient).MethodByName(reqName).Call(params)

reflect.ValueOf(respPointer).Elem().Set(results[0])
errInterface := results[1].Interface()
Expand All @@ -100,19 +115,38 @@ func (c *CtlClient) sendRequest(ctx context.Context, reqName string, req interfa

// SendRequest send request to master.
func SendRequest(ctx context.Context, reqName string, req interface{}, respPointer interface{}) error {
err := ctlClient.sendRequest(ctx, reqName, req, respPointer)
if err == nil || status.Code(err) != codes.Unavailable {
err := GlobalCtlClient.sendRequest(ctx, reqName, req, respPointer)
if err == nil {
return nil
}
var opts []interface{}
switch status.Code(err) {
case codes.ResourceExhausted:
matches := re.FindStringSubmatch(err.Error())
if len(matches) == 3 {
msgSize, err2 := strconv.Atoi(matches[1])
if err2 == nil {
log.L().Info("increase gRPC maximum message size", zap.Int("size", msgSize))
opts = append(opts, grpc.MaxCallRecvMsgSize(msgSize))
}
}
case codes.Unavailable:
default:
return err
}

failpoint.Inject("SkipUpdateMasterClient", func() {
failpoint.Goto("bypass")
})
// update master client
err = ctlClient.updateMasterClient()
err = GlobalCtlClient.updateMasterClient()
if err != nil {
return err
}
failpoint.Label("bypass")

// sendRequest again
return ctlClient.sendRequest(ctx, reqName, req, respPointer)
return GlobalCtlClient.sendRequest(ctx, reqName, req, respPointer, opts...)
}

// InitUtils inits necessary dmctl utils.
Expand Down Expand Up @@ -140,12 +174,12 @@ func InitClient(addr string, securityCfg config.Security) error {
return err
}

ctlClient = &CtlClient{
GlobalCtlClient = &CtlClient{
tls: tls,
etcdClient: etcdClient,
}

return ctlClient.updateMasterClient()
return GlobalCtlClient.updateMasterClient()
}

// GlobalConfig returns global dmctl config.
Expand Down Expand Up @@ -321,7 +355,7 @@ func SyncMasterEndpoints(ctx context.Context) {
clientURLs := []string{}
updateF := func() {
clientURLs = clientURLs[:0]
resp, err := ctlClient.etcdClient.MemberList(ctx)
resp, err := GlobalCtlClient.etcdClient.MemberList(ctx)
if err != nil {
return
}
Expand All @@ -332,7 +366,7 @@ func SyncMasterEndpoints(ctx context.Context) {
if utils.NonRepeatStringsEqual(clientURLs, lastClientUrls) {
return
}
ctlClient.etcdClient.SetEndpoints(clientURLs...)
GlobalCtlClient.etcdClient.SetEndpoints(clientURLs...)
lastClientUrls = make([]string, len(clientURLs))
copy(lastClientUrls, clientURLs)
}
Expand Down
11 changes: 11 additions & 0 deletions dm/master/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,17 @@ func (s *Server) StartTask(ctx context.Context, req *pb.StartTaskRequest) (*pb.S
resp2 *pb.StartTaskResponse
err2 error
)
failpoint.Inject("LongRPCResponse", func() {
var b strings.Builder
size := 5 * 1024 * 1024
b.Grow(size)
for i := 0; i < size; i++ {
b.WriteByte(0)
}
resp2 = &pb.StartTaskResponse{Msg: b.String()}
failpoint.Return(resp2, nil)
})

shouldRet := s.sharedLogic(ctx, req, &resp2, &err2)
if shouldRet {
return resp2, err2
Expand Down
35 changes: 34 additions & 1 deletion dm/master/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"database/sql"
"fmt"
"io/ioutil"
"net"
"os"
"path/filepath"
"sort"
Expand All @@ -27,10 +28,11 @@ import (
"testing"
"time"

sqlmock "github.com/DATA-DOG/go-sqlmock"
"github.com/DATA-DOG/go-sqlmock"
"github.com/golang/mock/gomock"
"github.com/pingcap/check"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/parser"
"github.com/pingcap/parser/ast"
"github.com/pingcap/parser/model"
Expand All @@ -41,6 +43,7 @@ import (
"github.com/tikv/pd/pkg/tempurl"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/integration"
"google.golang.org/grpc"

"github.com/pingcap/dm/checker"
"github.com/pingcap/dm/dm/config"
Expand Down Expand Up @@ -1953,6 +1956,36 @@ func (t *testMaster) subTaskStageMatch(c *check.C, s *scheduler.Scheduler, task,
}
}

func (t *testMaster) TestGRPCLongResponse(c *check.C) {
c.Assert(failpoint.Enable("github.com/pingcap/dm/dm/master/LongRPCResponse", `return()`), check.IsNil)
//nolint:errcheck
defer failpoint.Disable("github.com/pingcap/dm/dm/master/LongRPCResponse")
c.Assert(failpoint.Enable("github.com/pingcap/dm/dm/ctl/common/SkipUpdateMasterClient", `return()`), check.IsNil)
//nolint:errcheck
defer failpoint.Disable("github.com/pingcap/dm/dm/ctl/common/SkipUpdateMasterClient")

masterAddr := tempurl.Alloc()[len("http://"):]
lis, err := net.Listen("tcp", masterAddr)
c.Assert(err, check.IsNil)
defer lis.Close()
server := grpc.NewServer()
pb.RegisterMasterServer(server, &Server{})
//nolint:errcheck
go server.Serve(lis)

conn, err := grpc.Dial(utils.UnwrapScheme(masterAddr),
grpc.WithInsecure(),
grpc.WithBlock())
c.Assert(err, check.IsNil)
defer conn.Close()

common.GlobalCtlClient.MasterClient = pb.NewMasterClient(conn)
ctx := context.Background()
resp := &pb.StartTaskResponse{}
err = common.SendRequest(ctx, "StartTask", &pb.StartTaskRequest{}, &resp)
c.Assert(err, check.IsNil)
}

func mockRevelantWorkerClient(mockWorkerClient *pbmock.MockWorkerClient, taskName, sourceID string, masterReq interface{}) {
var expect pb.Stage
switch req := masterReq.(type) {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ require (
github.com/pingcap/log v0.0.0-20210317133921-96f4fcab92a4
github.com/pingcap/parser v0.0.0-20210415081931-48e7f467fd74
github.com/pingcap/tidb v1.1.0-beta.0.20210330094614-60111e1c4b6f
github.com/pingcap/tidb-tools v5.0.0-rc.0.20210318094904-51a9e0c86386+incompatible
github.com/pingcap/tidb-tools v5.0.1-0.20210420102153-beed8ddc59e9+incompatible
github.com/prometheus/client_golang v1.5.1
github.com/rakyll/statik v0.1.6
github.com/soheilhy/cmux v0.1.4
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -847,8 +847,8 @@ github.com/pingcap/tidb-tools v4.0.1+incompatible/go.mod h1:XGdcy9+yqlDSEMTpOXnw
github.com/pingcap/tidb-tools v4.0.5-0.20200820082341-afeaaaaaa153+incompatible/go.mod h1:XGdcy9+yqlDSEMTpOXnwf3hiTeqrV6MN/u1se9N8yIM=
github.com/pingcap/tidb-tools v4.0.5-0.20200820092506-34ea90c93237+incompatible/go.mod h1:XGdcy9+yqlDSEMTpOXnwf3hiTeqrV6MN/u1se9N8yIM=
github.com/pingcap/tidb-tools v4.0.9-0.20201127090955-2707c97b3853+incompatible/go.mod h1:XGdcy9+yqlDSEMTpOXnwf3hiTeqrV6MN/u1se9N8yIM=
github.com/pingcap/tidb-tools v5.0.0-rc.0.20210318094904-51a9e0c86386+incompatible h1:tgUbSovpQ12sd5W0eclXlzZh+hWhgLhvdqRy2BMiXyQ=
github.com/pingcap/tidb-tools v5.0.0-rc.0.20210318094904-51a9e0c86386+incompatible/go.mod h1:XGdcy9+yqlDSEMTpOXnwf3hiTeqrV6MN/u1se9N8yIM=
github.com/pingcap/tidb-tools v5.0.1-0.20210420102153-beed8ddc59e9+incompatible h1:jvsCYfIx30AnyQQxfzoCdzm8xRX0ZRJT3BxpFSKqrTo=
github.com/pingcap/tidb-tools v5.0.1-0.20210420102153-beed8ddc59e9+incompatible/go.mod h1:XGdcy9+yqlDSEMTpOXnwf3hiTeqrV6MN/u1se9N8yIM=
github.com/pingcap/tipb v0.0.0-20190428032612-535e1abaa330/go.mod h1:RtkHW8WbcNxj8lsbzjaILci01CtYnYbIkQhjyZWrWVI=
github.com/pingcap/tipb v0.0.0-20200417094153-7316d94df1ee/go.mod h1:RtkHW8WbcNxj8lsbzjaILci01CtYnYbIkQhjyZWrWVI=
github.com/pingcap/tipb v0.0.0-20200604070248-508f03b0b342/go.mod h1:RtkHW8WbcNxj8lsbzjaILci01CtYnYbIkQhjyZWrWVI=
Expand Down

0 comments on commit cd7b5d0

Please sign in to comment.