Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

Support migration data from source with different time_zone by default #1670

Merged
merged 20 commits into from
May 13, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion chaos/cases/conf/task-optimistic.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ shard-mode: optimistic
# so ignore sharding table check, seems has no effect on sync-diff-inspector
# see https://dev.mysql.com/doc/relnotes/mysql/8.0/en/news-8-0-19.html
ignore-checking-items: ["schema_of_shard_tables","auto_increment_ID"] # tables generated by go-sqlsmith may have auto increment ID
timezone: "UTC"

target-database:
host: "tidb-0.tidb"
Expand Down
1 change: 0 additions & 1 deletion chaos/cases/conf/task-pessimistic.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ shard-mode: pessimistic
# so ignore sharding table check, seems has no effect on sync-diff-inspector
# see https://dev.mysql.com/doc/relnotes/mysql/8.0/en/news-8-0-19.html
ignore-checking-items: ["schema_of_shard_tables","auto_increment_ID"] # tables generated by go-sqlsmith may have auto increment ID
timezone: "UTC"

target-database:
host: "tidb-0.tidb"
Expand Down
1 change: 0 additions & 1 deletion chaos/cases/conf/task-single.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
---
name: "task_single"
task-mode: all
timezone: "UTC"

target-database:
host: "tidb-0.tidb"
Expand Down
12 changes: 6 additions & 6 deletions chaos/cases/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,16 +45,16 @@ func createSources(ctx context.Context, cli pb.MasterClient, cfg *config) error
return err
}

cfg1 := config2.NewSourceConfig()
cfg2 := config2.NewSourceConfig()
cfg3 := config2.NewSourceConfig()
if err = cfg1.ParseYaml(string(s1Content)); err != nil {
cfg1, err := config2.ParseYaml(string(s1Content))
if err != nil {
return err
}
if err = cfg2.ParseYaml(string(s2Content)); err != nil {
cfg2, err := config2.ParseYaml(string(s2Content))
if err != nil {
return err
}
if err = cfg3.ParseYaml(string(s3Content)); err != nil {
cfg3, err := config2.ParseYaml(string(s3Content))
if err != nil {
return err
}

Expand Down
22 changes: 14 additions & 8 deletions cmd/dm-syncer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"flag"
"fmt"
"os"
"time"

"github.com/BurntSushi/toml"
"github.com/go-mysql-org/go-mysql/mysql"
Expand All @@ -27,6 +26,7 @@ import (
router "github.com/pingcap/tidb-tools/pkg/table-router"

"github.com/pingcap/dm/dm/config"
"github.com/pingcap/dm/pkg/log"
"github.com/pingcap/dm/pkg/utils"
)

Expand Down Expand Up @@ -54,6 +54,7 @@ type commonConfig struct {
SafeMode bool
MaxRetry int

// deprecated
TimezoneStr string

SyncerConfigFormat bool
Expand All @@ -76,13 +77,13 @@ func (c *commonConfig) newConfigFromSyncerConfig(args []string) (*config.SubTask
EnableGTID: c.EnableGTID,
SafeMode: c.SafeMode,
MaxRetry: c.MaxRetry,
TimezoneStr: c.TimezoneStr,
}

cfg.FlagSet = flag.NewFlagSet("dm-syncer", flag.ContinueOnError)
fs := cfg.FlagSet

var SyncerConfigFormat bool
var timezoneStr string

fs.BoolVar(&cfg.printVersion, "V", false, "prints version and exit")
fs.StringVar(&cfg.Name, "name", "", "the task name")
Expand All @@ -101,7 +102,7 @@ func (c *commonConfig) newConfigFromSyncerConfig(args []string) (*config.SubTask
fs.BoolVar(&cfg.EnableGTID, "enable-gtid", false, "enable gtid mode")
fs.BoolVar(&cfg.SafeMode, "safe-mode", false, "enable safe mode to make syncer reentrant")
fs.IntVar(&cfg.MaxRetry, "max-retry", 100, "maxinum retry when network interruption")
fs.StringVar(&cfg.TimezoneStr, "timezone", "", "target database timezone location string")
fs.StringVar(&timezoneStr, "timezone", "", "target database timezone location string")
fs.BoolVar(&SyncerConfigFormat, "syncer-config-format", false, "read syncer config format")

if err := fs.Parse(args); err != nil {
Expand All @@ -119,6 +120,10 @@ func (c *commonConfig) newConfigFromSyncerConfig(args []string) (*config.SubTask
return nil, errors.Trace(err)
}

if timezoneStr != "" {
log.L().Warn("'--timezone' is deprecated, needn't set it anymore.")
}

return cfg.convertToNewFormat()
}

Expand Down Expand Up @@ -146,6 +151,7 @@ func (c *commonConfig) newSubTaskConfig(args []string) (*config.SubTaskConfig, e
var syncerConfigFormat bool
var printVersion bool
var serverID uint
var timezoneStr string

fs.BoolVar(&printVersion, "V", false, "prints version and exit")
fs.StringVar(&cfg.Name, "name", "", "the task name")
Expand All @@ -163,7 +169,7 @@ func (c *commonConfig) newSubTaskConfig(args []string) (*config.SubTaskConfig, e
fs.BoolVar(&cfg.EnableGTID, "enable-gtid", false, "enable gtid mode")
fs.BoolVar(&cfg.SafeMode, "safe-mode", false, "enable safe mode to make syncer reentrant")
fs.IntVar(&cfg.MaxRetry, "max-retry", 100, "maxinum retry when network interruption")
fs.StringVar(&cfg.Timezone, "timezone", "", "target database timezone location string")
fs.StringVar(&timezoneStr, "timezone", "", "target database timezone location string")
fs.StringVar(&cfg.Name, "cp-table-prefix", "dm-syncer", "the prefix of the checkpoint table name")
fs.BoolVar(&syncerConfigFormat, "syncer-config-format", false, "read syncer config format")

Expand All @@ -172,6 +178,9 @@ func (c *commonConfig) newSubTaskConfig(args []string) (*config.SubTaskConfig, e
if err := cfg.Parse(args, false); err != nil {
return nil, errors.Trace(err)
}
if timezoneStr != "" {
log.L().Warn("'--timezone' is deprecated, needn't set it anymore.")
}

if serverID != 101 {
cfg.ServerID = uint32(serverID)
Expand Down Expand Up @@ -202,7 +211,6 @@ func newCommonConfig() *commonConfig {
fs.BoolVar(&cfg.EnableGTID, "enable-gtid", false, "enable gtid mode")
fs.BoolVar(&cfg.SafeMode, "safe-mode", false, "enable safe mode to make syncer reentrant")
fs.IntVar(&cfg.MaxRetry, "max-retry", 100, "maxinum retry when network interruption")
fs.StringVar(&cfg.TimezoneStr, "timezone", "", "target database timezone location string")
fs.BoolVar(&cfg.SyncerConfigFormat, "syncer-config-format", false, "read syncer config format")

return cfg
Expand Down Expand Up @@ -265,8 +273,7 @@ type syncerConfig struct {
// MaxDMLConnectionTimeout string `toml:"execute-dml-timeout" json:"execute-dml-timeout"`
// ExecutionQueueLength int `toml:"execute-queue-length" json:"execute-queue-length"`

TimezoneStr string `toml:"timezone" json:"timezone"`
Timezone *time.Location `json:"-"`
TimezoneStr string `toml:"timezone" json:"timezone"`

printVersion bool
}
Expand Down Expand Up @@ -343,7 +350,6 @@ func (oc *syncerConfig) convertToNewFormat() (*config.SubTaskConfig, error) {
},

ConfigFile: oc.ConfigFile,
Timezone: oc.TimezoneStr,
From: oc.From,
To: oc.To,
}
Expand Down
29 changes: 20 additions & 9 deletions dm/config/source_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,13 @@ type SourceConfig struct {

// NewSourceConfig creates a new base config for upstream MySQL/MariaDB source.
func NewSourceConfig() *SourceConfig {
c := newSourceConfig()
c.adjust()
return c
}

// NewSourceConfig creates a new base config without adjust.
func newSourceConfig() *SourceConfig {
c := &SourceConfig{
Purge: PurgeConfig{
Interval: 60 * 60,
Expand All @@ -94,7 +101,6 @@ func NewSourceConfig() *SourceConfig {
BackoffMax: Duration{DefaultBackoffMax},
},
}
c.adjust()
return c
}

Expand Down Expand Up @@ -140,12 +146,13 @@ func (c *SourceConfig) Parse(content string) error {
}

// ParseYaml parses flag definitions from the argument list, content should be yaml format.
func (c *SourceConfig) ParseYaml(content string) error {
func ParseYaml(content string) (*SourceConfig, error) {
c := newSourceConfig()
if err := yaml.UnmarshalStrict([]byte(content), c); err != nil {
return terror.ErrConfigYamlTransform.Delegate(err, "decode source config")
return nil, terror.ErrConfigYamlTransform.Delegate(err, "decode source config")
}
c.adjust()
return nil
return c, nil
}

// EncodeToml encodes config.
Expand Down Expand Up @@ -311,16 +318,20 @@ func (c *SourceConfig) AdjustServerID(ctx context.Context, db *sql.DB) error {
}

// LoadFromFile loads config from file.
func (c *SourceConfig) LoadFromFile(path string) error {
func LoadFromFile(path string) (*SourceConfig, error) {
c := newSourceConfig()
content, err := ioutil.ReadFile(path)
if err != nil {
return terror.ErrConfigReadCfgFromFile.Delegate(err, path)
return nil, terror.ErrConfigReadCfgFromFile.Delegate(err, path)
}
if err := yaml.UnmarshalStrict(content, c); err != nil {
return terror.ErrConfigYamlTransform.Delegate(err, "decode source config")
if err = yaml.UnmarshalStrict(content, c); err != nil {
return nil, terror.ErrConfigYamlTransform.Delegate(err, "decode source config")
}
c.adjust()
return c.Verify()
if err = c.Verify(); err != nil {
return nil, err
}
return c, nil
}

func (c *SourceConfig) check(metaData *toml.MetaData, err error) error {
Expand Down
51 changes: 29 additions & 22 deletions dm/config/source_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,8 @@ import (
const sourceSampleFile = "../worker/source.yaml"

func (t *testConfig) TestConfig(c *C) {
cfg := NewSourceConfig()

c.Assert(cfg.LoadFromFile(sourceSampleFile), IsNil)
cfg, err := LoadFromFile(sourceSampleFile)
c.Assert(err, IsNil)
cfg.RelayDir = "./xx"
c.Assert(cfg.RelayDir, Equals, "./xx")
c.Assert(cfg.ServerID, Equals, uint32(101))
Expand Down Expand Up @@ -63,18 +62,24 @@ func (t *testConfig) TestConfig(c *C) {
// test update config file and reload
c.Assert(cfg.Parse(tomlStr), IsNil)
c.Assert(cfg.ServerID, Equals, uint32(100))
c.Assert(cfg.ParseYaml(yamlStr), IsNil)
c.Assert(cfg.ServerID, Equals, uint32(100))
c.Assert(cfg.Parse(originCfgStr), IsNil)
c.Assert(cfg.ServerID, Equals, uint32(101))
c.Assert(cfg.ParseYaml(originCfgYamlStr), IsNil)
c.Assert(cfg.ServerID, Equals, uint32(101))
cfg1, err := ParseYaml(yamlStr)
c.Assert(err, IsNil)
c.Assert(cfg1.ServerID, Equals, uint32(100))
cfg.Filters = []*bf.BinlogEventRule{}
cfg.Tracer = map[string]interface{}{}

var cfg2 SourceConfig
c.Assert(cfg2.Parse(originCfgStr), IsNil)
c.Assert(cfg2.ServerID, Equals, uint32(101))

cfg3, err := ParseYaml(originCfgYamlStr)
c.Assert(err, IsNil)
c.Assert(cfg3.ServerID, Equals, uint32(101))

// test decrypt password
clone1.From.Password = "1234"
clone1.ServerID = 101
// fix empty map after marshal/unmarshal becomes nil
clone1.From.Session = map[string]string{}
clone1.From.Session = cfg.From.Session
clone1.Tracer = map[string]interface{}{}
clone1.Filters = []*bf.BinlogEventRule{}
clone2 := cfg.DecryptPassword()
Expand All @@ -96,16 +101,18 @@ func (t *testConfig) TestConfig(c *C) {
c.Assert(err, IsNil)
c.Assert(clone4toml, Matches, "(.|\n)*backoff-rollback = \"5m(.|\n)*")
c.Assert(clone4toml, Matches, "(.|\n)*backoff-max = \"5m(.|\n)*")

var clone5 SourceConfig
c.Assert(clone5.Parse(clone4toml), IsNil)
c.Assert(clone5, DeepEquals, *clone4)
clone4yaml, err := clone4.Yaml()
c.Assert(err, IsNil)
c.Assert(clone4yaml, Matches, "(.|\n)*backoff-rollback: 5m(.|\n)*")
c.Assert(clone4yaml, Matches, "(.|\n)*backoff-max: 5m(.|\n)*")
var clone6 SourceConfig
c.Assert(clone6.ParseYaml(clone4yaml), IsNil)
c.Assert(clone6, DeepEquals, *clone4)

clone6, err := ParseYaml(clone4yaml)
c.Assert(err, IsNil)
c.Assert(clone6, DeepEquals, clone4)

// test invalid config
dir2 := c.MkDir()
Expand All @@ -116,15 +123,15 @@ aaa: xxx
`)
err = ioutil.WriteFile(configFile, configContent, 0o644)
c.Assert(err, IsNil)
err = cfg.LoadFromFile(configFile)
_, err = LoadFromFile(configFile)
c.Assert(err, NotNil)
c.Assert(err, ErrorMatches, "(.|\n)*field aaa not found in type config.SourceConfig(.|\n)*")
}

func (t *testConfig) TestConfigVerify(c *C) {
newConfig := func() *SourceConfig {
cfg := NewSourceConfig()
c.Assert(cfg.LoadFromFile(sourceSampleFile), IsNil)
cfg, err := LoadFromFile(sourceSampleFile)
c.Assert(err, IsNil)
cfg.RelayDir = "./xx"
return cfg
}
Expand Down Expand Up @@ -247,12 +254,12 @@ func subtestFlavor(c *C, cfg *SourceConfig, sqlInfo, expectedFlavor, expectedErr
}

func (t *testConfig) TestAdjustFlavor(c *C) {
cfg := NewSourceConfig()
c.Assert(cfg.LoadFromFile(sourceSampleFile), IsNil)
cfg, err := LoadFromFile(sourceSampleFile)
c.Assert(err, IsNil)
cfg.RelayDir = "./xx"

cfg.Flavor = "mariadb"
err := cfg.AdjustFlavor(context.Background(), nil)
err = cfg.AdjustFlavor(context.Background(), nil)
c.Assert(err, IsNil)
c.Assert(cfg.Flavor, Equals, mysql.MariaDBFlavor)
cfg.Flavor = "MongoDB"
Expand All @@ -270,8 +277,8 @@ func (t *testConfig) TestAdjustServerID(c *C) {
}()
getAllServerIDFunc = getMockServerIDs

cfg := NewSourceConfig()
c.Assert(cfg.LoadFromFile(sourceSampleFile), IsNil)
cfg, err := LoadFromFile(sourceSampleFile)
c.Assert(err, IsNil)
cfg.RelayDir = "./xx"

c.Assert(cfg.AdjustServerID(context.Background(), nil), IsNil)
Expand Down
21 changes: 10 additions & 11 deletions dm/config/subtask.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,17 @@ import (
"flag"
"fmt"
"strings"
"time"

"github.com/pingcap/dm/pkg/log"
"github.com/pingcap/dm/pkg/terror"
"github.com/pingcap/dm/pkg/utils"

"github.com/BurntSushi/toml"
bf "github.com/pingcap/tidb-tools/pkg/binlog-filter"
"github.com/pingcap/tidb-tools/pkg/column-mapping"
"github.com/pingcap/tidb-tools/pkg/filter"
router "github.com/pingcap/tidb-tools/pkg/table-router"
"go.uber.org/zap"

"github.com/pingcap/dm/pkg/log"
"github.com/pingcap/dm/pkg/terror"
"github.com/pingcap/dm/pkg/utils"
)

// task modes.
Expand Down Expand Up @@ -118,6 +117,8 @@ func (db *DBConfig) Decode(data string) error {

// Adjust adjusts the config.
func (db *DBConfig) Adjust() {
// force set session time zone to UTC here.
AdjustTargetDBTimeZone(db)
}

// SubTaskConfig is the configuration for SubTask.
Expand Down Expand Up @@ -150,8 +151,8 @@ type SubTaskConfig struct {
HeartbeatReportInterval int `toml:"heartbeat-report-interval" json:"heartbeat-report-interval"`
EnableHeartbeat bool `toml:"enable-heartbeat" json:"enable-heartbeat"`
Meta *Meta `toml:"meta" json:"meta"`
Timezone string `toml:"timezone" josn:"timezone"`

// deprecated
Timezone string `toml:"timezone" json:"timezone"`
// RelayDir get value from dm-worker config
RelayDir string `toml:"relay-dir" json:"relay-dir"`

Expand Down Expand Up @@ -274,10 +275,8 @@ func (c *SubTaskConfig) Adjust(verifyDecryptPassword bool) error {
}

if c.Timezone != "" {
_, err := time.LoadLocation(c.Timezone)
if err != nil {
return terror.ErrConfigInvalidTimezone.Delegate(err, c.Timezone)
}
log.L().Warn("'timezone' is deprecated, please remove this field.")
c.Timezone = ""
}

dirSuffix := "." + c.Name
Expand Down
Loading