diff --git a/cmd/dm-syncer/config.go b/cmd/dm-syncer/config.go index b50143ae1b..6ecd71012d 100644 --- a/cmd/dm-syncer/config.go +++ b/cmd/dm-syncer/config.go @@ -16,13 +16,14 @@ package main import ( "flag" "fmt" + "os" + "github.com/BurntSushi/toml" "github.com/go-mysql-org/go-mysql/mysql" "github.com/pingcap/errors" bf "github.com/pingcap/tidb-tools/pkg/binlog-filter" "github.com/pingcap/tidb-tools/pkg/filter" router "github.com/pingcap/tidb-tools/pkg/table-router" - "os" "github.com/pingcap/dm/dm/config" "github.com/pingcap/dm/pkg/utils" diff --git a/dm/master/bootstrap.go b/dm/master/bootstrap.go index 1697c5cf48..13d5db0b13 100644 --- a/dm/master/bootstrap.go +++ b/dm/master/bootstrap.go @@ -162,13 +162,13 @@ func (s *Server) importFromV10x(ctx context.Context) error { } // collectSourceConfigFilesV1Import tries to collect source config files for v1.0.x importing. -func (s *Server) collectSourceConfigFilesV1Import(tctx *tcontext.Context) (map[string]config.SourceConfig, error) { +func (s *Server) collectSourceConfigFilesV1Import(tctx *tcontext.Context) (map[string]*config.SourceConfig, error) { files, err := ioutil.ReadDir(s.cfg.V1SourcesPath) if err != nil { return nil, err } - cfgs := make(map[string]config.SourceConfig) + cfgs := make(map[string]*config.SourceConfig) for _, f := range files { if f.IsDir() { continue // ignore sub directories. @@ -186,7 +186,7 @@ func (s *Server) collectSourceConfigFilesV1Import(tctx *tcontext.Context) (map[s return nil, err } - cfgs[cfgs2[0].SourceID] = *cfgs2[0] + cfgs[cfgs2[0].SourceID] = cfgs2[0] tctx.Logger.Info("collected source config", zap.Stringer("config", cfgs2[0])) } @@ -195,7 +195,7 @@ func (s *Server) collectSourceConfigFilesV1Import(tctx *tcontext.Context) (map[s // waitWorkersReadyV1Import waits for DM-worker instances ready for v1.0.x importing. // NOTE: in v1.0.x, `count of DM-worker instances` equals `count of source config files`. -func (s *Server) waitWorkersReadyV1Import(tctx *tcontext.Context, sourceCfgs map[string]config.SourceConfig) error { +func (s *Server) waitWorkersReadyV1Import(tctx *tcontext.Context, sourceCfgs map[string]*config.SourceConfig) error { // now, we simply check count repeatedly. count := len(sourceCfgs) ctx2, cancel2 := context.WithTimeout(context.Background(), waitWorkerV1Timeout) @@ -291,7 +291,7 @@ func (s *Server) getSubtaskCfgsStagesV1Import(tctx *tcontext.Context) ( } // addSourcesV1Import tries to add source config into the cluster for v1.0.x importing. -func (s *Server) addSourcesV1Import(tctx *tcontext.Context, cfgs map[string]config.SourceConfig) error { +func (s *Server) addSourcesV1Import(tctx *tcontext.Context, cfgs map[string]*config.SourceConfig) error { var ( added []string err error diff --git a/dm/master/bootstrap_test.go b/dm/master/bootstrap_test.go index 258d9fe3f7..bec21ff9a3 100644 --- a/dm/master/bootstrap_test.go +++ b/dm/master/bootstrap_test.go @@ -94,8 +94,8 @@ func (t *testMaster) TestCollectSourceConfigFilesV1Import(c *C) { cfgs, err = s.collectSourceConfigFilesV1Import(tctx) c.Assert(err, IsNil) c.Assert(cfgs, HasLen, 2) - c.Assert(cfgs[cfg1.SourceID], DeepEquals, *cfg1) - c.Assert(cfgs[cfg2.SourceID], DeepEquals, *cfg2) + c.Assert(cfgs[cfg1.SourceID], DeepEquals, cfg1) + c.Assert(cfgs[cfg2.SourceID], DeepEquals, cfg2) // put a invalid source file. c.Assert(ioutil.WriteFile(filepath.Join(s.cfg.V1SourcesPath, "invalid.yaml"), []byte("invalid-source-data"), 0o644), IsNil) @@ -125,9 +125,9 @@ func (t *testMaster) TestWaitWorkersReadyV1Import(c *C) { c.Assert(err, IsNil) cfg2 := cfg1.Clone() cfg2.SourceID = "mysql-replica-02" - cfgs := map[string]config.SourceConfig{ - cfg1.SourceID: *cfg1, - cfg2.SourceID: *cfg2, + cfgs := map[string]*config.SourceConfig{ + cfg1.SourceID: cfg1, + cfg2.SourceID: cfg2, } // no worker registered, timeout. diff --git a/dm/master/server.go b/dm/master/server.go index 08c4d673a3..548ced8eeb 100644 --- a/dm/master/server.go +++ b/dm/master/server.go @@ -1192,7 +1192,7 @@ func (s *Server) OperateSource(ctx context.Context, req *pb.OperateSourceRequest err error ) for _, cfg := range cfgs { - err = s.scheduler.AddSourceCfg(*cfg) + err = s.scheduler.AddSourceCfg(cfg) // return first error and try to revert, so user could copy-paste same start command after error if err != nil { resp.Msg = err.Error() diff --git a/dm/master/server_test.go b/dm/master/server_test.go index 85021c28ee..c3d47c8008 100644 --- a/dm/master/server_test.go +++ b/dm/master/server_test.go @@ -271,7 +271,7 @@ func testMockScheduler(ctx context.Context, wg *sync.WaitGroup, c *check.C, sour cfg := config.NewSourceConfig() cfg.SourceID = sources[i] cfg.From.Password = password - c.Assert(scheduler2.AddSourceCfg(*cfg), check.IsNil, check.Commentf("all sources: %v", sources)) + c.Assert(scheduler2.AddSourceCfg(cfg), check.IsNil, check.Commentf("all sources: %v", sources)) wg.Add(1) ctx1, cancel1 := context.WithCancel(ctx) cancels = append(cancels, cancel1) @@ -303,7 +303,7 @@ func testMockSchedulerForRelay(ctx context.Context, wg *sync.WaitGroup, c *check cfg := config.NewSourceConfig() cfg.SourceID = sources[i] cfg.From.Password = password - c.Assert(scheduler2.AddSourceCfg(*cfg), check.IsNil, check.Commentf("all sources: %v", sources)) + c.Assert(scheduler2.AddSourceCfg(cfg), check.IsNil, check.Commentf("all sources: %v", sources)) wg.Add(1) ctx1, cancel1 := context.WithCancel(ctx) cancels = append(cancels, cancel1) diff --git a/dm/worker/server.go b/dm/worker/server.go index 3ceb65cbbc..3efd1fe58b 100644 --- a/dm/worker/server.go +++ b/dm/worker/server.go @@ -156,7 +156,7 @@ func (s *Server) Start() error { } if !bound.IsEmpty() { log.L().Warn("worker has been assigned source before keepalive", zap.Stringer("bound", bound), zap.Bool("is deleted", bound.IsDeleted)) - if err2 := s.enableHandleSubtasks(&sourceCfg, true); err2 != nil { + if err2 := s.enableHandleSubtasks(sourceCfg, true); err2 != nil { return err2 } log.L().Info("started to handle mysql source", zap.String("sourceCfg", sourceCfg.String())) @@ -452,7 +452,7 @@ func (s *Server) observeSourceBound(ctx context.Context, rev int64) error { } log.L().Info("will recover observeSourceBound", zap.String("relay source", cfg.SourceID)) - return s.enableHandleSubtasks(&cfg, false) + return s.enableHandleSubtasks(cfg, false) }() if err2 != nil { return err2 @@ -653,7 +653,7 @@ func (s *Server) operateSourceBound(bound ha.SourceBound) error { if !ok { return terror.ErrWorkerFailToGetSourceConfigFromEtcd.Generate(bound.Source) } - return s.enableHandleSubtasks(&sourceCfg, true) + return s.enableHandleSubtasks(sourceCfg, true) } func (s *Server) enableHandleSubtasks(sourceCfg *config.SourceConfig, needLock bool) error { @@ -704,7 +704,7 @@ func (s *Server) operateRelaySource(relaySource ha.RelaySource) error { if !ok { return terror.ErrWorkerFailToGetSourceConfigFromEtcd.Generate(relaySource.Source) } - return s.enableRelay(&sourceCfg, true) + return s.enableRelay(sourceCfg, true) } func (s *Server) enableRelay(sourceCfg *config.SourceConfig, needLock bool) error { diff --git a/pkg/ha/bound.go b/pkg/ha/bound.go index 1d7eb67516..1aff76700b 100644 --- a/pkg/ha/bound.go +++ b/pkg/ha/bound.go @@ -185,11 +185,11 @@ func GetLastSourceBounds(cli *clientv3.Client) (map[string]SourceBound, int64, e // if source bound is empty, will return an empty sourceBound and an empty source config // if source bound is not empty but sourceConfig is empty, will return an error // if the source bound is different for over retryNum times, will return an error. -func GetSourceBoundConfig(cli *clientv3.Client, worker string) (SourceBound, config.SourceConfig, int64, error) { +func GetSourceBoundConfig(cli *clientv3.Client, worker string) (SourceBound, *config.SourceConfig, int64, error) { var ( bound SourceBound newBound SourceBound - cfg config.SourceConfig + cfg *config.SourceConfig ok bool retryNum = defaultGetSourceBoundConfigRetry ) diff --git a/pkg/ha/bound_test.go b/pkg/ha/bound_test.go index 9dd8f714d2..5a40b81e4d 100644 --- a/pkg/ha/bound_test.go +++ b/pkg/ha/bound_test.go @@ -125,10 +125,9 @@ func (t *testForEtcd) TestGetSourceBoundConfigEtcd(c *C) { defer clearTestInfoOperation(c) var ( - worker = "dm-worker-1" - source = "mysql-replica-1" - bound = NewSourceBound(source, worker) - emptyCfg config.SourceConfig + worker = "dm-worker-1" + source = "mysql-replica-1" + bound = NewSourceBound(source, worker) ) cfg, err := config.LoadFromFile(sourceSampleFile) c.Assert(err, IsNil) @@ -138,7 +137,7 @@ func (t *testForEtcd) TestGetSourceBoundConfigEtcd(c *C) { c.Assert(err, IsNil) c.Assert(rev1, Greater, int64(0)) c.Assert(bound1.IsEmpty(), IsTrue) - c.Assert(cfg1, DeepEquals, emptyCfg) + c.Assert(cfg1, IsNil) rev2, err := PutSourceBound(etcdTestCli, bound) c.Assert(err, IsNil) diff --git a/pkg/ha/relay.go b/pkg/ha/relay.go index b75268aa3a..9b041573a6 100644 --- a/pkg/ha/relay.go +++ b/pkg/ha/relay.go @@ -173,7 +173,7 @@ func GetRelayConfig(cli *clientv3.Client, worker string) (*config.SourceConfig, return nil, 0, terror.ErrConfigMissingForBound.Generate(source) } - return &cfg, rev2, nil + return cfg, rev2, nil } return nil, 0, terror.ErrWorkerRelayConfigChanging.Generate(worker, source, newSource) } diff --git a/pkg/terror/error_list.go b/pkg/terror/error_list.go index f214422a29..fb83de36a9 100644 --- a/pkg/terror/error_list.go +++ b/pkg/terror/error_list.go @@ -804,7 +804,7 @@ var ( ErrConfigEmptySourceID = New(codeConfigEmptySourceID, ClassConfig, ScopeInternal, LevelMedium, "empty source-id not valid", "Please check the `source-id` config in configuration file.") ErrConfigTooLongSourceID = New(codeConfigTooLongSourceID, ClassConfig, ScopeInternal, LevelMedium, "too long source-id not valid", "Please check the `source-id` config in configuration file. The max source id length is 32.") ErrConfigOnlineSchemeNotSupport = New(codeConfigOnlineSchemeNotSupport, ClassConfig, ScopeInternal, LevelMedium, "online scheme %s not supported", "Please check the `online-ddl-scheme` config in task configuration file. Only `ghost` and `pt` are currently supported.") - ErrConfigInvalidTimezone = New(codeConfigInvalidTimezone, ClassConfig, ScopeInternal, LevelMedium, "invalid timezone string: %s", "Please check the `timezone` config in task configuration file.") + ErrConfigInvalidTimezone = New(codeConfigInvalidTimezone, ClassConfig, ScopeInternal, LevelMedium, "invalid timezone string: %s", "Please check the `timezone` config in task configuration file.") ErrConfigParseFlagSet = New(codeConfigParseFlagSet, ClassConfig, ScopeInternal, LevelMedium, "parse subtask config flag set", "") ErrConfigDecryptDBPassword = New(codeConfigDecryptDBPassword, ClassConfig, ScopeInternal, LevelMedium, "decrypt DB password %s failed", "") ErrConfigMetaInvalid = New(codeConfigMetaInvalid, ClassConfig, ScopeInternal, LevelMedium, "must specify `binlog-name` without GTID enabled for the source or specify `binlog-gtid` with GTID enabled for the source", "Please check the `meta` config in task configuration file.") diff --git a/pkg/utils/common.go b/pkg/utils/common.go index ef59108843..65ef2711f7 100644 --- a/pkg/utils/common.go +++ b/pkg/utils/common.go @@ -19,21 +19,16 @@ import ( "fmt" "regexp" "strings" - "time" - - "github.com/pingcap/errors" - - "github.com/pingcap/dm/pkg/log" - "github.com/pingcap/dm/pkg/terror" "github.com/pingcap/failpoint" tmysql "github.com/pingcap/parser/mysql" "github.com/pingcap/tidb-tools/pkg/dbutil" "github.com/pingcap/tidb-tools/pkg/filter" router "github.com/pingcap/tidb-tools/pkg/table-router" - "github.com/pingcap/tidb/types" - "github.com/pingcap/tidb/util/timeutil" "go.uber.org/zap" + + "github.com/pingcap/dm/pkg/log" + "github.com/pingcap/dm/pkg/terror" ) // TrimCtrlChars returns a slice of the string s with all leading @@ -214,40 +209,3 @@ func NonRepeatStringsEqual(a, b []string) bool { } return true } - -// ParseTimeZone parse location info from time offset("+08:00") or location string ("Asia/Shanghai") -// -// Copy from https://github.com/pingcap/tidb/blob/263a47e85ce04f74ec80d1d35b426618bc89b5a3/sessionctx/variable/varsutil.go#L411 -func ParseTimeZone(s string) (*time.Location, error) { - if strings.EqualFold(s, "SYSTEM") { - return timeutil.SystemLocation(), nil - } - loc, err := time.LoadLocation(s) - if err == nil { - return loc, nil - } - // The value can be given as a string indicating an offset from UTC, such as '+10:00' or '-6:00'. - // The time zone's value should in [-12:59,+14:00]. - if strings.HasPrefix(s, "+") || strings.HasPrefix(s, "-") { - d, err := types.ParseDuration(nil, s[1:], 0) - if err == nil { - if s[0] == '-' { - if d.Duration > 12*time.Hour+59*time.Minute { - return nil, errors.Errorf("invalid time zone '%s'", s) - } - } else { - if d.Duration > 14*time.Hour { - return nil, errors.Errorf("invalid time zone '%s'", s) - } - } - - ofst := int(d.Duration / time.Second) - if s[0] == '-' { - ofst = -ofst - } - return time.FixedZone("", ofst), nil - } - } - - return nil, errors.Errorf("invalid time zone '%s'", s) -} diff --git a/pkg/utils/common_test.go b/pkg/utils/common_test.go index c199c74df0..5b2b8de634 100644 --- a/pkg/utils/common_test.go +++ b/pkg/utils/common_test.go @@ -17,7 +17,6 @@ import ( "bytes" "context" "fmt" - "time" "github.com/DATA-DOG/go-sqlmock" . "github.com/pingcap/check" @@ -257,31 +256,3 @@ func (s *testCommonSuite) TestNonRepeatStringsEqual(c *C) { c.Assert(NonRepeatStringsEqual([]string{}, []string{"1"}), IsFalse) c.Assert(NonRepeatStringsEqual([]string{"1", "2"}, []string{"2", "3"}), IsFalse) } - -func (s *testCommonSuite) TestParseTimeZone(c *C) { - cases := map[string]time.Duration{ - "+00:00": time.Duration(0), - "+01:00": time.Hour, - "-08:03": -1 * (8*time.Hour + 3*time.Minute), - "-12:59": -1 * (12*time.Hour + 59*time.Minute), - "+12:59": 12*time.Hour + 59*time.Minute, - "Asia/Shanghai": 8 * time.Hour, - "UTC": time.Duration(0), - } - for k, v := range cases { - dur, err := ParseTimeZone(k) - c.Assert(err, IsNil) - c.Assert(dur, Equals, v) - } - - badCases := []string{ - "test", - "-13:00", - "+14:05", - } - - for _, s := range badCases { - _, err := ParseTimeZone(s) - c.Assert(err, ErrorMatches, "invalid time zone.*") - } -}