Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

Commit

Permalink
adjust grpc max message size on error
Browse files Browse the repository at this point in the history
  • Loading branch information
lance6716 committed Apr 24, 2021
1 parent 06d151e commit 685baf2
Show file tree
Hide file tree
Showing 3 changed files with 91 additions and 15 deletions.
62 changes: 48 additions & 14 deletions dm/ctl/common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,16 @@ import (
"fmt"
"io/ioutil"
"reflect"
"regexp"
"strconv"
"strings"
"sync"
"time"

"github.com/pingcap/dm/pkg/log"
"github.com/pingcap/failpoint"
"go.etcd.io/etcd/clientv3"
"go.uber.org/zap"

"github.com/pingcap/dm/dm/config"
"github.com/pingcap/dm/dm/pb"
Expand All @@ -44,7 +49,10 @@ import (

var (
globalConfig = &Config{}
ctlClient = &CtlClient{}
// GlobalCtlClient is the globally used CtlClient in this package. Exposed to be used in test
GlobalCtlClient = &CtlClient{}

re = regexp.MustCompile(`grpc: received message larger than max \((\d+) vs. (\d+)\)`)
)

// CtlClient used to get master client for dmctl.
Expand All @@ -53,7 +61,7 @@ type CtlClient struct {
tls *toolutils.TLS
etcdClient *clientv3.Client
conn *grpc.ClientConn
masterClient pb.MasterClient
MasterClient pb.MasterClient // exposed to be used in test
}

func (c *CtlClient) updateMasterClient() error {
Expand All @@ -75,19 +83,27 @@ func (c *CtlClient) updateMasterClient() error {
conn, err = grpc.Dial(utils.UnwrapScheme(endpoint), c.tls.ToGRPCDialOption(), grpc.WithBackoffMaxDelay(3*time.Second), grpc.WithBlock(), grpc.WithTimeout(3*time.Second))
if err == nil {
c.conn = conn
c.masterClient = pb.NewMasterClient(conn)
c.MasterClient = pb.NewMasterClient(conn)
return nil
}
}
return terror.ErrCtlGRPCCreateConn.AnnotateDelegate(err, "can't connect to %s", strings.Join(endpoints, ","))
}

func (c *CtlClient) sendRequest(ctx context.Context, reqName string, req interface{}, respPointer interface{}) error {
func (c *CtlClient) sendRequest(
ctx context.Context,
reqName string,
req interface{},
respPointer interface{},
opts ...interface{}) error {
c.mu.RLock()
defer c.mu.RUnlock()

params := []reflect.Value{reflect.ValueOf(ctx), reflect.ValueOf(req)}
results := reflect.ValueOf(c.masterClient).MethodByName(reqName).Call(params)
for _, o := range opts {
params = append(params, reflect.ValueOf(o))
}
results := reflect.ValueOf(c.MasterClient).MethodByName(reqName).Call(params)

reflect.ValueOf(respPointer).Elem().Set(results[0])
errInterface := results[1].Interface()
Expand All @@ -100,20 +116,38 @@ func (c *CtlClient) sendRequest(ctx context.Context, reqName string, req interfa

// SendRequest send request to master.
func SendRequest(ctx context.Context, reqName string, req interface{}, respPointer interface{}) error {
err := ctlClient.sendRequest(ctx, reqName, req, respPointer)
if err == nil || status.Code(err) != codes.Unavailable {
err := GlobalCtlClient.sendRequest(ctx, reqName, req, respPointer)
if err == nil {
return nil
}
var opts []interface{}
switch status.Code(err) {
case codes.ResourceExhausted:
matches := re.FindStringSubmatch(err.Error())
if len(matches) == 3 {
msgSize, err2 := strconv.Atoi(matches[1])
if err2 == nil {
log.L().Info("increase gRPC maximum message size", zap.Int("size", msgSize))
opts = append(opts, grpc.MaxCallRecvMsgSize(msgSize))
}
}
case codes.Unavailable:
default:
return err
}
// TODO: handle status.Code(err) == codes.ResourceExhausted

failpoint.Inject("SkipUpdateMasterClient", func() {
failpoint.Goto("bypass")
})
// update master client
err = ctlClient.updateMasterClient()
err = GlobalCtlClient.updateMasterClient()
if err != nil {
return err
}
failpoint.Label("bypass")

// sendRequest again
return ctlClient.sendRequest(ctx, reqName, req, respPointer)
return GlobalCtlClient.sendRequest(ctx, reqName, req, respPointer, opts...)
}

// InitUtils inits necessary dmctl utils.
Expand Down Expand Up @@ -141,12 +175,12 @@ func InitClient(addr string, securityCfg config.Security) error {
return err
}

ctlClient = &CtlClient{
GlobalCtlClient = &CtlClient{
tls: tls,
etcdClient: etcdClient,
}

return ctlClient.updateMasterClient()
return GlobalCtlClient.updateMasterClient()
}

// GlobalConfig returns global dmctl config.
Expand Down Expand Up @@ -322,7 +356,7 @@ func SyncMasterEndpoints(ctx context.Context) {
clientURLs := []string{}
updateF := func() {
clientURLs = clientURLs[:0]
resp, err := ctlClient.etcdClient.MemberList(ctx)
resp, err := GlobalCtlClient.etcdClient.MemberList(ctx)
if err != nil {
return
}
Expand All @@ -333,7 +367,7 @@ func SyncMasterEndpoints(ctx context.Context) {
if utils.NonRepeatStringsEqual(clientURLs, lastClientUrls) {
return
}
ctlClient.etcdClient.SetEndpoints(clientURLs...)
GlobalCtlClient.etcdClient.SetEndpoints(clientURLs...)
lastClientUrls = make([]string, len(clientURLs))
copy(lastClientUrls, clientURLs)
}
Expand Down
11 changes: 11 additions & 0 deletions dm/master/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,17 @@ func (s *Server) StartTask(ctx context.Context, req *pb.StartTaskRequest) (*pb.S
resp2 *pb.StartTaskResponse
err2 error
)
failpoint.Inject("LongRPCResponse", func() {
var b strings.Builder
size := 5 * 1024 * 1024
b.Grow(size)
for i := 0; i < size; i++ {
b.WriteByte(0)
}
resp2 = &pb.StartTaskResponse{Msg: b.String()}
failpoint.Return(resp2, nil)
})

shouldRet := s.sharedLogic(ctx, req, &resp2, &err2)
if shouldRet {
return resp2, err2
Expand Down
33 changes: 32 additions & 1 deletion dm/master/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,19 @@ import (
"database/sql"
"fmt"
"io/ioutil"
"net"
"os"
"sort"
"strings"
"sync"
"testing"
"time"

sqlmock "github.com/DATA-DOG/go-sqlmock"
"github.com/DATA-DOG/go-sqlmock"
"github.com/golang/mock/gomock"
"github.com/pingcap/check"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/parser"
"github.com/pingcap/parser/ast"
"github.com/pingcap/parser/model"
Expand All @@ -40,6 +42,7 @@ import (
"github.com/tikv/pd/pkg/tempurl"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/integration"
"google.golang.org/grpc"

"github.com/pingcap/dm/checker"
"github.com/pingcap/dm/dm/config"
Expand Down Expand Up @@ -1738,6 +1741,34 @@ func (t *testMaster) subTaskStageMatch(c *check.C, s *scheduler.Scheduler, task,
}
}

func (t *testMaster) TestGRPCLongResponse(c *check.C) {
c.Assert(failpoint.Enable("github.com/pingcap/dm/dm/master/LongRPCResponse", `return()`), check.IsNil)
//nolint:errcheck
defer failpoint.Disable("github.com/pingcap/dm/dm/master/LongRPCResponse")
c.Assert(failpoint.Enable("github.com/pingcap/dm/dm/ctl/common/SkipUpdateMasterClient", `return()`), check.IsNil)
//nolint:errcheck
defer failpoint.Disable("github.com/pingcap/dm/dm/ctl/common/SkipUpdateMasterClient")

masterAddr := tempurl.Alloc()[len("http://"):]
lis, err := net.Listen("tcp", masterAddr)
c.Assert(err, check.IsNil)
defer lis.Close()
server := grpc.NewServer()
pb.RegisterMasterServer(server, &Server{})
//nolint:errcheck
go server.Serve(lis)

conn, err := grpc.Dial(utils.UnwrapScheme(masterAddr),
grpc.WithInsecure(),
grpc.WithBlock())
c.Assert(err, check.IsNil)
common.GlobalCtlClient.MasterClient = pb.NewMasterClient(conn)
ctx := context.Background()
resp := &pb.StartTaskResponse{}
err = common.SendRequest(ctx, "StartTask", &pb.StartTaskRequest{}, &resp)
c.Assert(err, check.IsNil)
}

func mockRevelantWorkerClient(mockWorkerClient *pbmock.MockWorkerClient, taskName, sourceID string, masterReq interface{}) {
var expect pb.Stage
switch req := masterReq.(type) {
Expand Down

0 comments on commit 685baf2

Please sign in to comment.