Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

dmctl: support and sync mutiple endpoints (#1333) #1349

Merged
merged 5 commits into from
Dec 24, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions cmd/dm-ctl/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package main

import (
"context"
"flag"
"fmt"
"io"
Expand Down Expand Up @@ -195,6 +196,10 @@ func interactionMode() {
}
}()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go common.SyncMasterEndpoints(ctx)

loop()

fmt.Println("dmctl exit")
Expand Down
17 changes: 14 additions & 3 deletions dm/ctl/common/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"flag"
"fmt"
"net"
"strings"
"time"

"github.com/pingcap/dm/dm/config"
Expand All @@ -39,6 +40,11 @@ const (
Master = "master"
// Worker specifies member worker type
Worker = "worker"

dialTimeout = 3 * time.Second
keepaliveTimeout = 3 * time.Second
keepaliveTime = 3 * time.Second
syncMasterEndpointsTime = 3 * time.Second
)

// NewConfig creates a new base config for dmctl.
Expand Down Expand Up @@ -139,7 +145,7 @@ func (c *Config) Parse(arguments []string) (finish bool, err error) {
}

if c.MasterAddr == "" {
return false, flag.ErrHelp
return false, errors.Errorf("--master-addr not provided, use --help to see help messages")
}

return false, errors.Trace(c.adjust())
Expand Down Expand Up @@ -180,6 +186,11 @@ func (c *Config) adjust() error {

// validate host:port format address
func validateAddr(addr string) error {
_, _, err := net.SplitHostPort(addr)
return errors.Trace(err)
endpoints := strings.Split(addr, ",")
for _, endpoint := range endpoints {
if _, _, err := net.SplitHostPort(utils.UnwrapScheme(endpoint)); err != nil {
return errors.Trace(err)
}
}
return nil
}
16 changes: 11 additions & 5 deletions dm/ctl/common/operate_relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,15 @@ import (
func OperateRelay(op pb.RelayOp, workers []string) (*pb.OperateWorkerRelayResponse, error) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
cli := MasterClient()
return cli.OperateWorkerRelayTask(ctx, &pb.OperateWorkerRelayRequest{
Op: op,
Sources: workers,
})
resp := &pb.OperateWorkerRelayResponse{}
err := SendRequest(
ctx,
"OperateWorkerRelayTask",
&pb.OperateWorkerRelayRequest{
Op: op,
Sources: workers,
},
&resp,
)
return resp, err
}
19 changes: 13 additions & 6 deletions dm/ctl/common/operate_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,17 @@ import (
func OperateTask(op pb.TaskOp, name string, sources []string) (*pb.OperateTaskResponse, error) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
cli := MasterClient()
return cli.OperateTask(ctx, &pb.OperateTaskRequest{
Op: op,
Name: name,
Sources: sources,
})

resp := &pb.OperateTaskResponse{}
err := SendRequest(
ctx,
"OperateTask",
&pb.OperateTaskRequest{
Op: op,
Name: name,
Sources: sources,
},
&resp,
)
return resp, err
}
135 changes: 124 additions & 11 deletions dm/ctl/common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,21 @@
package common

import (
"context"
"encoding/json"
"fmt"
"io/ioutil"
"reflect"
"strings"
"sync"
"time"

"github.com/pingcap/dm/dm/config"
"github.com/pingcap/dm/dm/pb"
parserpkg "github.com/pingcap/dm/pkg/parser"
"github.com/pingcap/dm/pkg/terror"
"github.com/pingcap/dm/pkg/utils"
"go.etcd.io/etcd/clientv3"

"github.com/golang/protobuf/jsonpb"
"github.com/golang/protobuf/proto"
Expand All @@ -33,13 +37,83 @@ import (
toolutils "github.com/pingcap/tidb-tools/pkg/utils"
"github.com/spf13/cobra"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

var (
masterClient pb.MasterClient
globalConfig = &Config{}
ctlClient = &CtlClient{}
)

// CtlClient used to get master client for dmctl
type CtlClient struct {
mu sync.RWMutex
tls *toolutils.TLS
etcdClient *clientv3.Client
conn *grpc.ClientConn
masterClient pb.MasterClient
}

func (c *CtlClient) updateMasterClient() error {
var (
err error
conn *grpc.ClientConn
)

c.mu.Lock()
defer c.mu.Unlock()

if c.conn != nil {
c.conn.Close()
}

endpoints := c.etcdClient.Endpoints()
for _, endpoint := range endpoints {
//nolint:staticcheck
conn, err = grpc.Dial(utils.UnwrapScheme(endpoint), c.tls.ToGRPCDialOption(), grpc.WithBackoffMaxDelay(3*time.Second), grpc.WithBlock(), grpc.WithTimeout(3*time.Second))
if err == nil {
c.conn = conn
c.masterClient = pb.NewMasterClient(conn)
return nil
}
}
return terror.ErrCtlGRPCCreateConn.AnnotateDelegate(err, "can't connect to %s", strings.Join(endpoints, ","))
}

func (c *CtlClient) sendRequest(ctx context.Context, reqName string, req interface{}, respPointer interface{}) error {
c.mu.RLock()
defer c.mu.RUnlock()

params := []reflect.Value{reflect.ValueOf(ctx), reflect.ValueOf(req)}
results := reflect.ValueOf(c.masterClient).MethodByName(reqName).Call(params)

reflect.ValueOf(respPointer).Elem().Set(results[0])
errInterface := results[1].Interface()
// nil can't pass type conversion, so we handle it separately
if errInterface == nil {
return nil
}
return errInterface.(error)
}

// SendRequest send request to master
func SendRequest(ctx context.Context, reqName string, req interface{}, respPointer interface{}) error {
err := ctlClient.sendRequest(ctx, reqName, req, respPointer)
if err == nil || status.Code(err) != codes.Unavailable {
return err
}

// update master client
err = ctlClient.updateMasterClient()
if err != nil {
return err
}

// sendRequest again
return ctlClient.sendRequest(ctx, reqName, req, respPointer)
}

// InitUtils inits necessary dmctl utils
func InitUtils(cfg *Config) error {
globalConfig = cfg
Expand All @@ -53,25 +127,31 @@ func InitClient(addr string, securityCfg config.Security) error {
return terror.ErrCtlInvalidTLSCfg.Delegate(err)
}

//nolint:staticcheck
conn, err := grpc.Dial(addr, tls.ToGRPCDialOption(), grpc.WithBackoffMaxDelay(3*time.Second), grpc.WithBlock(), grpc.WithTimeout(3*time.Second))
endpoints := strings.Split(addr, ",")
etcdClient, err := clientv3.New(clientv3.Config{
Endpoints: endpoints,
DialTimeout: dialTimeout,
DialKeepAliveTime: keepaliveTime,
DialKeepAliveTimeout: keepaliveTimeout,
TLS: tls.TLSConfig(),
})
if err != nil {
return terror.ErrCtlGRPCCreateConn.AnnotateDelegate(err, "can't connect to %s", addr)
return err
}
masterClient = pb.NewMasterClient(conn)
return nil

ctlClient = &CtlClient{
tls: tls,
etcdClient: etcdClient,
}

return ctlClient.updateMasterClient()
}

// GlobalConfig returns global dmctl config
func GlobalConfig() *Config {
return globalConfig
}

// MasterClient returns dm-master client
func MasterClient() pb.MasterClient {
return masterClient
}

// PrintLines adds a wrap to support `\n` within `chzyer/readline`
func PrintLines(format string, a ...interface{}) {
fmt.Println(fmt.Sprintf(format, a...))
Expand Down Expand Up @@ -234,3 +314,36 @@ func PrintCmdUsage(cmd *cobra.Command) {
fmt.Println("can't output command's usage:", err)
}
}

// SyncMasterEndpoints sync masters' endpoints
func SyncMasterEndpoints(ctx context.Context) {
lastClientUrls := []string{}
clientURLs := []string{}
updateF := func() {
clientURLs = clientURLs[:0]
resp, err := ctlClient.etcdClient.MemberList(ctx)
if err != nil {
return
}

for _, m := range resp.Members {
clientURLs = append(clientURLs, m.GetClientURLs()...)
}
if utils.NonRepeatStringsEqual(clientURLs, lastClientUrls) {
return
}
ctlClient.etcdClient.SetEndpoints(clientURLs...)
lastClientUrls = make([]string, len(clientURLs))
copy(lastClientUrls, clientURLs)
}

for {
updateF()

select {
case <-ctx.Done():
return
case <-time.After(syncMasterEndpointsTime):
}
}
}
14 changes: 10 additions & 4 deletions dm/ctl/master/check_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,16 @@ func checkTaskFunc(cmd *cobra.Command, _ []string) (err error) {
defer cancel()

// start task
cli := common.MasterClient()
resp, err := cli.CheckTask(ctx, &pb.CheckTaskRequest{
Task: string(content),
})
resp := &pb.CheckTaskResponse{}
err = common.SendRequest(
ctx,
"CheckTask",
&pb.CheckTaskRequest{
Task: string(content),
},
&resp,
)

if err != nil {
return
}
Expand Down
15 changes: 10 additions & 5 deletions dm/ctl/master/get_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,14 +76,19 @@ func getCfgFunc(cmd *cobra.Command, _ []string) (err error) {
return
}

cli := common.MasterClient()
ctx, cancel := context.WithTimeout(context.Background(), common.GlobalConfig().RPCTimeout)
defer cancel()

resp, err := cli.GetCfg(ctx, &pb.GetCfgRequest{
Type: tp,
Name: cfgName,
})
resp := &pb.GetCfgResponse{}
err = common.SendRequest(
ctx,
"GetCfg",
&pb.GetCfgRequest{
Type: tp,
Name: cfgName,
},
&resp,
)
if err != nil {
common.PrintLines("can not get %s config of %s", cfgType, cfgName)
return
Expand Down
24 changes: 15 additions & 9 deletions dm/ctl/master/handle_error.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,15 +105,21 @@ func handleErrorFunc(cmd *cobra.Command, _ []string) (err error) {

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
cli := common.MasterClient()

resp, err := cli.HandleError(ctx, &pb.HandleErrorRequest{
Op: op,
Task: taskName,
BinlogPos: binlogPos,
Sqls: sqls,
Sources: sources,
})

resp := &pb.HandleErrorResponse{}
err = common.SendRequest(
ctx,
"HandleError",
&pb.HandleErrorRequest{
Op: op,
Task: taskName,
BinlogPos: binlogPos,
Sqls: sqls,
Sources: sources,
},
&resp,
)

if err != nil {
return
}
Expand Down
18 changes: 11 additions & 7 deletions dm/ctl/master/list_member.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,13 +80,17 @@ func listMemberFunc(cmd *cobra.Command, _ []string) (err error) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

cli := common.MasterClient()
resp, err := cli.ListMember(ctx, &pb.ListMemberRequest{
Leader: leader,
Master: master,
Worker: worker,
Names: listMemberFlags.names,
})
resp := &pb.ListMemberResponse{}
err = common.SendRequest(ctx,
"ListMember",
&pb.ListMemberRequest{
Leader: leader,
Master: master,
Worker: worker,
Names: listMemberFlags.names,
},
&resp,
)

if err != nil {
return
Expand Down
Loading