Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

Commit

Permalink
fix worker client extractor in allWorkerConfigs
Browse files Browse the repository at this point in the history
  • Loading branch information
amyangfei committed May 29, 2019
1 parent 6507f43 commit c4a668c
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 36 deletions.
87 changes: 53 additions & 34 deletions dm/master/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -670,9 +670,11 @@ func (s *Server) BreakWorkerDDLLock(ctx context.Context, req *pb.BreakWorkerDDLL
return
}
resp, err := cli.SendRequest(ctx, request, s.cfg.RPCTimeout)
workerResp := resp.BreakDDLLock
workerResp := &pb.CommonWorkerResponse{}
if err != nil {
workerResp = errorCommonWorkerResponse(errors.ErrorStack(err), "")
} else {
workerResp = resp.BreakDDLLock
}
workerResp.Worker = worker
workerRespCh <- workerResp
Expand Down Expand Up @@ -745,11 +747,12 @@ func (s *Server) HandleSQLs(ctx context.Context, req *pb.HandleSQLsRequest) (*pb
return resp, nil
}
response, err := cli.SendRequest(ctx, subReq, s.cfg.RPCTimeout)
workerResp := response.HandleSubTaskSQLs
workerResp := &pb.CommonWorkerResponse{}
if err != nil {
workerResp = errorCommonWorkerResponse(errors.ErrorStack(err), "")
} else {
workerResp = response.HandleSubTaskSQLs
}

resp.Workers = []*pb.CommonWorkerResponse{workerResp}
resp.Result = true
return resp, nil
Expand Down Expand Up @@ -781,9 +784,11 @@ func (s *Server) PurgeWorkerRelay(ctx context.Context, req *pb.PurgeWorkerRelayR
return
}
resp, err := cli.SendRequest(ctx, workerReq, s.cfg.RPCTimeout)
workerResp := resp.PurgeRelay
workerResp := &pb.CommonWorkerResponse{}
if err != nil {
workerResp = errorCommonWorkerResponse(errors.ErrorStack(err), "")
} else {
workerResp = resp.PurgeRelay
}
workerResp.Worker = worker
workerRespCh <- workerResp
Expand Down Expand Up @@ -840,9 +845,11 @@ func (s *Server) SwitchWorkerRelayMaster(ctx context.Context, req *pb.SwitchWork
SwitchRelayMaster: &pb.SwitchRelayMasterRequest{},
}
resp, err := cli.SendRequest(ctx, request, s.cfg.RPCTimeout)
workerResp := resp.SwitchRelayMaster
workerResp := &pb.CommonWorkerResponse{}
if err != nil {
workerResp = errorCommonWorkerResponse(errors.ErrorStack(err), "")
} else {
workerResp = resp.SwitchRelayMaster
}
workerResp.Worker = worker1
workerRespCh <- workerResp
Expand Down Expand Up @@ -902,12 +909,14 @@ func (s *Server) OperateWorkerRelayTask(ctx context.Context, req *pb.OperateWork
return
}
resp, err := cli.SendRequest(ctx, request, s.cfg.RPCTimeout)
workerResp := resp.OperateRelay
workerResp := &pb.OperateRelayResponse{}
if err != nil {
workerResp = &pb.OperateRelayResponse{
Result: false,
Msg: errors.ErrorStack(err),
}
} else {
workerResp = resp.OperateRelay
}
workerResp.Op = req.Op
workerResp.Worker = worker
Expand Down Expand Up @@ -1092,12 +1101,14 @@ func (s *Server) getStatusFromWorkers(ctx context.Context, workers []string, tas
return
}
resp, err := cli.SendRequest(ctx, workerReq, s.cfg.RPCTimeout)
workerStatus := resp.QueryStatus
workerStatus := &pb.QueryStatusResponse{}
if err != nil {
workerStatus = &pb.QueryStatusResponse{
Result: false,
Msg: errors.ErrorStack(err),
}
} else {
workerStatus = resp.QueryStatus
}
workerStatus.Worker = worker1
workerRespCh <- workerStatus
Expand Down Expand Up @@ -1145,12 +1156,14 @@ func (s *Server) getErrorFromWorkers(ctx context.Context, workers []string, task
return
}
resp, err := cli.SendRequest(ctx, workerReq, s.cfg.RPCTimeout)
workerError := resp.QueryError
workerError := &pb.QueryErrorResponse{}
if err != nil {
workerError = &pb.QueryErrorResponse{
Result: false,
Msg: errors.ErrorStack(err),
}
} else {
workerError = resp.QueryError
}
workerError.Worker = worker1
workerRespCh <- workerError
Expand Down Expand Up @@ -1259,12 +1272,12 @@ func (s *Server) fetchWorkerDDLInfo(ctx context.Context) {
return
default:
resp, err := cli.SendRequest(ctx, request, s.cfg.RPCTimeout)
stream := resp.FetchDDLInfo
if err != nil {
log.Errorf("[server] create FetchDDLInfo stream for worker %s fail %v", worker, err)
doRetry = true
continue
}
stream := resp.FetchDDLInfo
for {
in, err := stream.Recv()
if err == io.EOF {
Expand Down Expand Up @@ -1394,10 +1407,11 @@ func (s *Server) resolveDDLLock(ctx context.Context, lockID string, replaceOwner
},
}
resp, err := cli.SendRequest(ctx, ownerReq, s.cfg.RPCTimeout)
ownerResp := resp.HandleSubTaskSQLs
if err != nil {
return nil, errors.Annotatef(err, "send handle SQLs request %s to DDL lock %s owner %s fail", ownerReq.HandleSubTaskSQLs, lockID, owner)
} else if !ownerResp.Result {
}
ownerResp := resp.HandleSubTaskSQLs
if !ownerResp.Result {
return nil, errors.Errorf("request DDL lock %s owner %s handle SQLs request %s fail %s", lockID, owner, ownerReq.HandleSubTaskSQLs, ownerResp.Msg)
}
log.Infof("[server] sent handle --sharding DDL request %s to owner %s for lock %s", ownerReq.HandleSubTaskSQLs, owner, lockID)
Expand All @@ -1420,9 +1434,11 @@ func (s *Server) resolveDDLLock(ctx context.Context, lockID string, replaceOwner
},
}
resp, err := cli.SendRequest(ctx, ownerReq, s.cfg.RPCTimeout)
ownerResp := resp.ExecDDL
ownerResp := &pb.CommonWorkerResponse{}
if err != nil {
ownerResp = errorCommonWorkerResponse(errors.ErrorStack(err), "")
} else {
ownerResp = resp.ExecDDL
}
ownerResp.Worker = owner
if !ownerResp.Result {
Expand Down Expand Up @@ -1473,9 +1489,11 @@ func (s *Server) resolveDDLLock(ctx context.Context, lockID string, replaceOwner

log.Infof("[server] requesting %s to skip DDL (with ID %s)", worker, lockID)
resp, err2 := cli.SendRequest(ctx, request, s.cfg.RPCTimeout)
workerResp := resp.ExecDDL
var workerResp *pb.CommonWorkerResponse
if err2 != nil {
workerResp = errorCommonWorkerResponse(errors.ErrorStack(err2), "")
} else {
workerResp = resp.ExecDDL
}
workerResp.Worker = worker
workerRespCh <- workerResp
Expand Down Expand Up @@ -1657,11 +1675,10 @@ func (s *Server) UpdateWorkerRelayConfig(ctx context.Context, req *pb.UpdateWork
UpdateRelay: &pb.UpdateRelayRequest{Content: content},
}
resp, err := cli.SendRequest(ctx, request, s.cfg.RPCTimeout)
workerResp := resp.UpdateRelay
if err != nil {
return errorCommonWorkerResponse(errors.ErrorStack(err), worker), nil
}
return workerResp, nil
return resp.UpdateRelay, nil
}

func (s *Server) allWorkerConfigs(ctx context.Context) (map[string]config.DBConfig, error) {
Expand All @@ -1679,61 +1696,64 @@ func (s *Server) allWorkerConfigs(ctx context.Context) (map[string]config.DBConf
return false
}

argsExtractor := func(args ...interface{}) (string, pb.WorkerClient, bool) {
argsExtractor := func(args ...interface{}) (string, workerrpc.Client, bool) {
if len(args) != 2 {
return "", nil, handleErr(errors.Errorf("fail to call emit to fetch worker config, miss some arguments %v", args))
}

id, ok := args[0].(string)
worker, ok := args[0].(string)
if !ok {
return "", nil, handleErr(errors.Errorf("fail to call emit to fetch worker config, can't get id from args[0], arguments %v", args))
}

worker, ok := args[1].(pb.WorkerClient)
client, ok := args[1].(*workerrpc.GRPCClient)
if !ok {
return "", nil, handleErr(errors.Errorf("fail to call emit to fetch config of worker %s, can't get worker client from args[1], arguments %v", id, args))
return "", nil, handleErr(errors.Errorf("fail to call emit to fetch config of worker %s, can't get worker client from args[1], arguments %v", worker, args))
}

return id, worker, true
return worker, client, true
}

for id, worker := range s.workerClients {
request := &workerrpc.Request{
Type: workerrpc.CmdQueryWorkerConfig,
QueryWorkerConfig: &pb.QueryWorkerConfigRequest{},
}
for worker, client := range s.workerClients {
wg.Add(1)
go Emit(ctx, 0, func(args ...interface{}) {
defer wg.Done()

id1, worker1, ok := argsExtractor(args...)
worker1, client1, ok := argsExtractor(args...)
if !ok {
return
}

ctx2, cancel := context.WithTimeout(ctx, s.cfg.RPCTimeout)
defer cancel()
resp, err1 := worker1.QueryWorkerConfig(ctx2, &pb.QueryWorkerConfigRequest{})
response, err1 := client1.SendRequest(ctx, request, s.cfg.RPCTimeout)
resp := response.QueryWorkerConfig
if err1 != nil {
handleErr(errors.Annotatef(err1, "fetch config of worker %s", id1))
handleErr(errors.Annotatef(err1, "fetch config of worker %s", worker1))
return
}

if !resp.Result {
handleErr(errors.Errorf("fail to query config from worker %s, message %s", id1, resp.Msg))
handleErr(errors.Errorf("fail to query config from worker %s, message %s", worker1, resp.Msg))
return
}

if len(resp.Content) == 0 {
handleErr(errors.Errorf("fail to query config from worker %s, config is empty", id1))
handleErr(errors.Errorf("fail to query config from worker %s, config is empty", worker1))
return
}

if len(resp.SourceID) == 0 {
handleErr(errors.Errorf("fail to query config from worker %s, source ID is empty, it should be set in worker config", id1))
handleErr(errors.Errorf("fail to query config from worker %s, source ID is empty, it should be set in worker config", worker1))
return
}

dbCfg := &config.DBConfig{}
err2 := dbCfg.Decode(resp.Content)
if err2 != nil {
handleErr(errors.Annotatef(err2, "unmarshal worker %s config", id1))
handleErr(errors.Annotatef(err2, "unmarshal worker %s config", worker1))
return
}

Expand All @@ -1744,12 +1764,12 @@ func (s *Server) allWorkerConfigs(ctx context.Context) (map[string]config.DBConf
}, func(args ...interface{}) {
defer wg.Done()

_, worker1, ok := argsExtractor(args...)
worker1, _, ok := argsExtractor(args...)
if !ok {
return
}
handleErr(errors.Errorf("fail to get emit opporunity for worker %s", worker1))
}, []interface{}{id, worker}...)
}, worker, client)
}

wg.Wait()
Expand All @@ -1776,11 +1796,10 @@ func (s *Server) MigrateWorkerRelay(ctx context.Context, req *pb.MigrateWorkerRe
MigrateRelay: &pb.MigrateRelayRequest{BinlogName: binlogName, BinlogPos: binlogPos},
}
resp, err := cli.SendRequest(ctx, request, s.cfg.RPCTimeout)
workerResp := resp.MigrateRelay
if err != nil {
return errorCommonWorkerResponse(errors.ErrorStack(err), worker), nil
}
return workerResp, nil
return resp.MigrateRelay, nil
}

// CheckTask checks legality of task configuration
Expand Down
5 changes: 3 additions & 2 deletions dm/master/workerrpc/rawgrpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,7 @@ func (c *GRPCClient) SendRequest(ctx context.Context, req *Request, timeout time
defer cancel()
return callRPC(ctx1, c.client, req)
}
ctx1, cancel := context.WithCancel(ctx)
defer cancel()
ctx1, _ := context.WithCancel(ctx)
return callRPC(ctx1, c.client, req)
}

Expand Down Expand Up @@ -84,6 +83,8 @@ func callRPC(ctx context.Context, client pb.WorkerClient, req *Request) (*Respon
resp.QueryError, err = client.QueryError(ctx, req.QueryError)
case CmdQueryTaskOperation:
resp.QueryTaskOperation, err = client.QueryTaskOperation(ctx, req.QueryTaskOperation)
case CmdQueryWorkerConfig:
resp.QueryWorkerConfig, err = client.QueryWorkerConfig(ctx, req.QueryWorkerConfig)
case CmdHandleSubTaskSQLs:
resp.HandleSubTaskSQLs, err = client.HandleSQLs(ctx, req.HandleSubTaskSQLs)
case CmdExecDDL:
Expand Down

0 comments on commit c4a668c

Please sign in to comment.