Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

downgrade: add import/export-configs command (#1877) #1921

Merged
85 changes: 84 additions & 1 deletion dm/config/source_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ type PurgeConfig struct {
RemainSpace int64 `yaml:"remain-space" toml:"remain-space" json:"remain-space"` // if remain space in @RelayBaseDir less than @RemainSpace (GB), then it can be purged
}

// SourceConfig is the configuration for Worker.
// SourceConfig is the configuration for source.
type SourceConfig struct {
EnableGTID bool `yaml:"enable-gtid" toml:"enable-gtid" json:"enable-gtid"`
AutoFixGTID bool `yaml:"auto-fix-gtid" toml:"auto-fix-gtid" json:"auto-fix-gtid"`
Expand Down Expand Up @@ -353,3 +353,86 @@ func (c *SourceConfig) check(metaData *toml.MetaData, err error) error {
c.adjust()
return nil
}

// YamlForDowngrade returns YAML format represents of config for downgrade.
func (c *SourceConfig) YamlForDowngrade() (string, error) {
s := NewSourceConfigForDowngrade(c)

// encrypt password
cipher, err := utils.Encrypt(utils.DecryptOrPlaintext(c.From.Password))
if err != nil {
return "", err
}
s.From.Password = cipher

// omit default values, so we can ignore them for later marshal
s.omitDefaultVals()

return s.Yaml()
}

// SourceConfigForDowngrade is the base configuration for source in v2.0.
// This config is used for downgrade(config export) from a higher dmctl version.
// When we add any new config item into SourceConfig, we should update it also.
type SourceConfigForDowngrade struct {
EnableGTID bool `yaml:"enable-gtid"`
AutoFixGTID bool `yaml:"auto-fix-gtid"`
RelayDir string `yaml:"relay-dir"`
MetaDir string `yaml:"meta-dir"`
Flavor string `yaml:"flavor"`
Charset string `yaml:"charset"`
EnableRelay bool `yaml:"enable-relay"`
RelayBinLogName string `yaml:"relay-binlog-name"`
RelayBinlogGTID string `yaml:"relay-binlog-gtid"`
UUIDSuffix int `yaml:"-"`
SourceID string `yaml:"source-id"`
From DBConfig `yaml:"from"`
Purge PurgeConfig `yaml:"purge"`
Checker CheckerConfig `yaml:"checker"`
ServerID uint32 `yaml:"server-id"`
Tracer map[string]interface{} `yaml:"tracer"`
// any new config item, we mark it omitempty
CaseSensitive bool `yaml:"case-sensitive,omitempty"`
Filters []*bf.BinlogEventRule `yaml:"filters,omitempty"`
}

// NewSourceConfigForDowngrade creates a new base config for downgrade.
func NewSourceConfigForDowngrade(sourceCfg *SourceConfig) *SourceConfigForDowngrade {
return &SourceConfigForDowngrade{
EnableGTID: sourceCfg.EnableGTID,
AutoFixGTID: sourceCfg.AutoFixGTID,
RelayDir: sourceCfg.RelayDir,
MetaDir: sourceCfg.MetaDir,
Flavor: sourceCfg.Flavor,
Charset: sourceCfg.Charset,
EnableRelay: sourceCfg.EnableRelay,
RelayBinLogName: sourceCfg.RelayBinLogName,
RelayBinlogGTID: sourceCfg.RelayBinlogGTID,
UUIDSuffix: sourceCfg.UUIDSuffix,
SourceID: sourceCfg.SourceID,
From: sourceCfg.From,
Purge: sourceCfg.Purge,
Checker: sourceCfg.Checker,
ServerID: sourceCfg.ServerID,
Tracer: sourceCfg.Tracer,
CaseSensitive: sourceCfg.CaseSensitive,
Filters: sourceCfg.Filters,
}
}

// omitDefaultVals change default value to empty value for new config item.
// If any default value for new config item is not empty(0 or false or nil),
// we should change it to empty.
func (c *SourceConfigForDowngrade) omitDefaultVals() {
if len(c.From.Session) > 0 {
if timeZone, ok := c.From.Session["time_zone"]; ok && timeZone == defaultTimeZone {
delete(c.From.Session, "time_zone")
}
}
}

// Yaml returns YAML format representation of the config.
func (c *SourceConfigForDowngrade) Yaml() (string, error) {
b, err := yaml.Marshal(c)
return string(b), err
}
17 changes: 17 additions & 0 deletions dm/config/source_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"fmt"
"io/ioutil"
"path"
"reflect"
"strings"
"time"

Expand Down Expand Up @@ -235,6 +236,22 @@ func (t *testConfig) TestConfigVerify(c *C) {
}
}

func (t *testConfig) TestSourceConfigForDowngrade(c *C) {
cfg, err := LoadFromFile(sourceSampleFile)
c.Assert(err, IsNil)

// make sure all new field were added
cfgForDowngrade := NewSourceConfigForDowngrade(cfg)
cfgReflect := reflect.Indirect(reflect.ValueOf(cfg))
cfgForDowngradeReflect := reflect.Indirect(reflect.ValueOf(cfgForDowngrade))
c.Assert(cfgReflect.NumField(), Equals, cfgForDowngradeReflect.NumField())

// make sure all field were copied
cfgForClone := &SourceConfigForDowngrade{}
Clone(cfgForClone, cfg)
c.Assert(cfgForDowngrade, DeepEquals, cfgForClone)
}

func subtestFlavor(c *C, cfg *SourceConfig, sqlInfo, expectedFlavor, expectedError string) {
cfg.Flavor = ""
db, mock, err := sqlmock.New()
Expand Down
152 changes: 152 additions & 0 deletions dm/config/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (

"github.com/pingcap/dm/pkg/log"
"github.com/pingcap/dm/pkg/terror"
"github.com/pingcap/dm/pkg/utils"
)

// Online DDL Scheme.
Expand Down Expand Up @@ -935,3 +936,154 @@ func checkValidExpr(expr string) error {
_, _, err := defaultParser.Parse(expr, "", "")
return err
}

// YamlForDowngrade returns YAML format represents of config for downgrade.
func (c *TaskConfig) YamlForDowngrade() (string, error) {
t := NewTaskConfigForDowngrade(c)

// encrypt password
cipher, err := utils.Encrypt(utils.DecryptOrPlaintext(t.TargetDB.Password))
if err != nil {
return "", err
}
t.TargetDB.Password = cipher

// omit default values, so we can ignore them for later marshal
t.omitDefaultVals()

return t.Yaml()
}

// MySQLInstanceForDowngrade represents a sync config of a MySQL instance for downgrade.
type MySQLInstanceForDowngrade struct {
SourceID string `yaml:"source-id"`
Meta *Meta `yaml:"meta"`
FilterRules []string `yaml:"filter-rules"`
ColumnMappingRules []string `yaml:"column-mapping-rules"`
RouteRules []string `yaml:"route-rules"`
BWListName string `yaml:"black-white-list"`
BAListName string `yaml:"block-allow-list"`
MydumperConfigName string `yaml:"mydumper-config-name"`
Mydumper *MydumperConfig `yaml:"mydumper"`
MydumperThread int `yaml:"mydumper-thread"`
LoaderConfigName string `yaml:"loader-config-name"`
Loader *LoaderConfig `yaml:"loader"`
LoaderThread int `yaml:"loader-thread"`
SyncerConfigName string `yaml:"syncer-config-name"`
Syncer *SyncerConfig `yaml:"syncer"`
SyncerThread int `yaml:"syncer-thread"`
// new config item
ExpressionFilters []string `yaml:"expression-filters,omitempty"`
}

// NewMySQLInstancesForDowngrade creates []* MySQLInstanceForDowngrade.
func NewMySQLInstancesForDowngrade(mysqlInstances []*MySQLInstance) []*MySQLInstanceForDowngrade {
mysqlInstancesForDowngrade := make([]*MySQLInstanceForDowngrade, 0, len(mysqlInstances))
for _, m := range mysqlInstances {
newMySQLInstance := &MySQLInstanceForDowngrade{
SourceID: m.SourceID,
Meta: m.Meta,
FilterRules: m.FilterRules,
ColumnMappingRules: m.ColumnMappingRules,
RouteRules: m.RouteRules,
BWListName: m.BWListName,
BAListName: m.BAListName,
MydumperConfigName: m.MydumperConfigName,
Mydumper: m.Mydumper,
MydumperThread: m.MydumperThread,
LoaderConfigName: m.LoaderConfigName,
Loader: m.Loader,
LoaderThread: m.LoaderThread,
SyncerConfigName: m.SyncerConfigName,
Syncer: m.Syncer,
SyncerThread: m.SyncerThread,
ExpressionFilters: m.ExpressionFilters,
}
mysqlInstancesForDowngrade = append(mysqlInstancesForDowngrade, newMySQLInstance)
}
return mysqlInstancesForDowngrade
}

// TaskConfigForDowngrade is the base configuration for task in v2.0.
// This config is used for downgrade(config export) from a higher dmctl version.
// When we add any new config item into SourceConfig, we should update it also.
type TaskConfigForDowngrade struct {
Name string `yaml:"name"`
TaskMode string `yaml:"task-mode"`
IsSharding bool `yaml:"is-sharding"`
ShardMode string `yaml:"shard-mode"`
IgnoreCheckingItems []string `yaml:"ignore-checking-items"`
MetaSchema string `yaml:"meta-schema"`
EnableHeartbeat bool `yaml:"enable-heartbeat"`
HeartbeatUpdateInterval int `yaml:"heartbeat-update-interval"`
HeartbeatReportInterval int `yaml:"heartbeat-report-interval"`
Timezone string `yaml:"timezone"`
CaseSensitive bool `yaml:"case-sensitive"`
TargetDB *DBConfig `yaml:"target-database"`
OnlineDDLScheme string `yaml:"online-ddl-scheme"`
Routes map[string]*router.TableRule `yaml:"routes"`
Filters map[string]*bf.BinlogEventRule `yaml:"filters"`
ColumnMappings map[string]*column.Rule `yaml:"column-mappings"`
BWList map[string]*filter.Rules `yaml:"black-white-list"`
BAList map[string]*filter.Rules `yaml:"block-allow-list"`
Mydumpers map[string]*MydumperConfig `yaml:"mydumpers"`
Loaders map[string]*LoaderConfig `yaml:"loaders"`
Syncers map[string]*SyncerConfig `yaml:"syncers"`
CleanDumpFile bool `yaml:"clean-dump-file"`
EnableANSIQuotes bool `yaml:"ansi-quotes"`
RemoveMeta bool `yaml:"remove-meta"`
// new config item
MySQLInstances []*MySQLInstanceForDowngrade `yaml:"mysql-instances"`
ExprFilter map[string]*ExpressionFilter `yaml:"expression-filter,omitempty"`
OnlineDDL bool `yaml:"online-ddl,omitempty"`
}

// NewTaskConfigForDowngrade create new TaskConfigForDowngrade.
func NewTaskConfigForDowngrade(taskConfig *TaskConfig) *TaskConfigForDowngrade {
return &TaskConfigForDowngrade{
Name: taskConfig.Name,
TaskMode: taskConfig.TaskMode,
IsSharding: taskConfig.IsSharding,
ShardMode: taskConfig.ShardMode,
IgnoreCheckingItems: taskConfig.IgnoreCheckingItems,
MetaSchema: taskConfig.MetaSchema,
EnableHeartbeat: taskConfig.EnableHeartbeat,
HeartbeatUpdateInterval: taskConfig.HeartbeatUpdateInterval,
HeartbeatReportInterval: taskConfig.HeartbeatReportInterval,
Timezone: taskConfig.Timezone,
CaseSensitive: taskConfig.CaseSensitive,
TargetDB: taskConfig.TargetDB,
OnlineDDLScheme: taskConfig.OnlineDDLScheme,
Routes: taskConfig.Routes,
Filters: taskConfig.Filters,
ColumnMappings: taskConfig.ColumnMappings,
BWList: taskConfig.BWList,
BAList: taskConfig.BAList,
Mydumpers: taskConfig.Mydumpers,
Loaders: taskConfig.Loaders,
Syncers: taskConfig.Syncers,
CleanDumpFile: taskConfig.CleanDumpFile,
EnableANSIQuotes: taskConfig.EnableANSIQuotes,
RemoveMeta: taskConfig.RemoveMeta,
MySQLInstances: NewMySQLInstancesForDowngrade(taskConfig.MySQLInstances),
ExprFilter: taskConfig.ExprFilter,
OnlineDDL: taskConfig.OnlineDDL,
}
}

// omitDefaultVals change default value to empty value for new config item.
// If any default value for new config item is not empty(0 or false or nil),
// we should change it to empty.
func (c *TaskConfigForDowngrade) omitDefaultVals() {
if len(c.TargetDB.Session) > 0 {
if timeZone, ok := c.TargetDB.Session["time_zone"]; ok && timeZone == defaultTimeZone {
delete(c.TargetDB.Session, "time_zone")
}
}
}

// Yaml returns YAML format representation of config.
func (c *TaskConfigForDowngrade) Yaml() (string, error) {
b, err := yaml.Marshal(c)
return string(b), err
}
81 changes: 79 additions & 2 deletions dm/config/task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package config
import (
"io/ioutil"
"path"
"reflect"
"sort"
"strings"

Expand All @@ -29,8 +30,7 @@ import (
"github.com/coreos/go-semver/semver"
)

func (t *testConfig) TestUnusedTaskConfig(c *C) {
correctTaskConfig := `---
var correctTaskConfig = `---
name: test
task-mode: all
shard-mode: "pessimistic"
Expand Down Expand Up @@ -135,6 +135,8 @@ mysql-instances:
loader-config-name: "global2"
syncer-config-name: "global2"
`

func (t *testConfig) TestUnusedTaskConfig(c *C) {
taskConfig := NewTaskConfig()
err := taskConfig.Decode(correctTaskConfig)
c.Assert(err, IsNil)
Expand Down Expand Up @@ -1011,3 +1013,78 @@ func (t *testConfig) TestExclusiveAndWrongExprFilterFields(c *C) {
err = cfg.adjust()
c.Assert(terror.ErrConfigExprFilterWrongGrammar.Equal(err), IsTrue)
}

func (t *testConfig) TestTaskConfigForDowngrade(c *C) {
cfg := NewTaskConfig()
err := cfg.Decode(correctTaskConfig)
c.Assert(err, IsNil)

cfgForDowngrade := NewTaskConfigForDowngrade(cfg)

// make sure all new field were added
cfgReflect := reflect.Indirect(reflect.ValueOf(cfg))
cfgForDowngradeReflect := reflect.Indirect(reflect.ValueOf(cfgForDowngrade))
c.Assert(cfgReflect.NumField(), Equals, cfgForDowngradeReflect.NumField()+1) // without flag

// make sure all field were copied
cfgForClone := &TaskConfigForDowngrade{}
Clone(cfgForClone, cfg)
c.Assert(cfgForDowngrade, DeepEquals, cfgForClone)
}

// Clone clones src to dest.
func Clone(dest, src interface{}) {
cloneValues(reflect.ValueOf(dest), reflect.ValueOf(src))
}

// cloneValues clone src to dest recursively.
// Note: pointer still use shallow copy.
func cloneValues(dest, src reflect.Value) {
destType := dest.Type()
srcType := src.Type()
if destType.Kind() == reflect.Ptr {
destType = destType.Elem()
}
if srcType.Kind() == reflect.Ptr {
srcType = srcType.Elem()
}

if destType.Kind() == reflect.Slice {
slice := reflect.MakeSlice(destType, src.Len(), src.Cap())
for i := 0; i < src.Len(); i++ {
if slice.Index(i).Type().Kind() == reflect.Ptr {
newVal := reflect.New(slice.Index(i).Type().Elem())
cloneValues(newVal, src.Index(i))
slice.Index(i).Set(newVal)
} else {
cloneValues(slice.Index(i).Addr(), src.Index(i).Addr())
}
}
dest.Set(slice)
return
}

destFieldsMap := map[string]int{}
for i := 0; i < destType.NumField(); i++ {
destFieldsMap[destType.Field(i).Name] = i
}
for i := 0; i < srcType.NumField(); i++ {
if j, ok := destFieldsMap[srcType.Field(i).Name]; ok {
destField := dest.Elem().Field(j)
srcField := src.Elem().Field(i)
destFieldType := destField.Type()
srcFieldType := srcField.Type()
if destFieldType.Kind() == reflect.Ptr {
destFieldType = destFieldType.Elem()
}
if srcFieldType.Kind() == reflect.Ptr {
srcFieldType = srcFieldType.Elem()
}
if destFieldType != srcFieldType {
cloneValues(destField, srcField)
} else {
destField.Set(srcField)
}
}
}
}
Loading