Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

feat: add get-config command (#1323) #1348

Merged
merged 2 commits into from
Dec 21, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions chaos/cases/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,15 @@ import (

"github.com/chaos-mesh/go-sqlsmith"
"github.com/pingcap/errors"
"github.com/pingcap/parser/mysql"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"

config2 "github.com/pingcap/dm/dm/config"
"github.com/pingcap/dm/dm/pb"
"github.com/pingcap/dm/pkg/conn"
"github.com/pingcap/dm/pkg/log"
"github.com/pingcap/dm/pkg/utils"
)

const (
Expand Down Expand Up @@ -356,6 +358,10 @@ func (t *task) genIncrData(ctx context.Context) (err error) {
i2 := i
eg.Go(func() error {
if err2 := conn2.execSQLs(ctx, query); err2 != nil {
if utils.IsMySQLError(err2, mysql.ErrDupFieldName) {
t.logger.Warn("ignore duplicate field name for ddl", log.ShortError(err))
return nil
}
return err2
}
t.results[i2].DDL++
Expand Down
20 changes: 20 additions & 0 deletions cmd/dm-ctl/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ func main() {
os.Exit(0)
}

args = aliasArgs(args)

// now, we use checker in dmctl while it using some pkg which log some thing when running
// to make dmctl output more clear, simply redirect log to file rather output to stdout
err := log.InitLogger(&log.Config{
Expand Down Expand Up @@ -229,6 +231,7 @@ func loop() {
}

args := strings.Fields(line)
args = aliasArgs(args)
err = ctl.Start(args)
if err != nil {
fmt.Println("fail to run:", args)
Expand All @@ -240,3 +243,20 @@ func loop() {
}
}
}

func aliasArgs(args []string) []string {
args = aliasGetTaskCfgCmd(args)
return args
}

func aliasGetTaskCfgCmd(args []string) []string {
for i, arg := range args {
if arg == "get-task-config" {
args = append(args[:i+1], args[i:]...)
args[i] = "get-config"
args[i+1] = "task"
return args
}
}
return args
}
73 changes: 44 additions & 29 deletions dm/config/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,19 +160,6 @@ func (m *MySQLInstance) VerifyAndAdjust() error {
return nil
}

// RemoveDuplicateCfg remove duplicate mydumper, loader, and syncer config
func (m *MySQLInstance) RemoveDuplicateCfg() {
if len(m.MydumperConfigName) > 0 && m.Mydumper != nil {
m.Mydumper = nil
}
if len(m.LoaderConfigName) > 0 && m.Loader != nil {
m.Loader = nil
}
if len(m.SyncerConfigName) > 0 && m.Syncer != nil {
m.Syncer = nil
}
}

// MydumperConfig represents mydumper process unit's specific config
type MydumperConfig struct {
MydumperPath string `yaml:"mydumper-path" toml:"mydumper-path" json:"mydumper-path"` // mydumper binary path
Expand Down Expand Up @@ -628,9 +615,27 @@ func (c *TaskConfig) SubTaskConfigs(sources map[string]DBConfig) ([]*SubTaskConf
return cfgs, nil
}

// getGenerateName generates name by rule or gets name from nameMap
// if it's a new name, increase nameIdx
// otherwise return current nameIdx
func getGenerateName(rule interface{}, nameIdx int, namePrefix string, nameMap map[string]string) (string, int) {
// use json as key since no DeepEqual for rules now.
ruleByte, err := json.Marshal(rule)
if err != nil {
log.L().Error(fmt.Sprintf("marshal %s rule to json", namePrefix), log.ShortError(err))
return fmt.Sprintf("%s-%02d", namePrefix, nameIdx), nameIdx + 1
} else if val, ok := nameMap[string(ruleByte)]; ok {
return val, nameIdx
} else {
ruleName := fmt.Sprintf("%s-%02d", namePrefix, nameIdx+1)
nameMap[string(ruleByte)] = ruleName
return ruleName, nameIdx + 1
}
}

// FromSubTaskConfigs constructs task configs from a list of valid subtask configs.
// this method is only used to construct config when importing from v1.0.x now.
func (c *TaskConfig) FromSubTaskConfigs(stCfgs ...*SubTaskConfig) {
func FromSubTaskConfigs(stCfgs ...*SubTaskConfig) *TaskConfig {
c := &TaskConfig{}
// global configs.
stCfg0 := stCfgs[0]
c.Name = stCfg0.Name
Expand All @@ -656,39 +661,48 @@ func (c *TaskConfig) FromSubTaskConfigs(stCfgs ...*SubTaskConfig) {
c.Loaders = make(map[string]*LoaderConfig)
c.Syncers = make(map[string]*SyncerConfig)

BAListMap := make(map[string]string, len(stCfgs))
routeMap := make(map[string]string, len(stCfgs))
filterMap := make(map[string]string, len(stCfgs))
dumpMap := make(map[string]string, len(stCfgs))
loadMap := make(map[string]string, len(stCfgs))
syncMap := make(map[string]string, len(stCfgs))
cmMap := make(map[string]string, len(stCfgs))
var baListIdx, routeIdx, filterIdx, dumpIdx, loadIdx, syncIdx, cmIdx int
var baListName, routeName, filterName, dumpName, loadName, syncName, cmName string

// NOTE:
// - we choose to ref global configs for instances now.
// - no DeepEqual for rules now, so not combine REAL same rule into only one.
for i, stCfg := range stCfgs {
BAListName := fmt.Sprintf("balist-%02d", i+1)
c.BAList[BAListName] = stCfg.BAList
for _, stCfg := range stCfgs {
baListName, baListIdx = getGenerateName(stCfg.BAList, baListIdx, "balist", BAListMap)
c.BAList[baListName] = stCfg.BAList

routeNames := make([]string, 0, len(stCfg.RouteRules))
for j, rule := range stCfg.RouteRules {
routeName := fmt.Sprintf("route-%02d-%02d", i+1, j+1)
for _, rule := range stCfg.RouteRules {
routeName, routeIdx = getGenerateName(rule, routeIdx, "route", routeMap)
routeNames = append(routeNames, routeName)
c.Routes[routeName] = rule
}

filterNames := make([]string, 0, len(stCfg.FilterRules))
for j, rule := range stCfg.FilterRules {
filterName := fmt.Sprintf("filter-%02d-%02d", i+1, j+1)
for _, rule := range stCfg.FilterRules {
filterName, filterIdx = getGenerateName(rule, filterIdx, "filter", filterMap)
filterNames = append(filterNames, filterName)
c.Filters[filterName] = rule
}

dumpName := fmt.Sprintf("dump-%02d", i+1)
dumpName, dumpIdx = getGenerateName(stCfg.MydumperConfig, dumpIdx, "dump", dumpMap)
c.Mydumpers[dumpName] = &stCfg.MydumperConfig

loadName := fmt.Sprintf("load-%02d", i+1)
loadName, loadIdx = getGenerateName(stCfg.LoaderConfig, loadIdx, "load", loadMap)
c.Loaders[loadName] = &stCfg.LoaderConfig

syncName := fmt.Sprintf("sync-%02d", i+1)
syncName, syncIdx = getGenerateName(stCfg.SyncerConfig, syncIdx, "sync", syncMap)
c.Syncers[syncName] = &stCfg.SyncerConfig

cmNames := make([]string, 0, len(stCfg.ColumnMappingRules))
for j, rule := range stCfg.ColumnMappingRules {
cmName := fmt.Sprintf("cm-%02d-%02d", i+1, j+1)
for _, rule := range stCfg.ColumnMappingRules {
cmName, cmIdx = getGenerateName(rule, cmIdx, "cm", cmMap)
cmNames = append(cmNames, cmName)
c.ColumnMappings[cmName] = rule
}
Expand All @@ -699,12 +713,13 @@ func (c *TaskConfig) FromSubTaskConfigs(stCfgs ...*SubTaskConfig) {
FilterRules: filterNames,
ColumnMappingRules: cmNames,
RouteRules: routeNames,
BAListName: BAListName,
BAListName: baListName,
MydumperConfigName: dumpName,
LoaderConfigName: loadName,
SyncerConfigName: syncName,
})
}
return c
}

// checkDuplicateString checks whether the given string array has duplicate string item
Expand Down
71 changes: 44 additions & 27 deletions dm/config/task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"io/ioutil"
"path"
"sort"
"strings"

. "github.com/pingcap/check"
"github.com/pingcap/dm/pkg/terror"
Expand Down Expand Up @@ -311,6 +312,16 @@ func (t *testConfig) TestTaskBlockAllowList(c *C) {
c.Assert(cfg.BAList["source-1"], Equals, filterRules2)
}

func WordCount(s string) map[string]int {
words := strings.Fields(s)
wordCount := make(map[string]int)
for i := range words {
wordCount[words[i]]++
}

return wordCount
}

func (t *testConfig) TestGenAndFromSubTaskConfigs(c *C) {
var (
shardMode = ShardOptimistic
Expand Down Expand Up @@ -352,6 +363,17 @@ func (t *testConfig) TestGenAndFromSubTaskConfigs(c *C) {
TargetSchema: "db",
TargetTable: "tbl",
}
routeRule3 = router.TableRule{
SchemaPattern: "database*",
TablePattern: "table*",
}
routeRule4 = router.TableRule{
SchemaPattern: "schema*",
TablePattern: "tbs*",
TargetSchema: "schema",
TargetTable: "tbs",
}

filterRule1 = bf.BinlogEventRule{
SchemaPattern: "db*",
TablePattern: "tbl1*",
Expand Down Expand Up @@ -429,7 +451,7 @@ func (t *testConfig) TestGenAndFromSubTaskConfigs(c *C) {
Security: &security,
RawDBCfg: &rawDBCfg,
},
RouteRules: []*router.TableRule{&routeRule1, &routeRule2},
RouteRules: []*router.TableRule{&routeRule2, &routeRule1, &routeRule3},
FilterRules: []*bf.BinlogEventRule{&filterRule1, &filterRule2},
BAList: &baList1,
MydumperConfig: MydumperConfig{
Expand Down Expand Up @@ -472,9 +494,9 @@ func (t *testConfig) TestGenAndFromSubTaskConfigs(c *C) {
}
stCfg2.From = source2DBCfg
stCfg2.BAList = &baList2
stCfg2.RouteRules = []*router.TableRule{&routeRule4, &routeRule1, &routeRule2}

var cfg TaskConfig
cfg.FromSubTaskConfigs(stCfg1, stCfg2)
cfg := FromSubTaskConfigs(stCfg1, stCfg2)

cfg2 := TaskConfig{
Name: name,
Expand All @@ -493,9 +515,9 @@ func (t *testConfig) TestGenAndFromSubTaskConfigs(c *C) {
{
SourceID: source1,
Meta: stCfg1.Meta,
FilterRules: []string{"filter-01-01", "filter-01-02"},
FilterRules: []string{"filter-01", "filter-02"},
ColumnMappingRules: []string{},
RouteRules: []string{"route-01-01", "route-01-02"},
RouteRules: []string{"route-01", "route-02", "route-03"},
BWListName: "",
BAListName: "balist-01",
MydumperConfigName: "dump-01",
Expand All @@ -511,57 +533,52 @@ func (t *testConfig) TestGenAndFromSubTaskConfigs(c *C) {
{
SourceID: source2,
Meta: stCfg2.Meta,
FilterRules: []string{"filter-02-01", "filter-02-02"},
FilterRules: []string{"filter-01", "filter-02"},
ColumnMappingRules: []string{},
RouteRules: []string{"route-02-01", "route-02-02"},
RouteRules: []string{"route-01", "route-02", "route-04"},
BWListName: "",
BAListName: "balist-02",
MydumperConfigName: "dump-02",
MydumperConfigName: "dump-01",
Mydumper: nil,
MydumperThread: 0,
LoaderConfigName: "load-02",
LoaderConfigName: "load-01",
Loader: nil,
LoaderThread: 0,
SyncerConfigName: "sync-02",
SyncerConfigName: "sync-01",
Syncer: nil,
SyncerThread: 0,
},
},
OnlineDDLScheme: onlineDDLScheme,
Routes: map[string]*router.TableRule{
"route-01-01": stCfg1.RouteRules[0],
"route-01-02": stCfg1.RouteRules[1],
"route-02-01": stCfg2.RouteRules[0],
"route-02-02": stCfg2.RouteRules[1],
"route-01": &routeRule1,
"route-02": &routeRule2,
"route-03": &routeRule3,
"route-04": &routeRule4,
},
Filters: map[string]*bf.BinlogEventRule{
"filter-01-01": stCfg1.FilterRules[0],
"filter-01-02": stCfg1.FilterRules[1],
"filter-02-01": stCfg2.FilterRules[0],
"filter-02-02": stCfg2.FilterRules[1],
"filter-01": &filterRule1,
"filter-02": &filterRule2,
},
ColumnMappings: nil,
BWList: nil,
BAList: map[string]*filter.Rules{
"balist-01": stCfg1.BAList,
"balist-02": stCfg2.BAList,
"balist-01": &baList1,
"balist-02": &baList2,
},
Mydumpers: map[string]*MydumperConfig{
"dump-01": &stCfg1.MydumperConfig,
"dump-02": &stCfg2.MydumperConfig,
},
Loaders: map[string]*LoaderConfig{
"load-01": &stCfg1.LoaderConfig,
"load-02": &stCfg2.LoaderConfig,
},
Syncers: map[string]*SyncerConfig{
"sync-01": &stCfg1.SyncerConfig,
"sync-02": &stCfg2.SyncerConfig,
},
CleanDumpFile: stCfg1.CleanDumpFile,
}

c.Assert(cfg.String(), Equals, cfg2.String()) // some nil/(null value) compare may not equal, so use YAML format to compare.
c.Assert(WordCount(cfg.String()), DeepEquals, WordCount(cfg2.String())) // since rules are unordered, so use WordCount to compare

c.Assert(cfg.adjust(), IsNil)
stCfgs, err := cfg.SubTaskConfigs(map[string]DBConfig{source1: source1DBCfg, source2: source2DBCfg})
Expand Down Expand Up @@ -632,19 +649,19 @@ func (t *testConfig) TestMySQLInstance(c *C) {
m.MydumperConfigName = "test"
err = m.VerifyAndAdjust()
c.Assert(terror.ErrConfigMydumperCfgConflict.Equal(err), IsTrue)
m.RemoveDuplicateCfg()
m.MydumperConfigName = ""

m.Loader = &LoaderConfig{}
m.LoaderConfigName = "test"
err = m.VerifyAndAdjust()
c.Assert(terror.ErrConfigLoaderCfgConflict.Equal(err), IsTrue)
m.RemoveDuplicateCfg()
m.Loader = nil

m.Syncer = &SyncerConfig{}
m.SyncerConfigName = "test"
err = m.VerifyAndAdjust()
c.Assert(terror.ErrConfigSyncerCfgConflict.Equal(err), IsTrue)
m.RemoveDuplicateCfg()
m.SyncerConfigName = ""

c.Assert(m.VerifyAndAdjust(), IsNil)

Expand Down
2 changes: 1 addition & 1 deletion dm/ctl/ctl.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func NewRootCmd() *cobra.Command {
master.NewOperateLeaderCmd(),
master.NewListMemberCmd(),
master.NewOperateSchemaCmd(),
master.NewGetTaskCfgCmd(),
master.NewGetCfgCmd(),
master.NewHandleErrorCmd(),
)
return cmd
Expand Down
Loading