diff --git a/cmd/dm-syncer/config.go b/cmd/dm-syncer/config.go index b50143ae1b..6ecd71012d 100644 --- a/cmd/dm-syncer/config.go +++ b/cmd/dm-syncer/config.go @@ -16,13 +16,14 @@ package main import ( "flag" "fmt" + "os" + "github.com/BurntSushi/toml" "github.com/go-mysql-org/go-mysql/mysql" "github.com/pingcap/errors" bf "github.com/pingcap/tidb-tools/pkg/binlog-filter" "github.com/pingcap/tidb-tools/pkg/filter" router "github.com/pingcap/tidb-tools/pkg/table-router" - "os" "github.com/pingcap/dm/dm/config" "github.com/pingcap/dm/pkg/utils" diff --git a/dm/worker/server.go b/dm/worker/server.go index 3ceb65cbbc..3efd1fe58b 100644 --- a/dm/worker/server.go +++ b/dm/worker/server.go @@ -156,7 +156,7 @@ func (s *Server) Start() error { } if !bound.IsEmpty() { log.L().Warn("worker has been assigned source before keepalive", zap.Stringer("bound", bound), zap.Bool("is deleted", bound.IsDeleted)) - if err2 := s.enableHandleSubtasks(&sourceCfg, true); err2 != nil { + if err2 := s.enableHandleSubtasks(sourceCfg, true); err2 != nil { return err2 } log.L().Info("started to handle mysql source", zap.String("sourceCfg", sourceCfg.String())) @@ -452,7 +452,7 @@ func (s *Server) observeSourceBound(ctx context.Context, rev int64) error { } log.L().Info("will recover observeSourceBound", zap.String("relay source", cfg.SourceID)) - return s.enableHandleSubtasks(&cfg, false) + return s.enableHandleSubtasks(cfg, false) }() if err2 != nil { return err2 @@ -653,7 +653,7 @@ func (s *Server) operateSourceBound(bound ha.SourceBound) error { if !ok { return terror.ErrWorkerFailToGetSourceConfigFromEtcd.Generate(bound.Source) } - return s.enableHandleSubtasks(&sourceCfg, true) + return s.enableHandleSubtasks(sourceCfg, true) } func (s *Server) enableHandleSubtasks(sourceCfg *config.SourceConfig, needLock bool) error { @@ -704,7 +704,7 @@ func (s *Server) operateRelaySource(relaySource ha.RelaySource) error { if !ok { return terror.ErrWorkerFailToGetSourceConfigFromEtcd.Generate(relaySource.Source) } - return s.enableRelay(&sourceCfg, true) + return s.enableRelay(sourceCfg, true) } func (s *Server) enableRelay(sourceCfg *config.SourceConfig, needLock bool) error { diff --git a/pkg/ha/bound.go b/pkg/ha/bound.go index 1d7eb67516..1aff76700b 100644 --- a/pkg/ha/bound.go +++ b/pkg/ha/bound.go @@ -185,11 +185,11 @@ func GetLastSourceBounds(cli *clientv3.Client) (map[string]SourceBound, int64, e // if source bound is empty, will return an empty sourceBound and an empty source config // if source bound is not empty but sourceConfig is empty, will return an error // if the source bound is different for over retryNum times, will return an error. -func GetSourceBoundConfig(cli *clientv3.Client, worker string) (SourceBound, config.SourceConfig, int64, error) { +func GetSourceBoundConfig(cli *clientv3.Client, worker string) (SourceBound, *config.SourceConfig, int64, error) { var ( bound SourceBound newBound SourceBound - cfg config.SourceConfig + cfg *config.SourceConfig ok bool retryNum = defaultGetSourceBoundConfigRetry ) diff --git a/pkg/ha/bound_test.go b/pkg/ha/bound_test.go index 9dd8f714d2..5a40b81e4d 100644 --- a/pkg/ha/bound_test.go +++ b/pkg/ha/bound_test.go @@ -125,10 +125,9 @@ func (t *testForEtcd) TestGetSourceBoundConfigEtcd(c *C) { defer clearTestInfoOperation(c) var ( - worker = "dm-worker-1" - source = "mysql-replica-1" - bound = NewSourceBound(source, worker) - emptyCfg config.SourceConfig + worker = "dm-worker-1" + source = "mysql-replica-1" + bound = NewSourceBound(source, worker) ) cfg, err := config.LoadFromFile(sourceSampleFile) c.Assert(err, IsNil) @@ -138,7 +137,7 @@ func (t *testForEtcd) TestGetSourceBoundConfigEtcd(c *C) { c.Assert(err, IsNil) c.Assert(rev1, Greater, int64(0)) c.Assert(bound1.IsEmpty(), IsTrue) - c.Assert(cfg1, DeepEquals, emptyCfg) + c.Assert(cfg1, IsNil) rev2, err := PutSourceBound(etcdTestCli, bound) c.Assert(err, IsNil) diff --git a/pkg/ha/relay.go b/pkg/ha/relay.go index b75268aa3a..9b041573a6 100644 --- a/pkg/ha/relay.go +++ b/pkg/ha/relay.go @@ -173,7 +173,7 @@ func GetRelayConfig(cli *clientv3.Client, worker string) (*config.SourceConfig, return nil, 0, terror.ErrConfigMissingForBound.Generate(source) } - return &cfg, rev2, nil + return cfg, rev2, nil } return nil, 0, terror.ErrWorkerRelayConfigChanging.Generate(worker, source, newSource) } diff --git a/pkg/terror/error_list.go b/pkg/terror/error_list.go index f214422a29..fb83de36a9 100644 --- a/pkg/terror/error_list.go +++ b/pkg/terror/error_list.go @@ -804,7 +804,7 @@ var ( ErrConfigEmptySourceID = New(codeConfigEmptySourceID, ClassConfig, ScopeInternal, LevelMedium, "empty source-id not valid", "Please check the `source-id` config in configuration file.") ErrConfigTooLongSourceID = New(codeConfigTooLongSourceID, ClassConfig, ScopeInternal, LevelMedium, "too long source-id not valid", "Please check the `source-id` config in configuration file. The max source id length is 32.") ErrConfigOnlineSchemeNotSupport = New(codeConfigOnlineSchemeNotSupport, ClassConfig, ScopeInternal, LevelMedium, "online scheme %s not supported", "Please check the `online-ddl-scheme` config in task configuration file. Only `ghost` and `pt` are currently supported.") - ErrConfigInvalidTimezone = New(codeConfigInvalidTimezone, ClassConfig, ScopeInternal, LevelMedium, "invalid timezone string: %s", "Please check the `timezone` config in task configuration file.") + ErrConfigInvalidTimezone = New(codeConfigInvalidTimezone, ClassConfig, ScopeInternal, LevelMedium, "invalid timezone string: %s", "Please check the `timezone` config in task configuration file.") ErrConfigParseFlagSet = New(codeConfigParseFlagSet, ClassConfig, ScopeInternal, LevelMedium, "parse subtask config flag set", "") ErrConfigDecryptDBPassword = New(codeConfigDecryptDBPassword, ClassConfig, ScopeInternal, LevelMedium, "decrypt DB password %s failed", "") ErrConfigMetaInvalid = New(codeConfigMetaInvalid, ClassConfig, ScopeInternal, LevelMedium, "must specify `binlog-name` without GTID enabled for the source or specify `binlog-gtid` with GTID enabled for the source", "Please check the `meta` config in task configuration file.") diff --git a/pkg/utils/common.go b/pkg/utils/common.go index ef59108843..65ef2711f7 100644 --- a/pkg/utils/common.go +++ b/pkg/utils/common.go @@ -19,21 +19,16 @@ import ( "fmt" "regexp" "strings" - "time" - - "github.com/pingcap/errors" - - "github.com/pingcap/dm/pkg/log" - "github.com/pingcap/dm/pkg/terror" "github.com/pingcap/failpoint" tmysql "github.com/pingcap/parser/mysql" "github.com/pingcap/tidb-tools/pkg/dbutil" "github.com/pingcap/tidb-tools/pkg/filter" router "github.com/pingcap/tidb-tools/pkg/table-router" - "github.com/pingcap/tidb/types" - "github.com/pingcap/tidb/util/timeutil" "go.uber.org/zap" + + "github.com/pingcap/dm/pkg/log" + "github.com/pingcap/dm/pkg/terror" ) // TrimCtrlChars returns a slice of the string s with all leading @@ -214,40 +209,3 @@ func NonRepeatStringsEqual(a, b []string) bool { } return true } - -// ParseTimeZone parse location info from time offset("+08:00") or location string ("Asia/Shanghai") -// -// Copy from https://github.com/pingcap/tidb/blob/263a47e85ce04f74ec80d1d35b426618bc89b5a3/sessionctx/variable/varsutil.go#L411 -func ParseTimeZone(s string) (*time.Location, error) { - if strings.EqualFold(s, "SYSTEM") { - return timeutil.SystemLocation(), nil - } - loc, err := time.LoadLocation(s) - if err == nil { - return loc, nil - } - // The value can be given as a string indicating an offset from UTC, such as '+10:00' or '-6:00'. - // The time zone's value should in [-12:59,+14:00]. - if strings.HasPrefix(s, "+") || strings.HasPrefix(s, "-") { - d, err := types.ParseDuration(nil, s[1:], 0) - if err == nil { - if s[0] == '-' { - if d.Duration > 12*time.Hour+59*time.Minute { - return nil, errors.Errorf("invalid time zone '%s'", s) - } - } else { - if d.Duration > 14*time.Hour { - return nil, errors.Errorf("invalid time zone '%s'", s) - } - } - - ofst := int(d.Duration / time.Second) - if s[0] == '-' { - ofst = -ofst - } - return time.FixedZone("", ofst), nil - } - } - - return nil, errors.Errorf("invalid time zone '%s'", s) -} diff --git a/pkg/utils/common_test.go b/pkg/utils/common_test.go index c199c74df0..5b2b8de634 100644 --- a/pkg/utils/common_test.go +++ b/pkg/utils/common_test.go @@ -17,7 +17,6 @@ import ( "bytes" "context" "fmt" - "time" "github.com/DATA-DOG/go-sqlmock" . "github.com/pingcap/check" @@ -257,31 +256,3 @@ func (s *testCommonSuite) TestNonRepeatStringsEqual(c *C) { c.Assert(NonRepeatStringsEqual([]string{}, []string{"1"}), IsFalse) c.Assert(NonRepeatStringsEqual([]string{"1", "2"}, []string{"2", "3"}), IsFalse) } - -func (s *testCommonSuite) TestParseTimeZone(c *C) { - cases := map[string]time.Duration{ - "+00:00": time.Duration(0), - "+01:00": time.Hour, - "-08:03": -1 * (8*time.Hour + 3*time.Minute), - "-12:59": -1 * (12*time.Hour + 59*time.Minute), - "+12:59": 12*time.Hour + 59*time.Minute, - "Asia/Shanghai": 8 * time.Hour, - "UTC": time.Duration(0), - } - for k, v := range cases { - dur, err := ParseTimeZone(k) - c.Assert(err, IsNil) - c.Assert(dur, Equals, v) - } - - badCases := []string{ - "test", - "-13:00", - "+14:05", - } - - for _, s := range badCases { - _, err := ParseTimeZone(s) - c.Assert(err, ErrorMatches, "invalid time zone.*") - } -}