Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

feat: add get-config command #1323

Merged
merged 18 commits into from
Dec 18, 2020
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions chaos/cases/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,15 @@ import (

"github.com/chaos-mesh/go-sqlsmith"
"github.com/pingcap/errors"
"github.com/pingcap/parser/mysql"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"

config2 "github.com/pingcap/dm/dm/config"
"github.com/pingcap/dm/dm/pb"
"github.com/pingcap/dm/pkg/conn"
"github.com/pingcap/dm/pkg/log"
"github.com/pingcap/dm/pkg/utils"
)

const (
Expand Down Expand Up @@ -356,6 +358,10 @@ func (t *task) genIncrData(ctx context.Context) (err error) {
i2 := i
eg.Go(func() error {
if err2 := conn2.execSQLs(ctx, query); err2 != nil {
if utils.IsMySQLError(err2, mysql.ErrDupFieldName) {
t.logger.Warn("ignore duplicate field name for ddl", log.ShortError(err))
return nil
}
return err2
}
t.results[i2].DDL++
Expand Down
20 changes: 20 additions & 0 deletions cmd/dm-ctl/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ func main() {
os.Exit(0)
}

args = aliasArgs(args)

// now, we use checker in dmctl while it using some pkg which log some thing when running
// to make dmctl output more clear, simply redirect log to file rather output to stdout
err := log.InitLogger(&log.Config{
Expand Down Expand Up @@ -229,6 +231,7 @@ func loop() {
}

args := strings.Fields(line)
args = aliasArgs(args)
err = ctl.Start(args)
if err != nil {
fmt.Println("fail to run:", args)
Expand All @@ -240,3 +243,20 @@ func loop() {
}
}
}

func aliasArgs(args []string) []string {
args = aliasGetTaskCfgCmd(args)
return args
}

func aliasGetTaskCfgCmd(args []string) []string {
for i, arg := range args {
if arg == "get-task-config" {
lance6716 marked this conversation as resolved.
Show resolved Hide resolved
args = append(args[:i+1], args[i:]...)
args[i] = "get-config"
args[i+1] = "task"
return args
}
}
return args
}
61 changes: 36 additions & 25 deletions dm/config/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,19 +160,6 @@ func (m *MySQLInstance) VerifyAndAdjust() error {
return nil
}

// RemoveDuplicateCfg remove duplicate mydumper, loader, and syncer config
func (m *MySQLInstance) RemoveDuplicateCfg() {
if len(m.MydumperConfigName) > 0 && m.Mydumper != nil {
m.Mydumper = nil
}
if len(m.LoaderConfigName) > 0 && m.Loader != nil {
m.Loader = nil
}
if len(m.SyncerConfigName) > 0 && m.Syncer != nil {
m.Syncer = nil
}
}

// MydumperConfig represents mydumper process unit's specific config
type MydumperConfig struct {
MydumperPath string `yaml:"mydumper-path" toml:"mydumper-path" json:"mydumper-path"` // mydumper binary path
Expand Down Expand Up @@ -628,6 +615,21 @@ func (c *TaskConfig) SubTaskConfigs(sources map[string]DBConfig) ([]*SubTaskConf
return cfgs, nil
}

func getGenerateName(rule interface{}, nameIdx int, namePrefix string, nameMap map[string]string) (string, int) {
lance6716 marked this conversation as resolved.
Show resolved Hide resolved
// use json as key since no DeepEqual for rules now.
ruleByte, err := json.Marshal(rule)
if err != nil {
log.L().Error(fmt.Sprintf("marshal %s rule to json", namePrefix), log.ShortError(err))
return fmt.Sprintf("%s-%02d", namePrefix, nameIdx), nameIdx + 1
} else if val, ok := nameMap[string(ruleByte)]; ok {
return val, nameIdx
} else {
ruleName := fmt.Sprintf("%s-%02d", namePrefix, nameIdx+1)
nameMap[string(ruleByte)] = ruleName
return ruleName, nameIdx + 1
}
}

// FromSubTaskConfigs constructs task configs from a list of valid subtask configs.
// this method is only used to construct config when importing from v1.0.x now.
func (c *TaskConfig) FromSubTaskConfigs(stCfgs ...*SubTaskConfig) {
Expand Down Expand Up @@ -656,39 +658,48 @@ func (c *TaskConfig) FromSubTaskConfigs(stCfgs ...*SubTaskConfig) {
c.Loaders = make(map[string]*LoaderConfig)
c.Syncers = make(map[string]*SyncerConfig)

BAListMap := make(map[string]string, len(stCfgs))
lance6716 marked this conversation as resolved.
Show resolved Hide resolved
routeMap := make(map[string]string, len(stCfgs))
filterMap := make(map[string]string, len(stCfgs))
dumpMap := make(map[string]string, len(stCfgs))
loadMap := make(map[string]string, len(stCfgs))
syncMap := make(map[string]string, len(stCfgs))
cmMap := make(map[string]string, len(stCfgs))
var BAListIdx, routeIdx, filterIdx, dumpIdx, loadIdx, syncIdx, cmIdx int
var BAListName, routeName, filterName, dumpName, loadName, syncName, cmName string

// NOTE:
// - we choose to ref global configs for instances now.
// - no DeepEqual for rules now, so not combine REAL same rule into only one.
for i, stCfg := range stCfgs {
BAListName := fmt.Sprintf("balist-%02d", i+1)
for _, stCfg := range stCfgs {
BAListName, BAListIdx = getGenerateName(stCfg.BAList, BAListIdx, "balist", BAListMap)
c.BAList[BAListName] = stCfg.BAList

routeNames := make([]string, 0, len(stCfg.RouteRules))
for j, rule := range stCfg.RouteRules {
routeName := fmt.Sprintf("route-%02d-%02d", i+1, j+1)
for _, rule := range stCfg.RouteRules {
routeName, routeIdx = getGenerateName(rule, routeIdx, "route", routeMap)
routeNames = append(routeNames, routeName)
c.Routes[routeName] = rule
}

filterNames := make([]string, 0, len(stCfg.FilterRules))
for j, rule := range stCfg.FilterRules {
filterName := fmt.Sprintf("filter-%02d-%02d", i+1, j+1)
for _, rule := range stCfg.FilterRules {
filterName, filterIdx = getGenerateName(rule, filterIdx, "filter", filterMap)
filterNames = append(filterNames, filterName)
c.Filters[filterName] = rule
}

dumpName := fmt.Sprintf("dump-%02d", i+1)
dumpName, dumpIdx = getGenerateName(stCfg.MydumperConfig, dumpIdx, "dump", dumpMap)
c.Mydumpers[dumpName] = &stCfg.MydumperConfig

loadName := fmt.Sprintf("load-%02d", i+1)
loadName, loadIdx = getGenerateName(stCfg.LoaderConfig, loadIdx, "load", loadMap)
c.Loaders[loadName] = &stCfg.LoaderConfig

syncName := fmt.Sprintf("sync-%02d", i+1)
syncName, syncIdx = getGenerateName(stCfg.SyncerConfig, syncIdx, "sync", syncMap)
c.Syncers[syncName] = &stCfg.SyncerConfig

cmNames := make([]string, 0, len(stCfg.ColumnMappingRules))
for j, rule := range stCfg.ColumnMappingRules {
cmName := fmt.Sprintf("cm-%02d-%02d", i+1, j+1)
for _, rule := range stCfg.ColumnMappingRules {
cmName, cmIdx = getGenerateName(rule, cmIdx, "cm", cmMap)
cmNames = append(cmNames, cmName)
c.ColumnMappings[cmName] = rule
}
Expand Down
54 changes: 29 additions & 25 deletions dm/config/task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"io/ioutil"
"path"
"sort"
"strings"

. "github.com/pingcap/check"
"github.com/pingcap/dm/pkg/terror"
Expand Down Expand Up @@ -311,6 +312,16 @@ func (t *testConfig) TestTaskBlockAllowList(c *C) {
c.Assert(cfg.BAList["source-1"], Equals, filterRules2)
}

func WordCount(s string) map[string]int {
words := strings.Fields(s)
wordCount := make(map[string]int)
for i := range words {
wordCount[words[i]]++
}

return wordCount
}

func (t *testConfig) TestGenAndFromSubTaskConfigs(c *C) {
var (
shardMode = ShardOptimistic
Expand Down Expand Up @@ -429,7 +440,7 @@ func (t *testConfig) TestGenAndFromSubTaskConfigs(c *C) {
Security: &security,
RawDBCfg: &rawDBCfg,
},
RouteRules: []*router.TableRule{&routeRule1, &routeRule2},
RouteRules: []*router.TableRule{&routeRule2, &routeRule1},
lance6716 marked this conversation as resolved.
Show resolved Hide resolved
FilterRules: []*bf.BinlogEventRule{&filterRule1, &filterRule2},
BAList: &baList1,
MydumperConfig: MydumperConfig{
Expand Down Expand Up @@ -493,9 +504,9 @@ func (t *testConfig) TestGenAndFromSubTaskConfigs(c *C) {
{
SourceID: source1,
Meta: stCfg1.Meta,
FilterRules: []string{"filter-01-01", "filter-01-02"},
FilterRules: []string{"filter-01", "filter-02"},
ColumnMappingRules: []string{},
RouteRules: []string{"route-01-01", "route-01-02"},
RouteRules: []string{"route-01", "route-02"},
BWListName: "",
BAListName: "balist-01",
MydumperConfigName: "dump-01",
Expand All @@ -511,57 +522,50 @@ func (t *testConfig) TestGenAndFromSubTaskConfigs(c *C) {
{
SourceID: source2,
Meta: stCfg2.Meta,
FilterRules: []string{"filter-02-01", "filter-02-02"},
FilterRules: []string{"filter-01", "filter-02"},
ColumnMappingRules: []string{},
RouteRules: []string{"route-02-01", "route-02-02"},
RouteRules: []string{"route-01", "route-02"},
BWListName: "",
BAListName: "balist-02",
MydumperConfigName: "dump-02",
MydumperConfigName: "dump-01",
Mydumper: nil,
MydumperThread: 0,
LoaderConfigName: "load-02",
LoaderConfigName: "load-01",
Loader: nil,
LoaderThread: 0,
SyncerConfigName: "sync-02",
SyncerConfigName: "sync-01",
Syncer: nil,
SyncerThread: 0,
},
},
OnlineDDLScheme: onlineDDLScheme,
Routes: map[string]*router.TableRule{
"route-01-01": stCfg1.RouteRules[0],
"route-01-02": stCfg1.RouteRules[1],
"route-02-01": stCfg2.RouteRules[0],
"route-02-02": stCfg2.RouteRules[1],
"route-01": &routeRule1,
"route-02": &routeRule2,
},
Filters: map[string]*bf.BinlogEventRule{
"filter-01-01": stCfg1.FilterRules[0],
"filter-01-02": stCfg1.FilterRules[1],
"filter-02-01": stCfg2.FilterRules[0],
"filter-02-02": stCfg2.FilterRules[1],
"filter-01": &filterRule1,
"filter-02": &filterRule2,
},
ColumnMappings: nil,
BWList: nil,
BAList: map[string]*filter.Rules{
"balist-01": stCfg1.BAList,
"balist-02": stCfg2.BAList,
"balist-01": &baList1,
"balist-02": &baList2,
},
Mydumpers: map[string]*MydumperConfig{
"dump-01": &stCfg1.MydumperConfig,
"dump-02": &stCfg2.MydumperConfig,
},
Loaders: map[string]*LoaderConfig{
"load-01": &stCfg1.LoaderConfig,
"load-02": &stCfg2.LoaderConfig,
},
Syncers: map[string]*SyncerConfig{
"sync-01": &stCfg1.SyncerConfig,
"sync-02": &stCfg2.SyncerConfig,
},
CleanDumpFile: stCfg1.CleanDumpFile,
}

c.Assert(cfg.String(), Equals, cfg2.String()) // some nil/(null value) compare may not equal, so use YAML format to compare.
c.Assert(WordCount(cfg.String()), DeepEquals, WordCount(cfg2.String())) // since rules are unordered, so use WordCount to compare

c.Assert(cfg.adjust(), IsNil)
stCfgs, err := cfg.SubTaskConfigs(map[string]DBConfig{source1: source1DBCfg, source2: source2DBCfg})
Expand Down Expand Up @@ -632,19 +636,19 @@ func (t *testConfig) TestMySQLInstance(c *C) {
m.MydumperConfigName = "test"
err = m.VerifyAndAdjust()
c.Assert(terror.ErrConfigMydumperCfgConflict.Equal(err), IsTrue)
m.RemoveDuplicateCfg()
m.MydumperConfigName = ""

m.Loader = &LoaderConfig{}
m.LoaderConfigName = "test"
err = m.VerifyAndAdjust()
c.Assert(terror.ErrConfigLoaderCfgConflict.Equal(err), IsTrue)
m.RemoveDuplicateCfg()
m.Loader = nil

m.Syncer = &SyncerConfig{}
m.SyncerConfigName = "test"
err = m.VerifyAndAdjust()
c.Assert(terror.ErrConfigSyncerCfgConflict.Equal(err), IsTrue)
m.RemoveDuplicateCfg()
m.SyncerConfigName = ""

c.Assert(m.VerifyAndAdjust(), IsNil)

Expand Down
2 changes: 1 addition & 1 deletion dm/ctl/ctl.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func NewRootCmd() *cobra.Command {
master.NewOperateLeaderCmd(),
master.NewListMemberCmd(),
master.NewOperateSchemaCmd(),
master.NewGetTaskCfgCmd(),
lance6716 marked this conversation as resolved.
Show resolved Hide resolved
master.NewGetCfgCmd(),
master.NewHandleErrorCmd(),
)
return cmd
Expand Down
49 changes: 37 additions & 12 deletions dm/ctl/master/get_task_config.go → dm/ctl/master/get_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,26 +26,50 @@ import (
"github.com/pingcap/dm/dm/pb"
)

// NewGetTaskCfgCmd creates a getTaskCfg command
func NewGetTaskCfgCmd() *cobra.Command {
// NewGetCfgCmd creates a getCfg command
func NewGetCfgCmd() *cobra.Command {
cmd := &cobra.Command{
Use: "get-task-config <task-name | task-file> [--file filename]",
Short: "Gets the task configuration.",
RunE: getTaskCfgFunc,
Use: "get-config <task | master | worker | source> <name> [--file filename]",
Short: "Gets the configuration.",
RunE: getCfgFunc,
}
cmd.Flags().StringP("file", "f", "", "write config to file")
return cmd
}

// getTaskCfgFunc does get task's config
func getTaskCfgFunc(cmd *cobra.Command, _ []string) (err error) {
if len(cmd.Flags().Args()) != 1 {
func convertCfgType(t string) pb.CfgType {
switch t {
case "task":
return pb.CfgType_TaskType
case "master":
return pb.CfgType_MasterType
case "worker":
return pb.CfgType_WorkerType
case "source":
return pb.CfgType_SourceType
default:
return pb.CfgType_InvalidType
}
}

// getCfgFunc gets config
func getCfgFunc(cmd *cobra.Command, _ []string) (err error) {
if len(cmd.Flags().Args()) != 2 {
cmd.SetOut(os.Stdout)
common.PrintCmdUsage(cmd)
err = errors.New("please check output to see error")
return
}
taskName := common.GetTaskNameFromArgOrFile(cmd.Flags().Arg(0))

cfgType := cmd.Flags().Arg(0)
tp := convertCfgType(cfgType)
if tp == pb.CfgType_InvalidType {
common.PrintLines("invalid config type '%s'", cfgType)
err = errors.New("please check output to see error")
return
}

cfgName := cmd.Flags().Arg(1)
filename, err := cmd.Flags().GetString("file")
if err != nil {
common.PrintLines("can not get filename")
Expand All @@ -56,11 +80,12 @@ func getTaskCfgFunc(cmd *cobra.Command, _ []string) (err error) {
ctx, cancel := context.WithTimeout(context.Background(), common.GlobalConfig().RPCTimeout)
defer cancel()

resp, err := cli.GetTaskCfg(ctx, &pb.GetTaskCfgRequest{
Name: taskName,
resp, err := cli.GetCfg(ctx, &pb.GetCfgRequest{
Type: tp,
Name: cfgName,
})
if err != nil {
common.PrintLines("can not get config of task %s", taskName)
common.PrintLines("can not get %s config of %s", cfgType, cfgName)
return
}

Expand Down
Loading