diff --git a/dm/config/source_config.go b/dm/config/source_config.go index db70e5ee83..e9063c8bed 100644 --- a/dm/config/source_config.go +++ b/dm/config/source_config.go @@ -45,7 +45,7 @@ type PurgeConfig struct { RemainSpace int64 `yaml:"remain-space" toml:"remain-space" json:"remain-space"` // if remain space in @RelayBaseDir less than @RemainSpace (GB), then it can be purged } -// SourceConfig is the configuration for Worker. +// SourceConfig is the configuration for source. type SourceConfig struct { EnableGTID bool `yaml:"enable-gtid" toml:"enable-gtid" json:"enable-gtid"` AutoFixGTID bool `yaml:"auto-fix-gtid" toml:"auto-fix-gtid" json:"auto-fix-gtid"` @@ -353,3 +353,86 @@ func (c *SourceConfig) check(metaData *toml.MetaData, err error) error { c.adjust() return nil } + +// YamlForDowngrade returns YAML format represents of config for downgrade. +func (c *SourceConfig) YamlForDowngrade() (string, error) { + s := NewSourceConfigForDowngrade(c) + + // encrypt password + cipher, err := utils.Encrypt(utils.DecryptOrPlaintext(c.From.Password)) + if err != nil { + return "", err + } + s.From.Password = cipher + + // omit default values, so we can ignore them for later marshal + s.omitDefaultVals() + + return s.Yaml() +} + +// SourceConfigForDowngrade is the base configuration for source in v2.0. +// This config is used for downgrade(config export) from a higher dmctl version. +// When we add any new config item into SourceConfig, we should update it also. 
+type SourceConfigForDowngrade struct { + EnableGTID bool `yaml:"enable-gtid"` + AutoFixGTID bool `yaml:"auto-fix-gtid"` + RelayDir string `yaml:"relay-dir"` + MetaDir string `yaml:"meta-dir"` + Flavor string `yaml:"flavor"` + Charset string `yaml:"charset"` + EnableRelay bool `yaml:"enable-relay"` + RelayBinLogName string `yaml:"relay-binlog-name"` + RelayBinlogGTID string `yaml:"relay-binlog-gtid"` + UUIDSuffix int `yaml:"-"` + SourceID string `yaml:"source-id"` + From DBConfig `yaml:"from"` + Purge PurgeConfig `yaml:"purge"` + Checker CheckerConfig `yaml:"checker"` + ServerID uint32 `yaml:"server-id"` + Tracer map[string]interface{} `yaml:"tracer"` + // any new config item, we mark it omitempty + CaseSensitive bool `yaml:"case-sensitive,omitempty"` + Filters []*bf.BinlogEventRule `yaml:"filters,omitempty"` +} + +// NewSourceConfigForDowngrade creates a new base config for downgrade. +func NewSourceConfigForDowngrade(sourceCfg *SourceConfig) *SourceConfigForDowngrade { + return &SourceConfigForDowngrade{ + EnableGTID: sourceCfg.EnableGTID, + AutoFixGTID: sourceCfg.AutoFixGTID, + RelayDir: sourceCfg.RelayDir, + MetaDir: sourceCfg.MetaDir, + Flavor: sourceCfg.Flavor, + Charset: sourceCfg.Charset, + EnableRelay: sourceCfg.EnableRelay, + RelayBinLogName: sourceCfg.RelayBinLogName, + RelayBinlogGTID: sourceCfg.RelayBinlogGTID, + UUIDSuffix: sourceCfg.UUIDSuffix, + SourceID: sourceCfg.SourceID, + From: sourceCfg.From, + Purge: sourceCfg.Purge, + Checker: sourceCfg.Checker, + ServerID: sourceCfg.ServerID, + Tracer: sourceCfg.Tracer, + CaseSensitive: sourceCfg.CaseSensitive, + Filters: sourceCfg.Filters, + } +} + +// omitDefaultVals change default value to empty value for new config item. +// If any default value for new config item is not empty(0 or false or nil), +// we should change it to empty. 
+func (c *SourceConfigForDowngrade) omitDefaultVals() { + if len(c.From.Session) > 0 { + if timeZone, ok := c.From.Session["time_zone"]; ok && timeZone == defaultTimeZone { + delete(c.From.Session, "time_zone") + } + } +} + +// Yaml returns YAML format representation of the config. +func (c *SourceConfigForDowngrade) Yaml() (string, error) { + b, err := yaml.Marshal(c) + return string(b), err +} diff --git a/dm/config/source_config_test.go b/dm/config/source_config_test.go index 3aaddb3653..db9045b493 100644 --- a/dm/config/source_config_test.go +++ b/dm/config/source_config_test.go @@ -19,6 +19,7 @@ import ( "fmt" "io/ioutil" "path" + "reflect" "strings" "time" @@ -235,6 +236,22 @@ func (t *testConfig) TestConfigVerify(c *C) { } } +func (t *testConfig) TestSourceConfigForDowngrade(c *C) { + cfg, err := LoadFromFile(sourceSampleFile) + c.Assert(err, IsNil) + + // make sure all new field were added + cfgForDowngrade := NewSourceConfigForDowngrade(cfg) + cfgReflect := reflect.Indirect(reflect.ValueOf(cfg)) + cfgForDowngradeReflect := reflect.Indirect(reflect.ValueOf(cfgForDowngrade)) + c.Assert(cfgReflect.NumField(), Equals, cfgForDowngradeReflect.NumField()) + + // make sure all field were copied + cfgForClone := &SourceConfigForDowngrade{} + Clone(cfgForClone, cfg) + c.Assert(cfgForDowngrade, DeepEquals, cfgForClone) +} + func subtestFlavor(c *C, cfg *SourceConfig, sqlInfo, expectedFlavor, expectedError string) { cfg.Flavor = "" db, mock, err := sqlmock.New() diff --git a/dm/config/task.go b/dm/config/task.go index 170d789723..e1246ad500 100644 --- a/dm/config/task.go +++ b/dm/config/task.go @@ -33,6 +33,7 @@ import ( "github.com/pingcap/dm/pkg/log" "github.com/pingcap/dm/pkg/terror" + "github.com/pingcap/dm/pkg/utils" ) // Online DDL Scheme. @@ -935,3 +936,154 @@ func checkValidExpr(expr string) error { _, _, err := defaultParser.Parse(expr, "", "") return err } + +// YamlForDowngrade returns YAML format represents of config for downgrade. 
+func (c *TaskConfig) YamlForDowngrade() (string, error) { + t := NewTaskConfigForDowngrade(c) + + // encrypt password + cipher, err := utils.Encrypt(utils.DecryptOrPlaintext(t.TargetDB.Password)) + if err != nil { + return "", err + } + t.TargetDB.Password = cipher + + // omit default values, so we can ignore them for later marshal + t.omitDefaultVals() + + return t.Yaml() +} + +// MySQLInstanceForDowngrade represents a sync config of a MySQL instance for downgrade. +type MySQLInstanceForDowngrade struct { + SourceID string `yaml:"source-id"` + Meta *Meta `yaml:"meta"` + FilterRules []string `yaml:"filter-rules"` + ColumnMappingRules []string `yaml:"column-mapping-rules"` + RouteRules []string `yaml:"route-rules"` + BWListName string `yaml:"black-white-list"` + BAListName string `yaml:"block-allow-list"` + MydumperConfigName string `yaml:"mydumper-config-name"` + Mydumper *MydumperConfig `yaml:"mydumper"` + MydumperThread int `yaml:"mydumper-thread"` + LoaderConfigName string `yaml:"loader-config-name"` + Loader *LoaderConfig `yaml:"loader"` + LoaderThread int `yaml:"loader-thread"` + SyncerConfigName string `yaml:"syncer-config-name"` + Syncer *SyncerConfig `yaml:"syncer"` + SyncerThread int `yaml:"syncer-thread"` + // new config item + ExpressionFilters []string `yaml:"expression-filters,omitempty"` +} + +// NewMySQLInstancesForDowngrade creates []* MySQLInstanceForDowngrade. 
+func NewMySQLInstancesForDowngrade(mysqlInstances []*MySQLInstance) []*MySQLInstanceForDowngrade { + mysqlInstancesForDowngrade := make([]*MySQLInstanceForDowngrade, 0, len(mysqlInstances)) + for _, m := range mysqlInstances { + newMySQLInstance := &MySQLInstanceForDowngrade{ + SourceID: m.SourceID, + Meta: m.Meta, + FilterRules: m.FilterRules, + ColumnMappingRules: m.ColumnMappingRules, + RouteRules: m.RouteRules, + BWListName: m.BWListName, + BAListName: m.BAListName, + MydumperConfigName: m.MydumperConfigName, + Mydumper: m.Mydumper, + MydumperThread: m.MydumperThread, + LoaderConfigName: m.LoaderConfigName, + Loader: m.Loader, + LoaderThread: m.LoaderThread, + SyncerConfigName: m.SyncerConfigName, + Syncer: m.Syncer, + SyncerThread: m.SyncerThread, + ExpressionFilters: m.ExpressionFilters, + } + mysqlInstancesForDowngrade = append(mysqlInstancesForDowngrade, newMySQLInstance) + } + return mysqlInstancesForDowngrade +} + +// TaskConfigForDowngrade is the base configuration for task in v2.0. +// This config is used for downgrade(config export) from a higher dmctl version. +// When we add any new config item into TaskConfig, we should update it also. 
+type TaskConfigForDowngrade struct { + Name string `yaml:"name"` + TaskMode string `yaml:"task-mode"` + IsSharding bool `yaml:"is-sharding"` + ShardMode string `yaml:"shard-mode"` + IgnoreCheckingItems []string `yaml:"ignore-checking-items"` + MetaSchema string `yaml:"meta-schema"` + EnableHeartbeat bool `yaml:"enable-heartbeat"` + HeartbeatUpdateInterval int `yaml:"heartbeat-update-interval"` + HeartbeatReportInterval int `yaml:"heartbeat-report-interval"` + Timezone string `yaml:"timezone"` + CaseSensitive bool `yaml:"case-sensitive"` + TargetDB *DBConfig `yaml:"target-database"` + OnlineDDLScheme string `yaml:"online-ddl-scheme"` + Routes map[string]*router.TableRule `yaml:"routes"` + Filters map[string]*bf.BinlogEventRule `yaml:"filters"` + ColumnMappings map[string]*column.Rule `yaml:"column-mappings"` + BWList map[string]*filter.Rules `yaml:"black-white-list"` + BAList map[string]*filter.Rules `yaml:"block-allow-list"` + Mydumpers map[string]*MydumperConfig `yaml:"mydumpers"` + Loaders map[string]*LoaderConfig `yaml:"loaders"` + Syncers map[string]*SyncerConfig `yaml:"syncers"` + CleanDumpFile bool `yaml:"clean-dump-file"` + EnableANSIQuotes bool `yaml:"ansi-quotes"` + RemoveMeta bool `yaml:"remove-meta"` + // new config item + MySQLInstances []*MySQLInstanceForDowngrade `yaml:"mysql-instances"` + ExprFilter map[string]*ExpressionFilter `yaml:"expression-filter,omitempty"` + OnlineDDL bool `yaml:"online-ddl,omitempty"` +} + +// NewTaskConfigForDowngrade create new TaskConfigForDowngrade. 
+func NewTaskConfigForDowngrade(taskConfig *TaskConfig) *TaskConfigForDowngrade { + return &TaskConfigForDowngrade{ + Name: taskConfig.Name, + TaskMode: taskConfig.TaskMode, + IsSharding: taskConfig.IsSharding, + ShardMode: taskConfig.ShardMode, + IgnoreCheckingItems: taskConfig.IgnoreCheckingItems, + MetaSchema: taskConfig.MetaSchema, + EnableHeartbeat: taskConfig.EnableHeartbeat, + HeartbeatUpdateInterval: taskConfig.HeartbeatUpdateInterval, + HeartbeatReportInterval: taskConfig.HeartbeatReportInterval, + Timezone: taskConfig.Timezone, + CaseSensitive: taskConfig.CaseSensitive, + TargetDB: taskConfig.TargetDB, + OnlineDDLScheme: taskConfig.OnlineDDLScheme, + Routes: taskConfig.Routes, + Filters: taskConfig.Filters, + ColumnMappings: taskConfig.ColumnMappings, + BWList: taskConfig.BWList, + BAList: taskConfig.BAList, + Mydumpers: taskConfig.Mydumpers, + Loaders: taskConfig.Loaders, + Syncers: taskConfig.Syncers, + CleanDumpFile: taskConfig.CleanDumpFile, + EnableANSIQuotes: taskConfig.EnableANSIQuotes, + RemoveMeta: taskConfig.RemoveMeta, + MySQLInstances: NewMySQLInstancesForDowngrade(taskConfig.MySQLInstances), + ExprFilter: taskConfig.ExprFilter, + OnlineDDL: taskConfig.OnlineDDL, + } +} + +// omitDefaultVals change default value to empty value for new config item. +// If any default value for new config item is not empty(0 or false or nil), +// we should change it to empty. +func (c *TaskConfigForDowngrade) omitDefaultVals() { + if len(c.TargetDB.Session) > 0 { + if timeZone, ok := c.TargetDB.Session["time_zone"]; ok && timeZone == defaultTimeZone { + delete(c.TargetDB.Session, "time_zone") + } + } +} + +// Yaml returns YAML format representation of config. 
+func (c *TaskConfigForDowngrade) Yaml() (string, error) { + b, err := yaml.Marshal(c) + return string(b), err +} diff --git a/dm/config/task_test.go b/dm/config/task_test.go index dd817bb3a6..a1352dc957 100644 --- a/dm/config/task_test.go +++ b/dm/config/task_test.go @@ -16,6 +16,7 @@ package config import ( "io/ioutil" "path" + "reflect" "sort" "strings" @@ -29,8 +30,7 @@ import ( "github.com/coreos/go-semver/semver" ) -func (t *testConfig) TestUnusedTaskConfig(c *C) { - correctTaskConfig := `--- +var correctTaskConfig = `--- name: test task-mode: all shard-mode: "pessimistic" @@ -135,6 +135,8 @@ mysql-instances: loader-config-name: "global2" syncer-config-name: "global2" ` + +func (t *testConfig) TestUnusedTaskConfig(c *C) { taskConfig := NewTaskConfig() err := taskConfig.Decode(correctTaskConfig) c.Assert(err, IsNil) @@ -1011,3 +1013,78 @@ func (t *testConfig) TestExclusiveAndWrongExprFilterFields(c *C) { err = cfg.adjust() c.Assert(terror.ErrConfigExprFilterWrongGrammar.Equal(err), IsTrue) } + +func (t *testConfig) TestTaskConfigForDowngrade(c *C) { + cfg := NewTaskConfig() + err := cfg.Decode(correctTaskConfig) + c.Assert(err, IsNil) + + cfgForDowngrade := NewTaskConfigForDowngrade(cfg) + + // make sure all new field were added + cfgReflect := reflect.Indirect(reflect.ValueOf(cfg)) + cfgForDowngradeReflect := reflect.Indirect(reflect.ValueOf(cfgForDowngrade)) + c.Assert(cfgReflect.NumField(), Equals, cfgForDowngradeReflect.NumField()+1) // without flag + + // make sure all field were copied + cfgForClone := &TaskConfigForDowngrade{} + Clone(cfgForClone, cfg) + c.Assert(cfgForDowngrade, DeepEquals, cfgForClone) +} + +// Clone clones src to dest. +func Clone(dest, src interface{}) { + cloneValues(reflect.ValueOf(dest), reflect.ValueOf(src)) +} + +// cloneValues clone src to dest recursively. +// Note: pointer still use shallow copy. 
+func cloneValues(dest, src reflect.Value) { + destType := dest.Type() + srcType := src.Type() + if destType.Kind() == reflect.Ptr { + destType = destType.Elem() + } + if srcType.Kind() == reflect.Ptr { + srcType = srcType.Elem() + } + + if destType.Kind() == reflect.Slice { + slice := reflect.MakeSlice(destType, src.Len(), src.Cap()) + for i := 0; i < src.Len(); i++ { + if slice.Index(i).Type().Kind() == reflect.Ptr { + newVal := reflect.New(slice.Index(i).Type().Elem()) + cloneValues(newVal, src.Index(i)) + slice.Index(i).Set(newVal) + } else { + cloneValues(slice.Index(i).Addr(), src.Index(i).Addr()) + } + } + dest.Set(slice) + return + } + + destFieldsMap := map[string]int{} + for i := 0; i < destType.NumField(); i++ { + destFieldsMap[destType.Field(i).Name] = i + } + for i := 0; i < srcType.NumField(); i++ { + if j, ok := destFieldsMap[srcType.Field(i).Name]; ok { + destField := dest.Elem().Field(j) + srcField := src.Elem().Field(i) + destFieldType := destField.Type() + srcFieldType := srcField.Type() + if destFieldType.Kind() == reflect.Ptr { + destFieldType = destFieldType.Elem() + } + if srcFieldType.Kind() == reflect.Ptr { + srcFieldType = srcFieldType.Elem() + } + if destFieldType != srcFieldType { + cloneValues(destField, srcField) + } else { + destField.Set(srcField) + } + } + } +} diff --git a/dm/ctl/common/util.go b/dm/ctl/common/util.go index 25106a34ea..0fa4522018 100644 --- a/dm/ctl/common/util.go +++ b/dm/ctl/common/util.go @@ -58,9 +58,9 @@ var ( type CtlClient struct { mu sync.RWMutex tls *toolutils.TLS - etcdClient *clientv3.Client conn *grpc.ClientConn - MasterClient pb.MasterClient // exposed to be used in test + MasterClient pb.MasterClient // exposed to be used in test + EtcdClient *clientv3.Client // exposed to be used in export config } func (c *CtlClient) updateMasterClient() error { @@ -76,7 +76,7 @@ func (c *CtlClient) updateMasterClient() error { c.conn.Close() } - endpoints := c.etcdClient.Endpoints() + endpoints := 
c.EtcdClient.Endpoints() for _, endpoint := range endpoints { //nolint:staticcheck conn, err = grpc.Dial(utils.UnwrapScheme(endpoint), c.tls.ToGRPCDialOption(), grpc.WithBackoffMaxDelay(3*time.Second), grpc.WithBlock(), grpc.WithTimeout(3*time.Second)) @@ -176,7 +176,7 @@ func InitClient(addr string, securityCfg config.Security) error { GlobalCtlClient = &CtlClient{ tls: tls, - etcdClient: etcdClient, + EtcdClient: etcdClient, } return GlobalCtlClient.updateMasterClient() @@ -355,7 +355,7 @@ func SyncMasterEndpoints(ctx context.Context) { clientURLs := []string{} updateF := func() { clientURLs = clientURLs[:0] - resp, err := GlobalCtlClient.etcdClient.MemberList(ctx) + resp, err := GlobalCtlClient.EtcdClient.MemberList(ctx) if err != nil { return } @@ -366,7 +366,7 @@ func SyncMasterEndpoints(ctx context.Context) { if utils.NonRepeatStringsEqual(clientURLs, lastClientUrls) { return } - GlobalCtlClient.etcdClient.SetEndpoints(clientURLs...) + GlobalCtlClient.EtcdClient.SetEndpoints(clientURLs...) lastClientUrls = make([]string, len(clientURLs)) copy(lastClientUrls, clientURLs) } diff --git a/dm/ctl/ctl.go b/dm/ctl/ctl.go index 70d1474af9..0f94229335 100644 --- a/dm/ctl/ctl.go +++ b/dm/ctl/ctl.go @@ -75,6 +75,7 @@ func NewRootCmd() *cobra.Command { master.NewTransferSourceCmd(), master.NewStartRelayCmd(), master.NewStopRelayCmd(), + master.NewConfigCmd(), newDecryptCmd(), newEncryptCmd(), ) diff --git a/dm/ctl/master/config.go b/dm/ctl/master/config.go new file mode 100644 index 0000000000..5a9931567c --- /dev/null +++ b/dm/ctl/master/config.go @@ -0,0 +1,387 @@ +// Copyright 2020 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package master + +import ( + "context" + "encoding/json" + "io/ioutil" + "os" + "path" + "sort" + "strings" + + "github.com/spf13/cobra" + "go.etcd.io/etcd/clientv3" + + "github.com/pingcap/errors" + + "github.com/pingcap/dm/dm/config" + "github.com/pingcap/dm/dm/ctl/common" + "github.com/pingcap/dm/dm/pb" + "github.com/pingcap/dm/pkg/ha" + "github.com/pingcap/dm/pkg/utils" +) + +var ( + taskDirname = "tasks" + sourceDirname = "sources" + relayWorkersFilename = "relay_workers.json" + yamlSuffix = ".yaml" +) + +// NewConfigCmd creates a config command. +func NewConfigCmd() *cobra.Command { + cmd := &cobra.Command{ + Use: "config", + Short: "Commands to import/export config", + } + cmd.AddCommand(newExportCfgsCmd()) + cmd.AddCommand(newImportCfgsCmd()) + return cmd +} + +// newExportCfgsCmd creates an exportCfg command. +func newExportCfgsCmd() *cobra.Command { + cmd := &cobra.Command{ + Use: "export [--dir directory]", + Short: "Export the configurations of sources and tasks.", + RunE: exportCfgsFunc, + } + cmd.Flags().StringP("dir", "d", "configs", "specify the output directory, default is `./configs`") + return cmd +} + +// newImportCfgsCmd creates an importCfg command. +func newImportCfgsCmd() *cobra.Command { + cmd := &cobra.Command{ + Use: "import [--dir directory]", + Short: "Import the configurations of sources and tasks.", + RunE: importCfgsFunc, + } + cmd.Flags().StringP("dir", "d", "configs", "specify the configs directory, default is `./configs`") + return cmd +} + +// exportCfgsFunc exports configs. 
+func exportCfgsFunc(cmd *cobra.Command, args []string) error { + dir, err := cmd.Flags().GetString("dir") + if err != nil { + common.PrintLinesf("can not get directory") + return err + } + + // get all configs + sourceCfgsMap, subTaskCfgsMap, relayWorkersSet, err := getAllCfgs(common.GlobalCtlClient.EtcdClient) + if err != nil { + return err + } + // create directory + taskDir, sourceDir, err := createDirectory(dir) + if err != nil { + return err + } + // write sourceCfg files + if err = writeSourceCfgs(sourceDir, sourceCfgsMap); err != nil { + return err + } + // write taskCfg files + if err = writeTaskCfgs(taskDir, subTaskCfgsMap); err != nil { + return err + } + // write relayWorkers + if err = writeRelayWorkers(path.Join(dir, relayWorkersFilename), relayWorkersSet); err != nil { + return err + } + + common.PrintLinesf("export configs to directory `%s` succeed", dir) + return nil +} + +// importCfgsFunc imports configs. +func importCfgsFunc(cmd *cobra.Command, args []string) error { + dir, err := cmd.Flags().GetString("dir") + if err != nil { + common.PrintLinesf("can not get directory") + return err + } + + sourceCfgs, taskCfgs, relayWorkers, err := collectCfgs(dir) + if err != nil { + return err + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + if err := createSources(ctx, sourceCfgs); err != nil { + return err + } + if err := createTasks(ctx, taskCfgs); err != nil { + return err + } + if len(relayWorkers) > 0 { + common.PrintLinesf("The original relay workers have been exported to `%s`.", path.Join(dir, relayWorkersFilename)) + common.PrintLinesf("Currently DM doesn't support recover relay workers. 
You may need to execute `transfer-source` and `start-relay` command manually.") + } + + common.PrintLinesf("import configs from directory `%s` succeed", dir) + return nil +} + +func collectDirCfgs(dir string) ([]string, error) { + files, err := ioutil.ReadDir(dir) + if err != nil { + return nil, err + } + + cfgs := make([]string, 0, len(files)) + for _, f := range files { + cfg, err2 := common.GetFileContent(path.Join(dir, f.Name())) + if err2 != nil { + return nil, err2 + } + cfgs = append(cfgs, string(cfg)) + } + return cfgs, nil +} + +// getSourceCfgs gets all source cfgs. +func getSourceCfgs(cli *clientv3.Client) (map[string]*config.SourceConfig, error) { + sourceCfgsMap, _, err := ha.GetSourceCfg(cli, "", 0) + if err != nil { + return nil, err + } + // try to get all source cfgs before v2.0.2 + if len(sourceCfgsMap) == 0 { + sourceCfgsMap, _, err = ha.GetAllSourceCfgBeforeV202(cli) + if err != nil { + return nil, err + } + } + return sourceCfgsMap, nil +} + +func getAllCfgs(cli *clientv3.Client) (map[string]*config.SourceConfig, map[string]map[string]config.SubTaskConfig, map[string]map[string]struct{}, error) { + // get all source cfgs + sourceCfgsMap, err := getSourceCfgs(cli) + if err != nil { + common.PrintLinesf("can not get source configs from etcd") + return nil, nil, nil, err + } + // get all task cfgs + subTaskCfgsMap, _, err := ha.GetAllSubTaskCfg(cli) + if err != nil { + common.PrintLinesf("can not get subtask configs from etcd") + return nil, nil, nil, err + } + // get all relay configs. 
+ relayWorkers, _, err := ha.GetAllRelayConfig(cli) + if err != nil { + common.PrintLinesf("can not get relay workers from etcd") + return nil, nil, nil, err + } + return sourceCfgsMap, subTaskCfgsMap, relayWorkers, nil +} + +func createDirectory(dir string) (string, string, error) { + taskDir := path.Join(dir, taskDirname) + if err := os.MkdirAll(taskDir, 0o755); err != nil { + common.PrintLinesf("can not create directory of task configs `%s`", taskDir) + return "", "", err + } + sourceDir := path.Join(dir, sourceDirname) + if err := os.MkdirAll(sourceDir, 0o755); err != nil { + common.PrintLinesf("can not create directory of source configs `%s`", sourceDir) + return "", "", err + } + return taskDir, sourceDir, nil +} + +func writeSourceCfgs(sourceDir string, sourceCfgsMap map[string]*config.SourceConfig) error { + for source, sourceCfg := range sourceCfgsMap { + sourceFile := path.Join(sourceDir, source) + sourceFile += yamlSuffix + fileContent, err := sourceCfg.YamlForDowngrade() + if err != nil { + common.PrintLinesf("fail to marshal source config of `%s`", source) + return err + } + err = ioutil.WriteFile(sourceFile, []byte(fileContent), 0o644) + if err != nil { + common.PrintLinesf("fail to write source config to file `%s`", sourceFile) + return err + } + } + return nil +} + +func writeTaskCfgs(taskDir string, subTaskCfgsMap map[string]map[string]config.SubTaskConfig) error { + subTaskCfgsListMap := make(map[string][]*config.SubTaskConfig, len(subTaskCfgsMap)) + // from source => task => subtask to task => subtask + for _, subTaskCfgs := range subTaskCfgsMap { + for task, subTaskCfg := range subTaskCfgs { + clone := subTaskCfg + subTaskCfgsListMap[task] = append(subTaskCfgsListMap[task], &clone) + } + } + // from task => subtask to task => taskCfg + for task, subTaskCfgs := range subTaskCfgsListMap { + sort.Slice(subTaskCfgs, func(i, j int) bool { + return subTaskCfgs[i].SourceID < subTaskCfgs[j].SourceID + }) + taskCfg := 
config.FromSubTaskConfigs(subTaskCfgs...) + + taskFile := path.Join(taskDir, task) + taskFile += yamlSuffix + taskContent, err := taskCfg.YamlForDowngrade() + if err != nil { + common.PrintLinesf("fail to marshal source config of `%s`", task) + } + if err := ioutil.WriteFile(taskFile, []byte(taskContent), 0o644); err != nil { + common.PrintLinesf("can not write task config to file `%s`", taskFile) + return err + } + } + return nil +} + +func writeRelayWorkers(relayWorkersFile string, relayWorkersSet map[string]map[string]struct{}) error { + if len(relayWorkersSet) == 0 { + return nil + } + + // from source => workerSet to source => workerList + relayWorkers := make(map[string][]string, len(relayWorkersSet)) + for source, workerSet := range relayWorkersSet { + workers := make([]string, 0, len(workerSet)) + for worker := range workerSet { + workers = append(workers, worker) + } + sort.Strings(workers) + relayWorkers[source] = workers + } + + content, err := json.Marshal(relayWorkers) + if err != nil { + common.PrintLinesf("fail to marshal relay workers") + return err + } + + err = ioutil.WriteFile(relayWorkersFile, content, 0o644) + if err != nil { + common.PrintLinesf("can not write relay workers to file `%s`", relayWorkersFile) + return err + } + return nil +} + +func collectCfgs(dir string) (sourceCfgs []string, taskCfgs []string, relayWorkers map[string][]string, err error) { + var ( + sourceDir = path.Join(dir, sourceDirname) + taskDir = path.Join(dir, taskDirname) + relayWorkersFile = path.Join(dir, relayWorkersFilename) + content []byte + ) + if !utils.IsDirExists(dir) { + return nil, nil, nil, errors.Errorf("config directory `%s` not exists", dir) + } + + if utils.IsDirExists(sourceDir) { + if sourceCfgs, err = collectDirCfgs(sourceDir); err != nil { + common.PrintLinesf("fail to collect source config files from source configs directory `%s`", sourceDir) + return + } + } + if utils.IsDirExists(taskDir) { + if taskCfgs, err = collectDirCfgs(taskDir); err != 
nil { + common.PrintLinesf("fail to collect task config files from task configs directory `%s`", taskDir) + return + } + } + if utils.IsFileExists(relayWorkersFile) { + content, err = common.GetFileContent(relayWorkersFile) + if err != nil { + common.PrintLinesf("fail to read relay workers config `%s`", relayWorkersFile) + return + } + err = json.Unmarshal(content, &relayWorkers) + if err != nil { + common.PrintLinesf("fail to unmarshal relay workers config `%s`", relayWorkersFile) + return + } + } + // nolint:nakedret + return +} + +func createSources(ctx context.Context, sourceCfgs []string) error { + if len(sourceCfgs) == 0 { + return nil + } + common.PrintLinesf("start creating sources") + + sourceResp := &pb.OperateSourceResponse{} + // Do not use batch for `operate-source start source1, source2` if we want to support idemponent config import. + // Because `operate-source start` will revert all batch sources if any source error. + // e.g. ErrSchedulerSourceCfgExist + for _, sourceCfg := range sourceCfgs { + err := common.SendRequest( + ctx, + "OperateSource", + &pb.OperateSourceRequest{ + Config: []string{sourceCfg}, + Op: pb.SourceOp_StartSource, + }, + &sourceResp, + ) + if err != nil { + common.PrintLinesf("fail to create sources") + return err + } + + if !sourceResp.Result && !strings.Contains(sourceResp.Msg, "already exist") { + common.PrettyPrintResponse(sourceResp) + return errors.Errorf("fail to create sources") + } + } + return nil +} + +func createTasks(ctx context.Context, taskCfgs []string) error { + if len(taskCfgs) == 0 { + return nil + } + common.PrintLinesf("start creating tasks") + + taskResp := &pb.StartTaskResponse{} + for _, taskCfg := range taskCfgs { + err := common.SendRequest( + ctx, + "StartTask", + &pb.StartTaskRequest{ + Task: taskCfg, + }, + &taskResp, + ) + if err != nil { + common.PrintLinesf("fail to create tasks") + return err + } + if !taskResp.Result && !strings.Contains(taskResp.Msg, "already exist") { + 
common.PrettyPrintResponse(taskResp) + return errors.Errorf("fail to create tasks") + } + } + return nil +} diff --git a/dm/master/scheduler/scheduler.go b/dm/master/scheduler/scheduler.go index 41bc926684..698427eda4 100644 --- a/dm/master/scheduler/scheduler.go +++ b/dm/master/scheduler/scheduler.go @@ -381,6 +381,18 @@ func (s *Scheduler) RemoveSourceCfg(source string) error { return nil } +// GetSourceCfgs gets all source cfgs, return nil when error happens. +func (s *Scheduler) GetSourceCfgs() map[string]*config.SourceConfig { + s.mu.RLock() + defer s.mu.RUnlock() + clone := make(map[string]*config.SourceConfig, len(s.sourceCfgs)) + for sourceID, sourceCfg := range s.sourceCfgs { + cloneCfg := sourceCfg.Clone() + clone[sourceID] = cloneCfg + } + return clone +} + // GetSourceCfgIDs gets all added source ID. func (s *Scheduler) GetSourceCfgIDs() []string { s.mu.RLock() diff --git a/pkg/ha/source.go b/pkg/ha/source.go index 66f6159c1c..89f31c2b5a 100644 --- a/pkg/ha/source.go +++ b/pkg/ha/source.go @@ -37,6 +37,31 @@ func PutSourceCfg(cli *clientv3.Client, cfg *config.SourceConfig) (int64, error) return rev, err } +// GetAllSourceCfgBeforeV202 gets all upstream source configs before v2.0.2. +// This func only use for config export command. +func GetAllSourceCfgBeforeV202(cli *clientv3.Client) (map[string]*config.SourceConfig, int64, error) { + ctx, cancel := context.WithTimeout(cli.Ctx(), etcdutil.DefaultRequestTimeout) + defer cancel() + + var ( + scm = make(map[string]*config.SourceConfig) + resp *clientv3.GetResponse + err error + ) + resp, err = cli.Get(ctx, common.UpstreamConfigKeyAdapterV1.Path(), clientv3.WithPrefix()) + + if err != nil { + return scm, 0, err + } + + scm, err = sourceCfgFromResp("", resp) + if err != nil { + return scm, 0, err + } + + return scm, resp.Header.Revision, nil +} + // GetSourceCfg gets upstream source configs. // k/v: source ID -> source config. // if the source config for the sourceID not exist, return with `err == nil`. 
diff --git a/tests/dmctl_command/run.sh b/tests/dmctl_command/run.sh index 932fad8d0e..0b08a1bb20 100644 --- a/tests/dmctl_command/run.sh +++ b/tests/dmctl_command/run.sh @@ -6,7 +6,7 @@ cur=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) source $cur/../_utils/test_prepare WORK_DIR=$TEST_DIR/$TEST_NAME -help_cnt=46 +help_cnt=47 function run() { # check dmctl output with help flag diff --git a/tests/new_relay/conf/source1.yaml b/tests/new_relay/conf/source1.yaml index 406e7ae254..96e7c670da 100644 --- a/tests/new_relay/conf/source1.yaml +++ b/tests/new_relay/conf/source1.yaml @@ -1,4 +1,5 @@ source-id: mysql-replica-01 +server-id: 123456 flavor: 'mysql' enable-gtid: true relay-binlog-name: '' diff --git a/tests/new_relay/configs/relay_workers.json b/tests/new_relay/configs/relay_workers.json new file mode 100644 index 0000000000..ae362cafea --- /dev/null +++ b/tests/new_relay/configs/relay_workers.json @@ -0,0 +1 @@ +{"mysql-replica-01":["worker1","worker2"]} \ No newline at end of file diff --git a/tests/new_relay/configs/sources/mysql-replica-01.yaml b/tests/new_relay/configs/sources/mysql-replica-01.yaml new file mode 100644 index 0000000000..c33a2116cf --- /dev/null +++ b/tests/new_relay/configs/sources/mysql-replica-01.yaml @@ -0,0 +1,31 @@ +enable-gtid: true +auto-fix-gtid: false +relay-dir: relay-dir +meta-dir: "" +flavor: mysql +charset: "" +enable-relay: true +relay-binlog-name: "" +relay-binlog-gtid: "" +source-id: mysql-replica-01 +from: + host: 127.0.0.1 + port: 3306 + user: root + max-allowed-packet: null + session: {} + security: null +purge: + interval: 3600 + expires: 0 + remain-space: 15 +checker: + check-enable: false + backoff-rollback: 5m0s + backoff-max: 5m0s + check-interval: 5s + backoff-min: 1s + backoff-jitter: true + backoff-factor: 2 +server-id: 123456 +tracer: {} diff --git a/tests/new_relay/configs/tasks/test.yaml b/tests/new_relay/configs/tasks/test.yaml new file mode 100644 index 0000000000..17b0698cfc --- /dev/null +++ 
b/tests/new_relay/configs/tasks/test.yaml @@ -0,0 +1,78 @@ +name: test +task-mode: all +is-sharding: false +shard-mode: "" +ignore-checking-items: [] +meta-schema: dm_meta +enable-heartbeat: false +heartbeat-update-interval: 1 +heartbeat-report-interval: 1 +timezone: "" +case-sensitive: false +target-database: + host: 127.0.0.1 + port: 4000 + user: root + max-allowed-packet: null + session: + tidb_txn_mode: optimistic + security: null +online-ddl-scheme: "" +routes: {} +filters: {} +column-mappings: {} +black-white-list: {} +block-allow-list: + balist-01: + do-tables: [] + do-dbs: + - new_relay + ignore-tables: [] + ignore-dbs: [] +mydumpers: + dump-01: + mydumper-path: ./bin/mydumper + threads: 4 + chunk-filesize: "64" + statement-size: 0 + rows: 0 + where: "" + skip-tz-utc: true + extra-args: "" +loaders: + load-01: + pool-size: 16 + dir: ./dumped_data.test +syncers: + sync-01: + meta-file: "" + worker-count: 16 + batch: 100 + queue-size: 1024 + checkpoint-flush-interval: 30 + max-retry: 0 + auto-fix-gtid: false + enable-gtid: false + disable-detect: false + safe-mode: false + enable-ansi-quotes: false +clean-dump-file: false +ansi-quotes: false +remove-meta: false +mysql-instances: +- source-id: mysql-replica-01 + meta: null + filter-rules: [] + column-mapping-rules: [] + route-rules: [] + black-white-list: "" + block-allow-list: balist-01 + mydumper-config-name: dump-01 + mydumper: null + mydumper-thread: 0 + loader-config-name: load-01 + loader: null + loader-thread: 0 + syncer-config-name: sync-01 + syncer: null + syncer-thread: 0 diff --git a/tests/new_relay/data/db1.increment5.sql b/tests/new_relay/data/db1.increment5.sql new file mode 100644 index 0000000000..6cffd32080 --- /dev/null +++ b/tests/new_relay/data/db1.increment5.sql @@ -0,0 +1,2 @@ +alter table new_relay.t1 add column new_col1 int, add column new_col2 int; +insert into new_relay.t1 (id, name, info, new_col1, new_col2) values (13, 'gentest', '{"id": 133}', 13, 13), (14, 'gentest', '{"id": 
134}', 14, 14); diff --git a/tests/new_relay/run.sh b/tests/new_relay/run.sh index 927be18385..16d2e7331b 100755 --- a/tests/new_relay/run.sh +++ b/tests/new_relay/run.sh @@ -116,6 +116,49 @@ function run() { run_sql_file $cur/data/db1.increment4.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1 check_sync_diff $WORK_DIR $cur/conf/diff_config.toml + + # config export + run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "config export -d /tmp/configs" \ + "export configs to directory .* succeed" 1 + + # check configs + sed '/password/d' /tmp/configs/tasks/test.yaml | diff $cur/configs/tasks/test.yaml - || exit 1 + sed '/password/d' /tmp/configs/sources/mysql-replica-01.yaml | diff $cur/configs/sources/mysql-replica-01.yaml - || exit 1 + diff <(jq --sort-keys . /tmp/configs/relay_workers.json) <(jq --sort-keys . $cur/configs/relay_workers.json) || exit 1 + + # destroy cluster + cleanup_process $* + rm -rf $WORK_DIR + mkdir $WORK_DIR + + # insert new data + run_sql_file $cur/data/db1.increment5.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1 + + # deploy new cluster + run_dm_master $WORK_DIR/master $MASTER_PORT $cur/conf/dm-master.toml + check_rpc_alive $cur/../bin/check_master_online 127.0.0.1:$MASTER_PORT + run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml + check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT + run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml + check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT + + # import configs + run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "config import -d /tmp/configs" \ + "creating sources" 1 \ + "creating tasks" 1 \ + "The original relay workers have been exported to" 1 \ + "Currently DM doesn't support recover relay workers.*transfer-source.*start-relay" 1 + + run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "operate-source show" \ + "mysql-replica-01" 1 + run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + 
"query-status -s $SOURCE_ID1" \ + "\"result\": true" 2 + + check_sync_diff $WORK_DIR $cur/conf/diff_config.toml } cleanup_data $TEST_NAME diff --git a/tests/tiup/conf/source1.yaml b/tests/tiup/conf/source1.yaml index c49cc78ae2..484bbdb662 100644 --- a/tests/tiup/conf/source1.yaml +++ b/tests/tiup/conf/source1.yaml @@ -1,9 +1,10 @@ source-id: mysql-replica-01 enable-gtid: false enable-relay: false +server-id: 123456 from: host: mysql1 user: root port: 3306 checker: - check-enable: false + check-enable: true diff --git a/tests/tiup/conf/source2.yaml b/tests/tiup/conf/source2.yaml index 30903199f5..bcbe1bd306 100644 --- a/tests/tiup/conf/source2.yaml +++ b/tests/tiup/conf/source2.yaml @@ -1,9 +1,10 @@ source-id: mariadb-replica-02 enable-gtid: true enable-relay: true +server-id: 654321 from: host: mariadb2 user: root port: 3306 checker: - check-enable: false + check-enable: true diff --git a/tests/tiup/lib.sh b/tests/tiup/lib.sh index 1f5fca6d17..85736ac023 100755 --- a/tests/tiup/lib.sh +++ b/tests/tiup/lib.sh @@ -172,11 +172,52 @@ function exec_incremental_stage2() { exec_sql mariadb2 3306 "INSERT INTO $DB6.$TBL3 (c1, c2, c3) VALUES (214, '214', 214);" } +function exec_incremental_stage3() { + # prepare incremental data + exec_sql mysql1 3306 "INSERT INTO $DB1.$TBL1 (c1, c2) VALUES (301, '301');" + exec_sql mysql1 3306 "INSERT INTO $DB1.$TBL2 (c1, c2) VALUES (302, '302');" + exec_sql mariadb2 3306 "INSERT INTO $DB2.$TBL2 (c1, c2) VALUES (311, '311');" + exec_sql mariadb2 3306 "INSERT INTO $DB2.$TBL3 (c1, c2) VALUES (312, '312');" + + # prepare optimistic incremental data + exec_sql mysql1 3306 "INSERT INTO $DB3.$TBL_LOWER1 (c1, c2, c3, c4) VALUES (301, '301', 301, 301);" + exec_sql mysql1 3306 "INSERT INTO $DB3.$TBL2 (c1, c2, c3, c4) VALUES (302, '302', 302, 302);" + exec_sql mariadb2 3306 "INSERT INTO $DB4.$TBL2 (c1, c2, c3, c4) VALUES (311, '311', 311, 311);" + exec_sql mariadb2 3306 "INSERT INTO $DB4.$TBL_LOWER3 (c1, c2, c3, c4) VALUES (312, '312', 312, 312);" + 
+ # prepare pessimistic incremental data + exec_sql mysql1 3306 "INSERT INTO $DB5.$TBL1 (c1, c2, c3) VALUES (303, '303', 303);" + exec_sql mysql1 3306 "INSERT INTO $DB5.$TBL2 (c1, c2, c3) VALUES (304, '304', 304);" + exec_sql mariadb2 3306 "INSERT INTO $DB6.$TBL2 (c1, c2, c3) VALUES (313, '313', 313);" + exec_sql mariadb2 3306 "INSERT INTO $DB6.$TBL3 (c1, c2, c3) VALUES (314, '314', 314);" +} + +function exec_incremental_stage4() { + # prepare incremental data + exec_sql mysql1 3306 "INSERT INTO $DB1.$TBL1 (c1, c2) VALUES (401, '401');" + exec_sql mysql1 3306 "INSERT INTO $DB1.$TBL2 (c1, c2) VALUES (402, '402');" + exec_sql mariadb2 3306 "INSERT INTO $DB2.$TBL2 (c1, c2) VALUES (411, '411');" + exec_sql mariadb2 3306 "INSERT INTO $DB2.$TBL3 (c1, c2) VALUES (412, '412');" + + # prepare optimistic incremental data + exec_sql mysql1 3306 "INSERT INTO $DB3.$TBL_LOWER1 (c1, c2, c3, c4) VALUES (401, '401', 401, 401);" + exec_sql mysql1 3306 "INSERT INTO $DB3.$TBL2 (c1, c2, c3, c4) VALUES (402, '402', 402, 402);" + exec_sql mariadb2 3306 "INSERT INTO $DB4.$TBL2 (c1, c2, c3, c4) VALUES (411, '411', 411, 411);" + exec_sql mariadb2 3306 "INSERT INTO $DB4.$TBL_LOWER3 (c1, c2, c3, c4) VALUES (412, '412', 412, 412);" + + # prepare pessimistic incremental data + exec_sql mysql1 3306 "INSERT INTO $DB5.$TBL1 (c1, c2, c3) VALUES (403, '403', 403);" + exec_sql mysql1 3306 "INSERT INTO $DB5.$TBL2 (c1, c2, c3) VALUES (404, '404', 404);" + exec_sql mariadb2 3306 "INSERT INTO $DB6.$TBL2 (c1, c2, c3) VALUES (413, '413', 413);" + exec_sql mariadb2 3306 "INSERT INTO $DB6.$TBL3 (c1, c2, c3) VALUES (414, '414', 414);" +} + function patch_nightly_with_tiup_mirror() { # clone packages for upgrade. # FIXME: use nightly version of grafana and prometheus after https://github.com/pingcap/tiup/issues/1334 fixed. 
tiup mirror clone tidb-dm-nightly-linux-amd64 --os=linux --arch=amd64 \ --alertmanager=v0.17.0 --grafana=v5.0.1 --prometheus=v5.0.1 \ + --dm-master=$1 --dm-worker=$1 \ --tiup=v$(tiup --version | grep 'tiup' | awk -F ' ' '{print $1}') --dm=v$(tiup --version | grep 'tiup' | awk -F ' ' '{print $1}') # change tiup mirror @@ -186,8 +227,9 @@ function patch_nightly_with_tiup_mirror() { # binary files have already been built and packaged. tiup mirror genkey tiup mirror grant gmhdbjd --name gmhdbjd - tiup mirror publish dm-master nightly /tmp/dm-master-nightly-linux-amd64.tar.gz dm-master/dm-master --arch amd64 --os linux --desc="dm-master component of Data Migration Platform" - tiup mirror publish dm-worker nightly /tmp/dm-worker-nightly-linux-amd64.tar.gz dm-worker/dm-worker --arch amd64 --os linux --desc="dm-worker component of Data Migration Platform" + mv tidb-dm-nightly-linux-amd64/keys/*-pingcap.json ./pingcap.json + tiup mirror publish dm-master nightly /tmp/dm-master-nightly-linux-amd64.tar.gz dm-master/dm-master --arch amd64 --os linux --desc="dm-master component of Data Migration Platform" -k pingcap.json + tiup mirror publish dm-worker nightly /tmp/dm-worker-nightly-linux-amd64.tar.gz dm-worker/dm-worker --arch amd64 --os linux --desc="dm-worker component of Data Migration Platform" -k pingcap.json tiup mirror publish dmctl nightly /tmp/dmctl-nightly-linux-amd64.tar.gz dmctl/dmctl --arch amd64 --os linux --desc="dmctl component of Data Migration Platform" tiup list diff --git a/tests/tiup/upgrade-from-v2.sh b/tests/tiup/upgrade-from-v2.sh index b0e595d8c8..99b4a426c3 100755 --- a/tests/tiup/upgrade-from-v2.sh +++ b/tests/tiup/upgrade-from-v2.sh @@ -33,6 +33,10 @@ function migrate_in_previous_v2() { sed -i "s/enable-gtid: true/enable-gtid: false/g" $CUR/conf/source2.yaml fi + sed -i "s/enable-heartbeat: true/enable-heartbeat: false/g" $CUR/conf/task.yaml + sed -i "s/enable-heartbeat: true/enable-heartbeat: false/g" $CUR/conf/task_pessimistic.yaml + sed -i 
"s/enable-heartbeat: true/enable-heartbeat: false/g" $CUR/conf/task_optimistic.yaml + tiup dmctl:$PRE_VER --master-addr=master1:8261 operate-source create $CUR/conf/source1.yaml tiup dmctl:$PRE_VER --master-addr=master1:8261 operate-source create $CUR/conf/source2.yaml @@ -51,12 +55,17 @@ function migrate_in_previous_v2() { function upgrade_to_current_v2() { if [[ "$CUR_VER" == "nightly" && "$ref" == "refs/pull"* ]]; then - patch_nightly_with_tiup_mirror + patch_nightly_with_tiup_mirror $PRE_VER fi - tiup dm upgrade --yes $CLUSTER_NAME $CUR_VER + # uninstall previous dmctl, otherwise dmctl:nightly still uses PRE_VER. # FIXME: It may be a bug in tiup mirror. tiup uninstall dmctl --all + + # config export in PRE_VER + tiup dmctl:$CUR_VER --master-addr=master1:8261 config export -d old_configs + + tiup dm upgrade --yes $CLUSTER_NAME $CUR_VER } function migrate_in_v2() { @@ -81,11 +90,60 @@ function migrate_in_v2() { echo "check locks" run_dmctl_with_retry $CUR_VER "show-ddl-locks" "no DDL lock exists" 1 - export DM_MASTER_ADDR="master1:8261" - tiup dmctl:$CUR_VER stop-task $TASK_NAME + # config export in CUR_VER + tiup dmctl:$CUR_VER --master-addr=master1:8261 config export -d new_configs +} + +function diff_configs() { + echo "diff configs between different version" + + sed '/password/d' old_configs/tasks/upgrade_via_tiup.yaml >/tmp/old_task.yaml + sed '/password/d' old_configs/tasks/upgrade_via_tiup_pessimistic.yaml >/tmp/old_task_pessimistic.yaml + sed '/password/d' old_configs/tasks/upgrade_via_tiup_optimistic.yaml >/tmp/old_task_optimistic.yaml + sed '/password/d' new_configs/tasks/upgrade_via_tiup.yaml >/tmp/new_task.yaml + sed '/password/d' new_configs/tasks/upgrade_via_tiup_pessimistic.yaml >/tmp/new_task_pessimistic.yaml + sed '/password/d' new_configs/tasks/upgrade_via_tiup_optimistic.yaml >/tmp/new_task_optimistic.yaml + + sed '/password/d' old_configs/sources/mysql-replica-01.yaml >/tmp/old_source1.yaml + sed '/password/d'
old_configs/sources/mariadb-replica-02.yaml >/tmp/old_source2.yaml + sed '/password/d' new_configs/sources/mysql-replica-01.yaml >/tmp/new_source1.yaml + sed '/password/d' new_configs/sources/mariadb-replica-02.yaml >/tmp/new_source2.yaml + + diff /tmp/old_task.yaml /tmp/new_task.yaml || exit 1 + diff /tmp/old_task_pessimistic.yaml /tmp/new_task_pessimistic.yaml || exit 1 + diff /tmp/old_task_optimistic.yaml /tmp/new_task_optimistic.yaml || exit 1 + diff /tmp/old_source1.yaml /tmp/new_source1.yaml || exit 1 + diff /tmp/old_source2.yaml /tmp/new_source2.yaml || exit 1 +} + +function downgrade_to_previous_v2() { + echo "downgrade to previous version $PRE_VER" + + # destroy current cluster + tiup dm destroy --yes $CLUSTER_NAME + + exec_incremental_stage3 + + # deploy previous cluster + tiup dm deploy --yes $CLUSTER_NAME $PRE_VER $CUR/conf/topo.yaml + tiup dm start --yes $CLUSTER_NAME + + # config import + tiup dmctl:$CUR_VER --master-addr=master1:8261 config import -d new_configs + + exec_incremental_stage4 + + run_dmctl_with_retry $CUR_VER "operate-source show" "mysql-replica-01" 1 "mariadb-replica-02" 1 + run_dmctl_with_retry $CUR_VER "list-member --worker" "\"stage\": \"bound\"" 2 + + check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml + check_sync_diff $WORK_DIR $CUR/conf/diff_config_optimistic.toml + check_sync_diff $WORK_DIR $CUR/conf/diff_config_pessimistic.toml } function destroy_v2_by_tiup() { + export DM_MASTER_ADDR="master1:8261" + tiup dmctl:$CUR_VER stop-task $TASK_NAME tiup dm destroy --yes $CLUSTER_NAME } @@ -100,6 +158,10 @@ function test() { migrate_in_v2 + diff_configs + + downgrade_to_previous_v2 + destroy_v2_by_tiup }