From cd7b5d029d6774cce096e0579b498c3e06e5306e Mon Sep 17 00:00:00 2001 From: lance6716 Date: Thu, 13 May 2021 16:23:39 +0800 Subject: [PATCH] *: limit checker error at a finer granularity and auto increase max size (#1624) --- checker/checker.go | 55 ++++++++++++++++++++++------- dm/ctl/common/util.go | 76 +++++++++++++++++++++++++++++----------- dm/master/server.go | 11 ++++++ dm/master/server_test.go | 35 +++++++++++++++++- go.mod | 2 +- go.sum | 4 +-- 6 files changed, 146 insertions(+), 37 deletions(-) diff --git a/checker/checker.go b/checker/checker.go index 72dbcf0b0a..fdd0ef4380 100644 --- a/checker/checker.go +++ b/checker/checker.go @@ -228,7 +228,7 @@ func (c *Checker) Init(ctx context.Context) (err error) { continue } - c.checkList = append(c.checkList, check.NewShardingTablesCheck(name, dbs, shardingSet, columnMapping, checkingShardID)) + c.checkList = append(c.checkList, check.NewShardingTablesChecker(name, dbs, shardingSet, columnMapping, checkingShardID)) } } @@ -262,24 +262,55 @@ func (c *Checker) Process(ctx context.Context, pr chan pb.ProcessResult) { errs = append(errs, unit.NewProcessError(err)) } else if !result.Summary.Passed { errs = append(errs, unit.NewProcessError(errors.New("check was failed, please see detail"))) + warnLeft, errLeft := c.warnCnt, c.errCnt // remove success result if not pass results := result.Results[:0] - var warnCnt int64 = 0 - var errCnt int64 = 0 for _, r := range result.Results { - switch r.State { - case check.StateFailure: - if errCnt < c.errCnt { - errCnt++ + if r.State == check.StateSuccess { + continue + } + + // handle results without r.Errors + if len(r.Errors) == 0 { + switch r.State { + case check.StateWarning: + if warnLeft == 0 { + continue + } + warnLeft-- results = append(results, r) - } - case check.StateWarning: - if warnCnt < c.warnCnt { - warnCnt++ + case check.StateFailure: + if errLeft == 0 { + continue + } + errLeft-- results = append(results, r) } - default: + continue + } + + subErrors := 
make([]*check.Error, 0, len(r.Errors)) + for _, e := range r.Errors { + switch e.Severity { + case check.StateWarning: + if warnLeft == 0 { + continue + } + warnLeft-- + subErrors = append(subErrors, e) + case check.StateFailure: + if errLeft == 0 { + continue + } + errLeft-- + subErrors = append(subErrors, e) + } + } + // skip displaying a Result that has no remaining errors + if len(subErrors) > 0 { + r.Errors = subErrors + results = append(results, r) } } result.Results = results diff --git a/dm/ctl/common/util.go b/dm/ctl/common/util.go index 1aea8c6439..25106a34ea 100644 --- a/dm/ctl/common/util.go +++ b/dm/ctl/common/util.go @@ -19,32 +19,39 @@ import ( "fmt" "io/ioutil" "reflect" + "regexp" + "strconv" "strings" "sync" "time" - "go.etcd.io/etcd/clientv3" - - "github.com/pingcap/dm/dm/config" - "github.com/pingcap/dm/dm/pb" - parserpkg "github.com/pingcap/dm/pkg/parser" - "github.com/pingcap/dm/pkg/terror" - "github.com/pingcap/dm/pkg/utils" - "github.com/golang/protobuf/jsonpb" "github.com/golang/protobuf/proto" "github.com/pingcap/errors" + "github.com/pingcap/failpoint" "github.com/pingcap/parser" toolutils "github.com/pingcap/tidb-tools/pkg/utils" "github.com/spf13/cobra" + "go.etcd.io/etcd/clientv3" + "go.uber.org/zap" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" + + "github.com/pingcap/dm/dm/config" + "github.com/pingcap/dm/dm/pb" + "github.com/pingcap/dm/pkg/log" + parserpkg "github.com/pingcap/dm/pkg/parser" + "github.com/pingcap/dm/pkg/terror" + "github.com/pingcap/dm/pkg/utils" ) var ( globalConfig = &Config{} - ctlClient = &CtlClient{} + // GlobalCtlClient is the globally used CtlClient in this package. It is exported for use in tests. + GlobalCtlClient = &CtlClient{} + + re = regexp.MustCompile(`grpc: received message larger than max \((\d+) vs. (\d+)\)`) ) // CtlClient used to get master client for dmctl. 
@@ -53,7 +60,7 @@ type CtlClient struct { tls *toolutils.TLS etcdClient *clientv3.Client conn *grpc.ClientConn - masterClient pb.MasterClient + MasterClient pb.MasterClient // exposed to be used in test } func (c *CtlClient) updateMasterClient() error { @@ -75,19 +82,27 @@ func (c *CtlClient) updateMasterClient() error { conn, err = grpc.Dial(utils.UnwrapScheme(endpoint), c.tls.ToGRPCDialOption(), grpc.WithBackoffMaxDelay(3*time.Second), grpc.WithBlock(), grpc.WithTimeout(3*time.Second)) if err == nil { c.conn = conn - c.masterClient = pb.NewMasterClient(conn) + c.MasterClient = pb.NewMasterClient(conn) return nil } } return terror.ErrCtlGRPCCreateConn.AnnotateDelegate(err, "can't connect to %s", strings.Join(endpoints, ",")) } -func (c *CtlClient) sendRequest(ctx context.Context, reqName string, req interface{}, respPointer interface{}) error { +func (c *CtlClient) sendRequest( + ctx context.Context, + reqName string, + req interface{}, + respPointer interface{}, + opts ...interface{}) error { c.mu.RLock() defer c.mu.RUnlock() params := []reflect.Value{reflect.ValueOf(ctx), reflect.ValueOf(req)} - results := reflect.ValueOf(c.masterClient).MethodByName(reqName).Call(params) + for _, o := range opts { + params = append(params, reflect.ValueOf(o)) + } + results := reflect.ValueOf(c.MasterClient).MethodByName(reqName).Call(params) reflect.ValueOf(respPointer).Elem().Set(results[0]) errInterface := results[1].Interface() @@ -100,19 +115,38 @@ func (c *CtlClient) sendRequest(ctx context.Context, reqName string, req interfa // SendRequest send request to master. 
func SendRequest(ctx context.Context, reqName string, req interface{}, respPointer interface{}) error { - err := ctlClient.sendRequest(ctx, reqName, req, respPointer) - if err == nil || status.Code(err) != codes.Unavailable { + err := GlobalCtlClient.sendRequest(ctx, reqName, req, respPointer) + if err == nil { + return nil + } + var opts []interface{} + switch status.Code(err) { + case codes.ResourceExhausted: + matches := re.FindStringSubmatch(err.Error()) + if len(matches) == 3 { + msgSize, err2 := strconv.Atoi(matches[1]) + if err2 == nil { + log.L().Info("increase gRPC maximum message size", zap.Int("size", msgSize)) + opts = append(opts, grpc.MaxCallRecvMsgSize(msgSize)) + } + } + case codes.Unavailable: + default: return err } + failpoint.Inject("SkipUpdateMasterClient", func() { + failpoint.Goto("bypass") + }) // update master client - err = ctlClient.updateMasterClient() + err = GlobalCtlClient.updateMasterClient() if err != nil { return err } + failpoint.Label("bypass") // sendRequest again - return ctlClient.sendRequest(ctx, reqName, req, respPointer) + return GlobalCtlClient.sendRequest(ctx, reqName, req, respPointer, opts...) } // InitUtils inits necessary dmctl utils. @@ -140,12 +174,12 @@ func InitClient(addr string, securityCfg config.Security) error { return err } - ctlClient = &CtlClient{ + GlobalCtlClient = &CtlClient{ tls: tls, etcdClient: etcdClient, } - return ctlClient.updateMasterClient() + return GlobalCtlClient.updateMasterClient() } // GlobalConfig returns global dmctl config. @@ -321,7 +355,7 @@ func SyncMasterEndpoints(ctx context.Context) { clientURLs := []string{} updateF := func() { clientURLs = clientURLs[:0] - resp, err := ctlClient.etcdClient.MemberList(ctx) + resp, err := GlobalCtlClient.etcdClient.MemberList(ctx) if err != nil { return } @@ -332,7 +366,7 @@ func SyncMasterEndpoints(ctx context.Context) { if utils.NonRepeatStringsEqual(clientURLs, lastClientUrls) { return } - ctlClient.etcdClient.SetEndpoints(clientURLs...) 
+ GlobalCtlClient.etcdClient.SetEndpoints(clientURLs...) lastClientUrls = make([]string, len(clientURLs)) copy(lastClientUrls, clientURLs) } diff --git a/dm/master/server.go b/dm/master/server.go index f0d1f6630a..f158b79f8f 100644 --- a/dm/master/server.go +++ b/dm/master/server.go @@ -399,6 +399,17 @@ func (s *Server) StartTask(ctx context.Context, req *pb.StartTaskRequest) (*pb.S resp2 *pb.StartTaskResponse err2 error ) + failpoint.Inject("LongRPCResponse", func() { + var b strings.Builder + size := 5 * 1024 * 1024 + b.Grow(size) + for i := 0; i < size; i++ { + b.WriteByte(0) + } + resp2 = &pb.StartTaskResponse{Msg: b.String()} + failpoint.Return(resp2, nil) + }) + shouldRet := s.sharedLogic(ctx, req, &resp2, &err2) if shouldRet { return resp2, err2 diff --git a/dm/master/server_test.go b/dm/master/server_test.go index 4bac0b6be4..997b1a94d1 100644 --- a/dm/master/server_test.go +++ b/dm/master/server_test.go @@ -19,6 +19,7 @@ import ( "database/sql" "fmt" "io/ioutil" + "net" "os" "path/filepath" "sort" @@ -27,10 +28,11 @@ import ( "testing" "time" - sqlmock "github.com/DATA-DOG/go-sqlmock" + "github.com/DATA-DOG/go-sqlmock" "github.com/golang/mock/gomock" "github.com/pingcap/check" "github.com/pingcap/errors" + "github.com/pingcap/failpoint" "github.com/pingcap/parser" "github.com/pingcap/parser/ast" "github.com/pingcap/parser/model" @@ -41,6 +43,7 @@ import ( "github.com/tikv/pd/pkg/tempurl" "go.etcd.io/etcd/clientv3" "go.etcd.io/etcd/integration" + "google.golang.org/grpc" "github.com/pingcap/dm/checker" "github.com/pingcap/dm/dm/config" @@ -1953,6 +1956,36 @@ func (t *testMaster) subTaskStageMatch(c *check.C, s *scheduler.Scheduler, task, } } +func (t *testMaster) TestGRPCLongResponse(c *check.C) { + c.Assert(failpoint.Enable("github.com/pingcap/dm/dm/master/LongRPCResponse", `return()`), check.IsNil) + //nolint:errcheck + defer failpoint.Disable("github.com/pingcap/dm/dm/master/LongRPCResponse") + 
c.Assert(failpoint.Enable("github.com/pingcap/dm/dm/ctl/common/SkipUpdateMasterClient", `return()`), check.IsNil) + //nolint:errcheck + defer failpoint.Disable("github.com/pingcap/dm/dm/ctl/common/SkipUpdateMasterClient") + + masterAddr := tempurl.Alloc()[len("http://"):] + lis, err := net.Listen("tcp", masterAddr) + c.Assert(err, check.IsNil) + defer lis.Close() + server := grpc.NewServer() + pb.RegisterMasterServer(server, &Server{}) + //nolint:errcheck + go server.Serve(lis) + + conn, err := grpc.Dial(utils.UnwrapScheme(masterAddr), + grpc.WithInsecure(), + grpc.WithBlock()) + c.Assert(err, check.IsNil) + defer conn.Close() + + common.GlobalCtlClient.MasterClient = pb.NewMasterClient(conn) + ctx := context.Background() + resp := &pb.StartTaskResponse{} + err = common.SendRequest(ctx, "StartTask", &pb.StartTaskRequest{}, &resp) + c.Assert(err, check.IsNil) +} + func mockRevelantWorkerClient(mockWorkerClient *pbmock.MockWorkerClient, taskName, sourceID string, masterReq interface{}) { var expect pb.Stage switch req := masterReq.(type) { diff --git a/go.mod b/go.mod index f612480f78..7a13a9c961 100644 --- a/go.mod +++ b/go.mod @@ -24,7 +24,7 @@ require ( github.com/pingcap/log v0.0.0-20210317133921-96f4fcab92a4 github.com/pingcap/parser v0.0.0-20210415081931-48e7f467fd74 github.com/pingcap/tidb v1.1.0-beta.0.20210330094614-60111e1c4b6f - github.com/pingcap/tidb-tools v5.0.0-rc.0.20210318094904-51a9e0c86386+incompatible + github.com/pingcap/tidb-tools v5.0.1-0.20210420102153-beed8ddc59e9+incompatible github.com/prometheus/client_golang v1.5.1 github.com/rakyll/statik v0.1.6 github.com/soheilhy/cmux v0.1.4 diff --git a/go.sum b/go.sum index 1856d3a406..e47ae92dd1 100644 --- a/go.sum +++ b/go.sum @@ -847,8 +847,8 @@ github.com/pingcap/tidb-tools v4.0.1+incompatible/go.mod h1:XGdcy9+yqlDSEMTpOXnw github.com/pingcap/tidb-tools v4.0.5-0.20200820082341-afeaaaaaa153+incompatible/go.mod h1:XGdcy9+yqlDSEMTpOXnwf3hiTeqrV6MN/u1se9N8yIM= github.com/pingcap/tidb-tools 
v4.0.5-0.20200820092506-34ea90c93237+incompatible/go.mod h1:XGdcy9+yqlDSEMTpOXnwf3hiTeqrV6MN/u1se9N8yIM= github.com/pingcap/tidb-tools v4.0.9-0.20201127090955-2707c97b3853+incompatible/go.mod h1:XGdcy9+yqlDSEMTpOXnwf3hiTeqrV6MN/u1se9N8yIM= -github.com/pingcap/tidb-tools v5.0.0-rc.0.20210318094904-51a9e0c86386+incompatible h1:tgUbSovpQ12sd5W0eclXlzZh+hWhgLhvdqRy2BMiXyQ= -github.com/pingcap/tidb-tools v5.0.0-rc.0.20210318094904-51a9e0c86386+incompatible/go.mod h1:XGdcy9+yqlDSEMTpOXnwf3hiTeqrV6MN/u1se9N8yIM= +github.com/pingcap/tidb-tools v5.0.1-0.20210420102153-beed8ddc59e9+incompatible h1:jvsCYfIx30AnyQQxfzoCdzm8xRX0ZRJT3BxpFSKqrTo= +github.com/pingcap/tidb-tools v5.0.1-0.20210420102153-beed8ddc59e9+incompatible/go.mod h1:XGdcy9+yqlDSEMTpOXnwf3hiTeqrV6MN/u1se9N8yIM= github.com/pingcap/tipb v0.0.0-20190428032612-535e1abaa330/go.mod h1:RtkHW8WbcNxj8lsbzjaILci01CtYnYbIkQhjyZWrWVI= github.com/pingcap/tipb v0.0.0-20200417094153-7316d94df1ee/go.mod h1:RtkHW8WbcNxj8lsbzjaILci01CtYnYbIkQhjyZWrWVI= github.com/pingcap/tipb v0.0.0-20200604070248-508f03b0b342/go.mod h1:RtkHW8WbcNxj8lsbzjaILci01CtYnYbIkQhjyZWrWVI=