Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

*: limit checker error at a finer granularity and auto increase max size #1624

Merged
merged 12 commits into from
May 13, 2021
55 changes: 43 additions & 12 deletions checker/checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ func (c *Checker) Init(ctx context.Context) (err error) {
continue
}

c.checkList = append(c.checkList, check.NewShardingTablesCheck(name, dbs, shardingSet, columnMapping, checkingShardID))
c.checkList = append(c.checkList, check.NewShardingTablesChecker(name, dbs, shardingSet, columnMapping, checkingShardID))
}
}

Expand Down Expand Up @@ -262,24 +262,55 @@ func (c *Checker) Process(ctx context.Context, pr chan pb.ProcessResult) {
errs = append(errs, unit.NewProcessError(err))
} else if !result.Summary.Passed {
errs = append(errs, unit.NewProcessError(errors.New("check was failed, please see detail")))
warnLeft, errLeft := c.warnCnt, c.errCnt

// remove success result if not pass
results := result.Results[:0]
var warnCnt int64 = 0
var errCnt int64 = 0
for _, r := range result.Results {
switch r.State {
case check.StateFailure:
if errCnt < c.errCnt {
errCnt++
if r.State == check.StateSuccess {
continue
}

// handle results without r.Errors
if len(r.Errors) == 0 {
switch r.State {
case check.StateWarning:
if warnLeft == 0 {
lichunzhu marked this conversation as resolved.
Show resolved Hide resolved
continue
}
warnLeft--
results = append(results, r)
}
case check.StateWarning:
if warnCnt < c.warnCnt {
warnCnt++
case check.StateFailure:
if errLeft == 0 {
continue
}
errLeft--
results = append(results, r)
}
default:
continue
}

subErrors := make([]*check.Error, 0, len(r.Errors))
for _, e := range r.Errors {
switch e.Severity {
case check.StateWarning:
if warnLeft == 0 {
continue
}
warnLeft--
subErrors = append(subErrors, e)
case check.StateFailure:
if errLeft == 0 {
continue
}
errLeft--
subErrors = append(subErrors, e)
}
}
// skip displaying an empty Result
if len(subErrors) > 0 {
r.Errors = subErrors
results = append(results, r)
}
}
result.Results = results
Expand Down
62 changes: 49 additions & 13 deletions dm/ctl/common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,17 @@ import (
"fmt"
"io/ioutil"
"reflect"
"regexp"
"strconv"
"strings"
"sync"
"time"

"github.com/pingcap/failpoint"
"go.etcd.io/etcd/clientv3"
"go.uber.org/zap"

"github.com/pingcap/dm/pkg/log"

"github.com/pingcap/dm/dm/config"
"github.com/pingcap/dm/dm/pb"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fmt

Copy link
Collaborator Author

@lance6716 lance6716 Apr 25, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is caused by our make fmt @Ehco1996 PTAL

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

our fmt tools just can't group these imports correctly... but you can format them manually - -.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you fix these imports manually?

Expand All @@ -44,7 +50,10 @@ import (

var (
globalConfig = &Config{}
ctlClient = &CtlClient{}
// GlobalCtlClient is the globally used CtlClient in this package. Exposed to be used in test
GlobalCtlClient = &CtlClient{}

re = regexp.MustCompile(`grpc: received message larger than max \((\d+) vs. (\d+)\)`)
)

// CtlClient used to get master client for dmctl.
Expand All @@ -53,7 +62,7 @@ type CtlClient struct {
tls *toolutils.TLS
etcdClient *clientv3.Client
conn *grpc.ClientConn
masterClient pb.MasterClient
MasterClient pb.MasterClient // exposed to be used in test
}

func (c *CtlClient) updateMasterClient() error {
Expand All @@ -75,19 +84,27 @@ func (c *CtlClient) updateMasterClient() error {
conn, err = grpc.Dial(utils.UnwrapScheme(endpoint), c.tls.ToGRPCDialOption(), grpc.WithBackoffMaxDelay(3*time.Second), grpc.WithBlock(), grpc.WithTimeout(3*time.Second))
if err == nil {
c.conn = conn
c.masterClient = pb.NewMasterClient(conn)
c.MasterClient = pb.NewMasterClient(conn)
return nil
}
}
return terror.ErrCtlGRPCCreateConn.AnnotateDelegate(err, "can't connect to %s", strings.Join(endpoints, ","))
}

func (c *CtlClient) sendRequest(ctx context.Context, reqName string, req interface{}, respPointer interface{}) error {
func (c *CtlClient) sendRequest(
ctx context.Context,
reqName string,
req interface{},
respPointer interface{},
opts ...interface{}) error {
c.mu.RLock()
defer c.mu.RUnlock()

params := []reflect.Value{reflect.ValueOf(ctx), reflect.ValueOf(req)}
results := reflect.ValueOf(c.masterClient).MethodByName(reqName).Call(params)
for _, o := range opts {
params = append(params, reflect.ValueOf(o))
}
results := reflect.ValueOf(c.MasterClient).MethodByName(reqName).Call(params)

reflect.ValueOf(respPointer).Elem().Set(results[0])
errInterface := results[1].Interface()
Expand All @@ -100,19 +117,38 @@ func (c *CtlClient) sendRequest(ctx context.Context, reqName string, req interfa

// SendRequest send request to master.
func SendRequest(ctx context.Context, reqName string, req interface{}, respPointer interface{}) error {
err := ctlClient.sendRequest(ctx, reqName, req, respPointer)
if err == nil || status.Code(err) != codes.Unavailable {
err := GlobalCtlClient.sendRequest(ctx, reqName, req, respPointer)
if err == nil {
return nil
}
var opts []interface{}
switch status.Code(err) {
case codes.ResourceExhausted:
matches := re.FindStringSubmatch(err.Error())
if len(matches) == 3 {
msgSize, err2 := strconv.Atoi(matches[1])
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we increase msgSize a little more?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unit test shows this is enough

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will the result become a little bigger for the second request?

Copy link
Collaborator Author

@lance6716 lance6716 Apr 25, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can't decide a reasonable delta for "a little bigger" 🤔

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, I'm just afraid it's too strict sometimes.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a hard limit on the gRPC message size? And how about using next_pow_of_2(msg_size) as the new limit?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a hard limit of gRPC message size?

MaxInt. This only has memory effects. next_pow_of_2 seems more like a strategy for growing container capacity, while here I think using exactly the size is OK, because we send the RPC again within a short time, so the response is unlikely to have changed.

if err2 == nil {
log.L().Info("increase gRPC maximum message size", zap.Int("size", msgSize))
opts = append(opts, grpc.MaxCallRecvMsgSize(msgSize))
}
}
case codes.Unavailable:
default:
return err
}

failpoint.Inject("SkipUpdateMasterClient", func() {
failpoint.Goto("bypass")
})
// update master client
err = ctlClient.updateMasterClient()
err = GlobalCtlClient.updateMasterClient()
if err != nil {
return err
}
failpoint.Label("bypass")

// sendRequest again
return ctlClient.sendRequest(ctx, reqName, req, respPointer)
return GlobalCtlClient.sendRequest(ctx, reqName, req, respPointer, opts...)
}

// InitUtils inits necessary dmctl utils.
Expand Down Expand Up @@ -140,12 +176,12 @@ func InitClient(addr string, securityCfg config.Security) error {
return err
}

ctlClient = &CtlClient{
GlobalCtlClient = &CtlClient{
tls: tls,
etcdClient: etcdClient,
}

return ctlClient.updateMasterClient()
return GlobalCtlClient.updateMasterClient()
}

// GlobalConfig returns global dmctl config.
Expand Down Expand Up @@ -321,7 +357,7 @@ func SyncMasterEndpoints(ctx context.Context) {
clientURLs := []string{}
updateF := func() {
clientURLs = clientURLs[:0]
resp, err := ctlClient.etcdClient.MemberList(ctx)
resp, err := GlobalCtlClient.etcdClient.MemberList(ctx)
if err != nil {
return
}
Expand All @@ -332,7 +368,7 @@ func SyncMasterEndpoints(ctx context.Context) {
if utils.NonRepeatStringsEqual(clientURLs, lastClientUrls) {
return
}
ctlClient.etcdClient.SetEndpoints(clientURLs...)
GlobalCtlClient.etcdClient.SetEndpoints(clientURLs...)
lastClientUrls = make([]string, len(clientURLs))
copy(lastClientUrls, clientURLs)
}
Expand Down
11 changes: 11 additions & 0 deletions dm/master/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,17 @@ func (s *Server) StartTask(ctx context.Context, req *pb.StartTaskRequest) (*pb.S
resp2 *pb.StartTaskResponse
err2 error
)
failpoint.Inject("LongRPCResponse", func() {
var b strings.Builder
size := 5 * 1024 * 1024
b.Grow(size)
for i := 0; i < size; i++ {
b.WriteByte(0)
}
resp2 = &pb.StartTaskResponse{Msg: b.String()}
failpoint.Return(resp2, nil)
})

shouldRet := s.sharedLogic(ctx, req, &resp2, &err2)
if shouldRet {
return resp2, err2
Expand Down
33 changes: 32 additions & 1 deletion dm/master/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,19 @@ import (
"database/sql"
"fmt"
"io/ioutil"
"net"
"os"
"sort"
"strings"
"sync"
"testing"
"time"

sqlmock "github.com/DATA-DOG/go-sqlmock"
"github.com/DATA-DOG/go-sqlmock"
"github.com/golang/mock/gomock"
"github.com/pingcap/check"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/parser"
"github.com/pingcap/parser/ast"
"github.com/pingcap/parser/model"
Expand All @@ -40,6 +42,7 @@ import (
"github.com/tikv/pd/pkg/tempurl"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/integration"
"google.golang.org/grpc"

"github.com/pingcap/dm/checker"
"github.com/pingcap/dm/dm/config"
Expand Down Expand Up @@ -1738,6 +1741,34 @@ func (t *testMaster) subTaskStageMatch(c *check.C, s *scheduler.Scheduler, task,
}
}

func (t *testMaster) TestGRPCLongResponse(c *check.C) {
c.Assert(failpoint.Enable("github.com/pingcap/dm/dm/master/LongRPCResponse", `return()`), check.IsNil)
//nolint:errcheck
defer failpoint.Disable("github.com/pingcap/dm/dm/master/LongRPCResponse")
c.Assert(failpoint.Enable("github.com/pingcap/dm/dm/ctl/common/SkipUpdateMasterClient", `return()`), check.IsNil)
//nolint:errcheck
defer failpoint.Disable("github.com/pingcap/dm/dm/ctl/common/SkipUpdateMasterClient")

masterAddr := tempurl.Alloc()[len("http://"):]
lis, err := net.Listen("tcp", masterAddr)
c.Assert(err, check.IsNil)
defer lis.Close()
server := grpc.NewServer()
pb.RegisterMasterServer(server, &Server{})
//nolint:errcheck
go server.Serve(lis)

conn, err := grpc.Dial(utils.UnwrapScheme(masterAddr),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we add defer conn.Close() here? 🤔

grpc.WithInsecure(),
grpc.WithBlock())
c.Assert(err, check.IsNil)
common.GlobalCtlClient.MasterClient = pb.NewMasterClient(conn)
ctx := context.Background()
resp := &pb.StartTaskResponse{}
err = common.SendRequest(ctx, "StartTask", &pb.StartTaskRequest{}, &resp)
c.Assert(err, check.IsNil)
}

func mockRevelantWorkerClient(mockWorkerClient *pbmock.MockWorkerClient, taskName, sourceID string, masterReq interface{}) {
var expect pb.Stage
switch req := masterReq.(type) {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ require (
github.com/pingcap/log v0.0.0-20210317133921-96f4fcab92a4
github.com/pingcap/parser v0.0.0-20210415081931-48e7f467fd74
github.com/pingcap/tidb v1.1.0-beta.0.20210330094614-60111e1c4b6f
github.com/pingcap/tidb-tools v5.0.0-rc.0.20210318094904-51a9e0c86386+incompatible
github.com/pingcap/tidb-tools v5.0.1-0.20210420102153-beed8ddc59e9+incompatible
github.com/prometheus/client_golang v1.5.1
github.com/rakyll/statik v0.1.6
github.com/soheilhy/cmux v0.1.4
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -849,8 +849,8 @@ github.com/pingcap/tidb-tools v4.0.1+incompatible/go.mod h1:XGdcy9+yqlDSEMTpOXnw
github.com/pingcap/tidb-tools v4.0.5-0.20200820082341-afeaaaaaa153+incompatible/go.mod h1:XGdcy9+yqlDSEMTpOXnwf3hiTeqrV6MN/u1se9N8yIM=
github.com/pingcap/tidb-tools v4.0.5-0.20200820092506-34ea90c93237+incompatible/go.mod h1:XGdcy9+yqlDSEMTpOXnwf3hiTeqrV6MN/u1se9N8yIM=
github.com/pingcap/tidb-tools v4.0.9-0.20201127090955-2707c97b3853+incompatible/go.mod h1:XGdcy9+yqlDSEMTpOXnwf3hiTeqrV6MN/u1se9N8yIM=
github.com/pingcap/tidb-tools v5.0.0-rc.0.20210318094904-51a9e0c86386+incompatible h1:tgUbSovpQ12sd5W0eclXlzZh+hWhgLhvdqRy2BMiXyQ=
github.com/pingcap/tidb-tools v5.0.0-rc.0.20210318094904-51a9e0c86386+incompatible/go.mod h1:XGdcy9+yqlDSEMTpOXnwf3hiTeqrV6MN/u1se9N8yIM=
github.com/pingcap/tidb-tools v5.0.1-0.20210420102153-beed8ddc59e9+incompatible h1:jvsCYfIx30AnyQQxfzoCdzm8xRX0ZRJT3BxpFSKqrTo=
github.com/pingcap/tidb-tools v5.0.1-0.20210420102153-beed8ddc59e9+incompatible/go.mod h1:XGdcy9+yqlDSEMTpOXnwf3hiTeqrV6MN/u1se9N8yIM=
github.com/pingcap/tipb v0.0.0-20190428032612-535e1abaa330/go.mod h1:RtkHW8WbcNxj8lsbzjaILci01CtYnYbIkQhjyZWrWVI=
github.com/pingcap/tipb v0.0.0-20200417094153-7316d94df1ee/go.mod h1:RtkHW8WbcNxj8lsbzjaILci01CtYnYbIkQhjyZWrWVI=
github.com/pingcap/tipb v0.0.0-20200604070248-508f03b0b342/go.mod h1:RtkHW8WbcNxj8lsbzjaILci01CtYnYbIkQhjyZWrWVI=
Expand Down