Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

Commit

Permalink
feat: add get-config command (#1323) (#1348)
Browse files Browse the repository at this point in the history
Signed-off-by: ti-srebot <ti-srebot@pingcap.com>

Co-authored-by: GMHDBJD <35025882+GMHDBJD@users.noreply.github.com>
Co-authored-by: gmhdbjd <gmhdbjd@gmail.com>
  • Loading branch information
3 people authored Dec 21, 2020
1 parent 5dd474f commit 3754a46
Show file tree
Hide file tree
Showing 41 changed files with 1,971 additions and 911 deletions.
6 changes: 6 additions & 0 deletions chaos/cases/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,15 @@ import (

"github.com/chaos-mesh/go-sqlsmith"
"github.com/pingcap/errors"
"github.com/pingcap/parser/mysql"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"

config2 "github.com/pingcap/dm/dm/config"
"github.com/pingcap/dm/dm/pb"
"github.com/pingcap/dm/pkg/conn"
"github.com/pingcap/dm/pkg/log"
"github.com/pingcap/dm/pkg/utils"
)

const (
Expand Down Expand Up @@ -356,6 +358,10 @@ func (t *task) genIncrData(ctx context.Context) (err error) {
i2 := i
eg.Go(func() error {
if err2 := conn2.execSQLs(ctx, query); err2 != nil {
if utils.IsMySQLError(err2, mysql.ErrDupFieldName) {
t.logger.Warn("ignore duplicate field name for ddl", log.ShortError(err))
return nil
}
return err2
}
t.results[i2].DDL++
Expand Down
20 changes: 20 additions & 0 deletions cmd/dm-ctl/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ func main() {
os.Exit(0)
}

args = aliasArgs(args)

// now, we use checker in dmctl while it using some pkg which log some thing when running
// to make dmctl output more clear, simply redirect log to file rather output to stdout
err := log.InitLogger(&log.Config{
Expand Down Expand Up @@ -229,6 +231,7 @@ func loop() {
}

args := strings.Fields(line)
args = aliasArgs(args)
err = ctl.Start(args)
if err != nil {
fmt.Println("fail to run:", args)
Expand All @@ -240,3 +243,20 @@ func loop() {
}
}
}

func aliasArgs(args []string) []string {
args = aliasGetTaskCfgCmd(args)
return args
}

func aliasGetTaskCfgCmd(args []string) []string {
for i, arg := range args {
if arg == "get-task-config" {
args = append(args[:i+1], args[i:]...)
args[i] = "get-config"
args[i+1] = "task"
return args
}
}
return args
}
73 changes: 44 additions & 29 deletions dm/config/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,19 +160,6 @@ func (m *MySQLInstance) VerifyAndAdjust() error {
return nil
}

// RemoveDuplicateCfg remove duplicate mydumper, loader, and syncer config
func (m *MySQLInstance) RemoveDuplicateCfg() {
if len(m.MydumperConfigName) > 0 && m.Mydumper != nil {
m.Mydumper = nil
}
if len(m.LoaderConfigName) > 0 && m.Loader != nil {
m.Loader = nil
}
if len(m.SyncerConfigName) > 0 && m.Syncer != nil {
m.Syncer = nil
}
}

// MydumperConfig represents mydumper process unit's specific config
type MydumperConfig struct {
MydumperPath string `yaml:"mydumper-path" toml:"mydumper-path" json:"mydumper-path"` // mydumper binary path
Expand Down Expand Up @@ -628,9 +615,27 @@ func (c *TaskConfig) SubTaskConfigs(sources map[string]DBConfig) ([]*SubTaskConf
return cfgs, nil
}

// getGenerateName generates name by rule or gets name from nameMap
// if it's a new name, increase nameIdx
// otherwise return current nameIdx
func getGenerateName(rule interface{}, nameIdx int, namePrefix string, nameMap map[string]string) (string, int) {
// use json as key since no DeepEqual for rules now.
ruleByte, err := json.Marshal(rule)
if err != nil {
log.L().Error(fmt.Sprintf("marshal %s rule to json", namePrefix), log.ShortError(err))
return fmt.Sprintf("%s-%02d", namePrefix, nameIdx), nameIdx + 1
} else if val, ok := nameMap[string(ruleByte)]; ok {
return val, nameIdx
} else {
ruleName := fmt.Sprintf("%s-%02d", namePrefix, nameIdx+1)
nameMap[string(ruleByte)] = ruleName
return ruleName, nameIdx + 1
}
}

// FromSubTaskConfigs constructs task configs from a list of valid subtask configs.
// this method is only used to construct config when importing from v1.0.x now.
func (c *TaskConfig) FromSubTaskConfigs(stCfgs ...*SubTaskConfig) {
func FromSubTaskConfigs(stCfgs ...*SubTaskConfig) *TaskConfig {
c := &TaskConfig{}
// global configs.
stCfg0 := stCfgs[0]
c.Name = stCfg0.Name
Expand All @@ -656,39 +661,48 @@ func (c *TaskConfig) FromSubTaskConfigs(stCfgs ...*SubTaskConfig) {
c.Loaders = make(map[string]*LoaderConfig)
c.Syncers = make(map[string]*SyncerConfig)

BAListMap := make(map[string]string, len(stCfgs))
routeMap := make(map[string]string, len(stCfgs))
filterMap := make(map[string]string, len(stCfgs))
dumpMap := make(map[string]string, len(stCfgs))
loadMap := make(map[string]string, len(stCfgs))
syncMap := make(map[string]string, len(stCfgs))
cmMap := make(map[string]string, len(stCfgs))
var baListIdx, routeIdx, filterIdx, dumpIdx, loadIdx, syncIdx, cmIdx int
var baListName, routeName, filterName, dumpName, loadName, syncName, cmName string

// NOTE:
// - we choose to ref global configs for instances now.
// - no DeepEqual for rules now, so not combine REAL same rule into only one.
for i, stCfg := range stCfgs {
BAListName := fmt.Sprintf("balist-%02d", i+1)
c.BAList[BAListName] = stCfg.BAList
for _, stCfg := range stCfgs {
baListName, baListIdx = getGenerateName(stCfg.BAList, baListIdx, "balist", BAListMap)
c.BAList[baListName] = stCfg.BAList

routeNames := make([]string, 0, len(stCfg.RouteRules))
for j, rule := range stCfg.RouteRules {
routeName := fmt.Sprintf("route-%02d-%02d", i+1, j+1)
for _, rule := range stCfg.RouteRules {
routeName, routeIdx = getGenerateName(rule, routeIdx, "route", routeMap)
routeNames = append(routeNames, routeName)
c.Routes[routeName] = rule
}

filterNames := make([]string, 0, len(stCfg.FilterRules))
for j, rule := range stCfg.FilterRules {
filterName := fmt.Sprintf("filter-%02d-%02d", i+1, j+1)
for _, rule := range stCfg.FilterRules {
filterName, filterIdx = getGenerateName(rule, filterIdx, "filter", filterMap)
filterNames = append(filterNames, filterName)
c.Filters[filterName] = rule
}

dumpName := fmt.Sprintf("dump-%02d", i+1)
dumpName, dumpIdx = getGenerateName(stCfg.MydumperConfig, dumpIdx, "dump", dumpMap)
c.Mydumpers[dumpName] = &stCfg.MydumperConfig

loadName := fmt.Sprintf("load-%02d", i+1)
loadName, loadIdx = getGenerateName(stCfg.LoaderConfig, loadIdx, "load", loadMap)
c.Loaders[loadName] = &stCfg.LoaderConfig

syncName := fmt.Sprintf("sync-%02d", i+1)
syncName, syncIdx = getGenerateName(stCfg.SyncerConfig, syncIdx, "sync", syncMap)
c.Syncers[syncName] = &stCfg.SyncerConfig

cmNames := make([]string, 0, len(stCfg.ColumnMappingRules))
for j, rule := range stCfg.ColumnMappingRules {
cmName := fmt.Sprintf("cm-%02d-%02d", i+1, j+1)
for _, rule := range stCfg.ColumnMappingRules {
cmName, cmIdx = getGenerateName(rule, cmIdx, "cm", cmMap)
cmNames = append(cmNames, cmName)
c.ColumnMappings[cmName] = rule
}
Expand All @@ -699,12 +713,13 @@ func (c *TaskConfig) FromSubTaskConfigs(stCfgs ...*SubTaskConfig) {
FilterRules: filterNames,
ColumnMappingRules: cmNames,
RouteRules: routeNames,
BAListName: BAListName,
BAListName: baListName,
MydumperConfigName: dumpName,
LoaderConfigName: loadName,
SyncerConfigName: syncName,
})
}
return c
}

// checkDuplicateString checks whether the given string array has duplicate string item
Expand Down
71 changes: 44 additions & 27 deletions dm/config/task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"io/ioutil"
"path"
"sort"
"strings"

. "github.com/pingcap/check"
"github.com/pingcap/dm/pkg/terror"
Expand Down Expand Up @@ -311,6 +312,16 @@ func (t *testConfig) TestTaskBlockAllowList(c *C) {
c.Assert(cfg.BAList["source-1"], Equals, filterRules2)
}

func WordCount(s string) map[string]int {
words := strings.Fields(s)
wordCount := make(map[string]int)
for i := range words {
wordCount[words[i]]++
}

return wordCount
}

func (t *testConfig) TestGenAndFromSubTaskConfigs(c *C) {
var (
shardMode = ShardOptimistic
Expand Down Expand Up @@ -352,6 +363,17 @@ func (t *testConfig) TestGenAndFromSubTaskConfigs(c *C) {
TargetSchema: "db",
TargetTable: "tbl",
}
routeRule3 = router.TableRule{
SchemaPattern: "database*",
TablePattern: "table*",
}
routeRule4 = router.TableRule{
SchemaPattern: "schema*",
TablePattern: "tbs*",
TargetSchema: "schema",
TargetTable: "tbs",
}

filterRule1 = bf.BinlogEventRule{
SchemaPattern: "db*",
TablePattern: "tbl1*",
Expand Down Expand Up @@ -429,7 +451,7 @@ func (t *testConfig) TestGenAndFromSubTaskConfigs(c *C) {
Security: &security,
RawDBCfg: &rawDBCfg,
},
RouteRules: []*router.TableRule{&routeRule1, &routeRule2},
RouteRules: []*router.TableRule{&routeRule2, &routeRule1, &routeRule3},
FilterRules: []*bf.BinlogEventRule{&filterRule1, &filterRule2},
BAList: &baList1,
MydumperConfig: MydumperConfig{
Expand Down Expand Up @@ -472,9 +494,9 @@ func (t *testConfig) TestGenAndFromSubTaskConfigs(c *C) {
}
stCfg2.From = source2DBCfg
stCfg2.BAList = &baList2
stCfg2.RouteRules = []*router.TableRule{&routeRule4, &routeRule1, &routeRule2}

var cfg TaskConfig
cfg.FromSubTaskConfigs(stCfg1, stCfg2)
cfg := FromSubTaskConfigs(stCfg1, stCfg2)

cfg2 := TaskConfig{
Name: name,
Expand All @@ -493,9 +515,9 @@ func (t *testConfig) TestGenAndFromSubTaskConfigs(c *C) {
{
SourceID: source1,
Meta: stCfg1.Meta,
FilterRules: []string{"filter-01-01", "filter-01-02"},
FilterRules: []string{"filter-01", "filter-02"},
ColumnMappingRules: []string{},
RouteRules: []string{"route-01-01", "route-01-02"},
RouteRules: []string{"route-01", "route-02", "route-03"},
BWListName: "",
BAListName: "balist-01",
MydumperConfigName: "dump-01",
Expand All @@ -511,57 +533,52 @@ func (t *testConfig) TestGenAndFromSubTaskConfigs(c *C) {
{
SourceID: source2,
Meta: stCfg2.Meta,
FilterRules: []string{"filter-02-01", "filter-02-02"},
FilterRules: []string{"filter-01", "filter-02"},
ColumnMappingRules: []string{},
RouteRules: []string{"route-02-01", "route-02-02"},
RouteRules: []string{"route-01", "route-02", "route-04"},
BWListName: "",
BAListName: "balist-02",
MydumperConfigName: "dump-02",
MydumperConfigName: "dump-01",
Mydumper: nil,
MydumperThread: 0,
LoaderConfigName: "load-02",
LoaderConfigName: "load-01",
Loader: nil,
LoaderThread: 0,
SyncerConfigName: "sync-02",
SyncerConfigName: "sync-01",
Syncer: nil,
SyncerThread: 0,
},
},
OnlineDDLScheme: onlineDDLScheme,
Routes: map[string]*router.TableRule{
"route-01-01": stCfg1.RouteRules[0],
"route-01-02": stCfg1.RouteRules[1],
"route-02-01": stCfg2.RouteRules[0],
"route-02-02": stCfg2.RouteRules[1],
"route-01": &routeRule1,
"route-02": &routeRule2,
"route-03": &routeRule3,
"route-04": &routeRule4,
},
Filters: map[string]*bf.BinlogEventRule{
"filter-01-01": stCfg1.FilterRules[0],
"filter-01-02": stCfg1.FilterRules[1],
"filter-02-01": stCfg2.FilterRules[0],
"filter-02-02": stCfg2.FilterRules[1],
"filter-01": &filterRule1,
"filter-02": &filterRule2,
},
ColumnMappings: nil,
BWList: nil,
BAList: map[string]*filter.Rules{
"balist-01": stCfg1.BAList,
"balist-02": stCfg2.BAList,
"balist-01": &baList1,
"balist-02": &baList2,
},
Mydumpers: map[string]*MydumperConfig{
"dump-01": &stCfg1.MydumperConfig,
"dump-02": &stCfg2.MydumperConfig,
},
Loaders: map[string]*LoaderConfig{
"load-01": &stCfg1.LoaderConfig,
"load-02": &stCfg2.LoaderConfig,
},
Syncers: map[string]*SyncerConfig{
"sync-01": &stCfg1.SyncerConfig,
"sync-02": &stCfg2.SyncerConfig,
},
CleanDumpFile: stCfg1.CleanDumpFile,
}

c.Assert(cfg.String(), Equals, cfg2.String()) // some nil/(null value) compare may not equal, so use YAML format to compare.
c.Assert(WordCount(cfg.String()), DeepEquals, WordCount(cfg2.String())) // since rules are unordered, so use WordCount to compare

c.Assert(cfg.adjust(), IsNil)
stCfgs, err := cfg.SubTaskConfigs(map[string]DBConfig{source1: source1DBCfg, source2: source2DBCfg})
Expand Down Expand Up @@ -632,19 +649,19 @@ func (t *testConfig) TestMySQLInstance(c *C) {
m.MydumperConfigName = "test"
err = m.VerifyAndAdjust()
c.Assert(terror.ErrConfigMydumperCfgConflict.Equal(err), IsTrue)
m.RemoveDuplicateCfg()
m.MydumperConfigName = ""

m.Loader = &LoaderConfig{}
m.LoaderConfigName = "test"
err = m.VerifyAndAdjust()
c.Assert(terror.ErrConfigLoaderCfgConflict.Equal(err), IsTrue)
m.RemoveDuplicateCfg()
m.Loader = nil

m.Syncer = &SyncerConfig{}
m.SyncerConfigName = "test"
err = m.VerifyAndAdjust()
c.Assert(terror.ErrConfigSyncerCfgConflict.Equal(err), IsTrue)
m.RemoveDuplicateCfg()
m.SyncerConfigName = ""

c.Assert(m.VerifyAndAdjust(), IsNil)

Expand Down
2 changes: 1 addition & 1 deletion dm/ctl/ctl.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func NewRootCmd() *cobra.Command {
master.NewOperateLeaderCmd(),
master.NewListMemberCmd(),
master.NewOperateSchemaCmd(),
master.NewGetTaskCfgCmd(),
master.NewGetCfgCmd(),
master.NewHandleErrorCmd(),
)
return cmd
Expand Down
Loading

0 comments on commit 3754a46

Please sign in to comment.