Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

Commit

Permalink
Support migration data from source with different time_zone by default (
Browse files Browse the repository at this point in the history
  • Loading branch information
glorv authored May 13, 2021
1 parent b16f9b7 commit 0145c37
Show file tree
Hide file tree
Showing 117 changed files with 288 additions and 335 deletions.
1 change: 0 additions & 1 deletion chaos/cases/conf/task-optimistic.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ shard-mode: optimistic
# so ignore sharding table check, seems has no effect on sync-diff-inspector
# see https://dev.mysql.com/doc/relnotes/mysql/8.0/en/news-8-0-19.html
ignore-checking-items: ["schema_of_shard_tables","auto_increment_ID"] # tables generated by go-sqlsmith may have auto increment ID
timezone: "UTC"

target-database:
host: "tidb-0.tidb"
Expand Down
1 change: 0 additions & 1 deletion chaos/cases/conf/task-pessimistic.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ shard-mode: pessimistic
# so ignore sharding table check, seems has no effect on sync-diff-inspector
# see https://dev.mysql.com/doc/relnotes/mysql/8.0/en/news-8-0-19.html
ignore-checking-items: ["schema_of_shard_tables","auto_increment_ID"] # tables generated by go-sqlsmith may have auto increment ID
timezone: "UTC"

target-database:
host: "tidb-0.tidb"
Expand Down
1 change: 0 additions & 1 deletion chaos/cases/conf/task-single.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
---
name: "task_single"
task-mode: all
timezone: "UTC"

target-database:
host: "tidb-0.tidb"
Expand Down
12 changes: 6 additions & 6 deletions chaos/cases/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,16 +45,16 @@ func createSources(ctx context.Context, cli pb.MasterClient, cfg *config) error
return err
}

cfg1 := config2.NewSourceConfig()
cfg2 := config2.NewSourceConfig()
cfg3 := config2.NewSourceConfig()
if err = cfg1.ParseYaml(string(s1Content)); err != nil {
cfg1, err := config2.ParseYaml(string(s1Content))
if err != nil {
return err
}
if err = cfg2.ParseYaml(string(s2Content)); err != nil {
cfg2, err := config2.ParseYaml(string(s2Content))
if err != nil {
return err
}
if err = cfg3.ParseYaml(string(s3Content)); err != nil {
cfg3, err := config2.ParseYaml(string(s3Content))
if err != nil {
return err
}

Expand Down
22 changes: 14 additions & 8 deletions cmd/dm-syncer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"flag"
"fmt"
"os"
"time"

"github.com/BurntSushi/toml"
"github.com/go-mysql-org/go-mysql/mysql"
Expand All @@ -27,6 +26,7 @@ import (
router "github.com/pingcap/tidb-tools/pkg/table-router"

"github.com/pingcap/dm/dm/config"
"github.com/pingcap/dm/pkg/log"
"github.com/pingcap/dm/pkg/utils"
)

Expand Down Expand Up @@ -54,6 +54,7 @@ type commonConfig struct {
SafeMode bool
MaxRetry int

// deprecated
TimezoneStr string

SyncerConfigFormat bool
Expand All @@ -76,13 +77,13 @@ func (c *commonConfig) newConfigFromSyncerConfig(args []string) (*config.SubTask
EnableGTID: c.EnableGTID,
SafeMode: c.SafeMode,
MaxRetry: c.MaxRetry,
TimezoneStr: c.TimezoneStr,
}

cfg.FlagSet = flag.NewFlagSet("dm-syncer", flag.ContinueOnError)
fs := cfg.FlagSet

var SyncerConfigFormat bool
var timezoneStr string

fs.BoolVar(&cfg.printVersion, "V", false, "prints version and exit")
fs.StringVar(&cfg.Name, "name", "", "the task name")
Expand All @@ -101,7 +102,7 @@ func (c *commonConfig) newConfigFromSyncerConfig(args []string) (*config.SubTask
fs.BoolVar(&cfg.EnableGTID, "enable-gtid", false, "enable gtid mode")
fs.BoolVar(&cfg.SafeMode, "safe-mode", false, "enable safe mode to make syncer reentrant")
fs.IntVar(&cfg.MaxRetry, "max-retry", 100, "maxinum retry when network interruption")
fs.StringVar(&cfg.TimezoneStr, "timezone", "", "target database timezone location string")
fs.StringVar(&timezoneStr, "timezone", "", "target database timezone location string")
fs.BoolVar(&SyncerConfigFormat, "syncer-config-format", false, "read syncer config format")

if err := fs.Parse(args); err != nil {
Expand All @@ -119,6 +120,10 @@ func (c *commonConfig) newConfigFromSyncerConfig(args []string) (*config.SubTask
return nil, errors.Trace(err)
}

if timezoneStr != "" {
log.L().Warn("'--timezone' is deprecated, needn't set it anymore.")
}

return cfg.convertToNewFormat()
}

Expand Down Expand Up @@ -146,6 +151,7 @@ func (c *commonConfig) newSubTaskConfig(args []string) (*config.SubTaskConfig, e
var syncerConfigFormat bool
var printVersion bool
var serverID uint
var timezoneStr string

fs.BoolVar(&printVersion, "V", false, "prints version and exit")
fs.StringVar(&cfg.Name, "name", "", "the task name")
Expand All @@ -163,7 +169,7 @@ func (c *commonConfig) newSubTaskConfig(args []string) (*config.SubTaskConfig, e
fs.BoolVar(&cfg.EnableGTID, "enable-gtid", false, "enable gtid mode")
fs.BoolVar(&cfg.SafeMode, "safe-mode", false, "enable safe mode to make syncer reentrant")
fs.IntVar(&cfg.MaxRetry, "max-retry", 100, "maxinum retry when network interruption")
fs.StringVar(&cfg.Timezone, "timezone", "", "target database timezone location string")
fs.StringVar(&timezoneStr, "timezone", "", "target database timezone location string")
fs.StringVar(&cfg.Name, "cp-table-prefix", "dm-syncer", "the prefix of the checkpoint table name")
fs.BoolVar(&syncerConfigFormat, "syncer-config-format", false, "read syncer config format")

Expand All @@ -172,6 +178,9 @@ func (c *commonConfig) newSubTaskConfig(args []string) (*config.SubTaskConfig, e
if err := cfg.Parse(args, false); err != nil {
return nil, errors.Trace(err)
}
if timezoneStr != "" {
log.L().Warn("'--timezone' is deprecated, needn't set it anymore.")
}

if serverID != 101 {
cfg.ServerID = uint32(serverID)
Expand Down Expand Up @@ -202,7 +211,6 @@ func newCommonConfig() *commonConfig {
fs.BoolVar(&cfg.EnableGTID, "enable-gtid", false, "enable gtid mode")
fs.BoolVar(&cfg.SafeMode, "safe-mode", false, "enable safe mode to make syncer reentrant")
fs.IntVar(&cfg.MaxRetry, "max-retry", 100, "maxinum retry when network interruption")
fs.StringVar(&cfg.TimezoneStr, "timezone", "", "target database timezone location string")
fs.BoolVar(&cfg.SyncerConfigFormat, "syncer-config-format", false, "read syncer config format")

return cfg
Expand Down Expand Up @@ -265,8 +273,7 @@ type syncerConfig struct {
// MaxDMLConnectionTimeout string `toml:"execute-dml-timeout" json:"execute-dml-timeout"`
// ExecutionQueueLength int `toml:"execute-queue-length" json:"execute-queue-length"`

TimezoneStr string `toml:"timezone" json:"timezone"`
Timezone *time.Location `json:"-"`
TimezoneStr string `toml:"timezone" json:"timezone"`

printVersion bool
}
Expand Down Expand Up @@ -343,7 +350,6 @@ func (oc *syncerConfig) convertToNewFormat() (*config.SubTaskConfig, error) {
},

ConfigFile: oc.ConfigFile,
Timezone: oc.TimezoneStr,
From: oc.From,
To: oc.To,
}
Expand Down
29 changes: 20 additions & 9 deletions dm/config/source_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,13 @@ type SourceConfig struct {

// NewSourceConfig creates a new base config for upstream MySQL/MariaDB source.
func NewSourceConfig() *SourceConfig {
c := newSourceConfig()
c.adjust()
return c
}

// NewSourceConfig creates a new base config without adjust.
func newSourceConfig() *SourceConfig {
c := &SourceConfig{
Purge: PurgeConfig{
Interval: 60 * 60,
Expand All @@ -94,7 +101,6 @@ func NewSourceConfig() *SourceConfig {
BackoffMax: Duration{DefaultBackoffMax},
},
}
c.adjust()
return c
}

Expand Down Expand Up @@ -140,12 +146,13 @@ func (c *SourceConfig) Parse(content string) error {
}

// ParseYaml parses flag definitions from the argument list, content should be yaml format.
func (c *SourceConfig) ParseYaml(content string) error {
func ParseYaml(content string) (*SourceConfig, error) {
c := newSourceConfig()
if err := yaml.UnmarshalStrict([]byte(content), c); err != nil {
return terror.ErrConfigYamlTransform.Delegate(err, "decode source config")
return nil, terror.ErrConfigYamlTransform.Delegate(err, "decode source config")
}
c.adjust()
return nil
return c, nil
}

// EncodeToml encodes config.
Expand Down Expand Up @@ -311,16 +318,20 @@ func (c *SourceConfig) AdjustServerID(ctx context.Context, db *sql.DB) error {
}

// LoadFromFile loads config from file.
func (c *SourceConfig) LoadFromFile(path string) error {
func LoadFromFile(path string) (*SourceConfig, error) {
c := newSourceConfig()
content, err := ioutil.ReadFile(path)
if err != nil {
return terror.ErrConfigReadCfgFromFile.Delegate(err, path)
return nil, terror.ErrConfigReadCfgFromFile.Delegate(err, path)
}
if err := yaml.UnmarshalStrict(content, c); err != nil {
return terror.ErrConfigYamlTransform.Delegate(err, "decode source config")
if err = yaml.UnmarshalStrict(content, c); err != nil {
return nil, terror.ErrConfigYamlTransform.Delegate(err, "decode source config")
}
c.adjust()
return c.Verify()
if err = c.Verify(); err != nil {
return nil, err
}
return c, nil
}

func (c *SourceConfig) check(metaData *toml.MetaData, err error) error {
Expand Down
51 changes: 29 additions & 22 deletions dm/config/source_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,8 @@ import (
const sourceSampleFile = "../worker/source.yaml"

func (t *testConfig) TestConfig(c *C) {
cfg := NewSourceConfig()

c.Assert(cfg.LoadFromFile(sourceSampleFile), IsNil)
cfg, err := LoadFromFile(sourceSampleFile)
c.Assert(err, IsNil)
cfg.RelayDir = "./xx"
c.Assert(cfg.RelayDir, Equals, "./xx")
c.Assert(cfg.ServerID, Equals, uint32(101))
Expand Down Expand Up @@ -63,18 +62,24 @@ func (t *testConfig) TestConfig(c *C) {
// test update config file and reload
c.Assert(cfg.Parse(tomlStr), IsNil)
c.Assert(cfg.ServerID, Equals, uint32(100))
c.Assert(cfg.ParseYaml(yamlStr), IsNil)
c.Assert(cfg.ServerID, Equals, uint32(100))
c.Assert(cfg.Parse(originCfgStr), IsNil)
c.Assert(cfg.ServerID, Equals, uint32(101))
c.Assert(cfg.ParseYaml(originCfgYamlStr), IsNil)
c.Assert(cfg.ServerID, Equals, uint32(101))
cfg1, err := ParseYaml(yamlStr)
c.Assert(err, IsNil)
c.Assert(cfg1.ServerID, Equals, uint32(100))
cfg.Filters = []*bf.BinlogEventRule{}
cfg.Tracer = map[string]interface{}{}

var cfg2 SourceConfig
c.Assert(cfg2.Parse(originCfgStr), IsNil)
c.Assert(cfg2.ServerID, Equals, uint32(101))

cfg3, err := ParseYaml(originCfgYamlStr)
c.Assert(err, IsNil)
c.Assert(cfg3.ServerID, Equals, uint32(101))

// test decrypt password
clone1.From.Password = "1234"
clone1.ServerID = 101
// fix empty map after marshal/unmarshal becomes nil
clone1.From.Session = map[string]string{}
clone1.From.Session = cfg.From.Session
clone1.Tracer = map[string]interface{}{}
clone1.Filters = []*bf.BinlogEventRule{}
clone2 := cfg.DecryptPassword()
Expand All @@ -96,16 +101,18 @@ func (t *testConfig) TestConfig(c *C) {
c.Assert(err, IsNil)
c.Assert(clone4toml, Matches, "(.|\n)*backoff-rollback = \"5m(.|\n)*")
c.Assert(clone4toml, Matches, "(.|\n)*backoff-max = \"5m(.|\n)*")

var clone5 SourceConfig
c.Assert(clone5.Parse(clone4toml), IsNil)
c.Assert(clone5, DeepEquals, *clone4)
clone4yaml, err := clone4.Yaml()
c.Assert(err, IsNil)
c.Assert(clone4yaml, Matches, "(.|\n)*backoff-rollback: 5m(.|\n)*")
c.Assert(clone4yaml, Matches, "(.|\n)*backoff-max: 5m(.|\n)*")
var clone6 SourceConfig
c.Assert(clone6.ParseYaml(clone4yaml), IsNil)
c.Assert(clone6, DeepEquals, *clone4)

clone6, err := ParseYaml(clone4yaml)
c.Assert(err, IsNil)
c.Assert(clone6, DeepEquals, clone4)

// test invalid config
dir2 := c.MkDir()
Expand All @@ -116,15 +123,15 @@ aaa: xxx
`)
err = ioutil.WriteFile(configFile, configContent, 0o644)
c.Assert(err, IsNil)
err = cfg.LoadFromFile(configFile)
_, err = LoadFromFile(configFile)
c.Assert(err, NotNil)
c.Assert(err, ErrorMatches, "(.|\n)*field aaa not found in type config.SourceConfig(.|\n)*")
}

func (t *testConfig) TestConfigVerify(c *C) {
newConfig := func() *SourceConfig {
cfg := NewSourceConfig()
c.Assert(cfg.LoadFromFile(sourceSampleFile), IsNil)
cfg, err := LoadFromFile(sourceSampleFile)
c.Assert(err, IsNil)
cfg.RelayDir = "./xx"
return cfg
}
Expand Down Expand Up @@ -247,12 +254,12 @@ func subtestFlavor(c *C, cfg *SourceConfig, sqlInfo, expectedFlavor, expectedErr
}

func (t *testConfig) TestAdjustFlavor(c *C) {
cfg := NewSourceConfig()
c.Assert(cfg.LoadFromFile(sourceSampleFile), IsNil)
cfg, err := LoadFromFile(sourceSampleFile)
c.Assert(err, IsNil)
cfg.RelayDir = "./xx"

cfg.Flavor = "mariadb"
err := cfg.AdjustFlavor(context.Background(), nil)
err = cfg.AdjustFlavor(context.Background(), nil)
c.Assert(err, IsNil)
c.Assert(cfg.Flavor, Equals, mysql.MariaDBFlavor)
cfg.Flavor = "MongoDB"
Expand All @@ -270,8 +277,8 @@ func (t *testConfig) TestAdjustServerID(c *C) {
}()
getAllServerIDFunc = getMockServerIDs

cfg := NewSourceConfig()
c.Assert(cfg.LoadFromFile(sourceSampleFile), IsNil)
cfg, err := LoadFromFile(sourceSampleFile)
c.Assert(err, IsNil)
cfg.RelayDir = "./xx"

c.Assert(cfg.AdjustServerID(context.Background(), nil), IsNil)
Expand Down
21 changes: 10 additions & 11 deletions dm/config/subtask.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,17 @@ import (
"flag"
"fmt"
"strings"
"time"

"github.com/pingcap/dm/pkg/log"
"github.com/pingcap/dm/pkg/terror"
"github.com/pingcap/dm/pkg/utils"

"github.com/BurntSushi/toml"
bf "github.com/pingcap/tidb-tools/pkg/binlog-filter"
"github.com/pingcap/tidb-tools/pkg/column-mapping"
"github.com/pingcap/tidb-tools/pkg/filter"
router "github.com/pingcap/tidb-tools/pkg/table-router"
"go.uber.org/zap"

"github.com/pingcap/dm/pkg/log"
"github.com/pingcap/dm/pkg/terror"
"github.com/pingcap/dm/pkg/utils"
)

// task modes.
Expand Down Expand Up @@ -118,6 +117,8 @@ func (db *DBConfig) Decode(data string) error {

// Adjust adjusts the config.
func (db *DBConfig) Adjust() {
// force set session time zone to UTC here.
AdjustTargetDBTimeZone(db)
}

// SubTaskConfig is the configuration for SubTask.
Expand Down Expand Up @@ -150,8 +151,8 @@ type SubTaskConfig struct {
HeartbeatReportInterval int `toml:"heartbeat-report-interval" json:"heartbeat-report-interval"`
EnableHeartbeat bool `toml:"enable-heartbeat" json:"enable-heartbeat"`
Meta *Meta `toml:"meta" json:"meta"`
Timezone string `toml:"timezone" josn:"timezone"`

// deprecated
Timezone string `toml:"timezone" json:"timezone"`
// RelayDir get value from dm-worker config
RelayDir string `toml:"relay-dir" json:"relay-dir"`

Expand Down Expand Up @@ -274,10 +275,8 @@ func (c *SubTaskConfig) Adjust(verifyDecryptPassword bool) error {
}

if c.Timezone != "" {
_, err := time.LoadLocation(c.Timezone)
if err != nil {
return terror.ErrConfigInvalidTimezone.Delegate(err, c.Timezone)
}
log.L().Warn("'timezone' is deprecated, please remove this field.")
c.Timezone = ""
}

dirSuffix := "." + c.Name
Expand Down
Loading

0 comments on commit 0145c37

Please sign in to comment.