Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
glorv committed May 11, 2021
1 parent 46f79c7 commit 4abff02
Show file tree
Hide file tree
Showing 12 changed files with 30 additions and 101 deletions.
3 changes: 2 additions & 1 deletion cmd/dm-syncer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,14 @@ package main
import (
"flag"
"fmt"
"os"

"github.com/BurntSushi/toml"
"github.com/go-mysql-org/go-mysql/mysql"
"github.com/pingcap/errors"
bf "github.com/pingcap/tidb-tools/pkg/binlog-filter"
"github.com/pingcap/tidb-tools/pkg/filter"
router "github.com/pingcap/tidb-tools/pkg/table-router"
"os"

"github.com/pingcap/dm/dm/config"
"github.com/pingcap/dm/pkg/utils"
Expand Down
10 changes: 5 additions & 5 deletions dm/master/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,13 +162,13 @@ func (s *Server) importFromV10x(ctx context.Context) error {
}

// collectSourceConfigFilesV1Import tries to collect source config files for v1.0.x importing.
func (s *Server) collectSourceConfigFilesV1Import(tctx *tcontext.Context) (map[string]config.SourceConfig, error) {
func (s *Server) collectSourceConfigFilesV1Import(tctx *tcontext.Context) (map[string]*config.SourceConfig, error) {
files, err := ioutil.ReadDir(s.cfg.V1SourcesPath)
if err != nil {
return nil, err
}

cfgs := make(map[string]config.SourceConfig)
cfgs := make(map[string]*config.SourceConfig)
for _, f := range files {
if f.IsDir() {
continue // ignore sub directories.
Expand All @@ -186,7 +186,7 @@ func (s *Server) collectSourceConfigFilesV1Import(tctx *tcontext.Context) (map[s
return nil, err
}

cfgs[cfgs2[0].SourceID] = *cfgs2[0]
cfgs[cfgs2[0].SourceID] = cfgs2[0]
tctx.Logger.Info("collected source config", zap.Stringer("config", cfgs2[0]))
}

Expand All @@ -195,7 +195,7 @@ func (s *Server) collectSourceConfigFilesV1Import(tctx *tcontext.Context) (map[s

// waitWorkersReadyV1Import waits for DM-worker instances ready for v1.0.x importing.
// NOTE: in v1.0.x, `count of DM-worker instances` equals `count of source config files`.
func (s *Server) waitWorkersReadyV1Import(tctx *tcontext.Context, sourceCfgs map[string]config.SourceConfig) error {
func (s *Server) waitWorkersReadyV1Import(tctx *tcontext.Context, sourceCfgs map[string]*config.SourceConfig) error {
// now, we simply check count repeatedly.
count := len(sourceCfgs)
ctx2, cancel2 := context.WithTimeout(context.Background(), waitWorkerV1Timeout)
Expand Down Expand Up @@ -291,7 +291,7 @@ func (s *Server) getSubtaskCfgsStagesV1Import(tctx *tcontext.Context) (
}

// addSourcesV1Import tries to add source config into the cluster for v1.0.x importing.
func (s *Server) addSourcesV1Import(tctx *tcontext.Context, cfgs map[string]config.SourceConfig) error {
func (s *Server) addSourcesV1Import(tctx *tcontext.Context, cfgs map[string]*config.SourceConfig) error {
var (
added []string
err error
Expand Down
10 changes: 5 additions & 5 deletions dm/master/bootstrap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,8 @@ func (t *testMaster) TestCollectSourceConfigFilesV1Import(c *C) {
cfgs, err = s.collectSourceConfigFilesV1Import(tctx)
c.Assert(err, IsNil)
c.Assert(cfgs, HasLen, 2)
c.Assert(cfgs[cfg1.SourceID], DeepEquals, *cfg1)
c.Assert(cfgs[cfg2.SourceID], DeepEquals, *cfg2)
c.Assert(cfgs[cfg1.SourceID], DeepEquals, cfg1)
c.Assert(cfgs[cfg2.SourceID], DeepEquals, cfg2)

// put a invalid source file.
c.Assert(ioutil.WriteFile(filepath.Join(s.cfg.V1SourcesPath, "invalid.yaml"), []byte("invalid-source-data"), 0o644), IsNil)
Expand Down Expand Up @@ -125,9 +125,9 @@ func (t *testMaster) TestWaitWorkersReadyV1Import(c *C) {
c.Assert(err, IsNil)
cfg2 := cfg1.Clone()
cfg2.SourceID = "mysql-replica-02"
cfgs := map[string]config.SourceConfig{
cfg1.SourceID: *cfg1,
cfg2.SourceID: *cfg2,
cfgs := map[string]*config.SourceConfig{
cfg1.SourceID: cfg1,
cfg2.SourceID: cfg2,
}

// no worker registered, timeout.
Expand Down
2 changes: 1 addition & 1 deletion dm/master/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1192,7 +1192,7 @@ func (s *Server) OperateSource(ctx context.Context, req *pb.OperateSourceRequest
err error
)
for _, cfg := range cfgs {
err = s.scheduler.AddSourceCfg(*cfg)
err = s.scheduler.AddSourceCfg(cfg)
// return first error and try to revert, so user could copy-paste same start command after error
if err != nil {
resp.Msg = err.Error()
Expand Down
4 changes: 2 additions & 2 deletions dm/master/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ func testMockScheduler(ctx context.Context, wg *sync.WaitGroup, c *check.C, sour
cfg := config.NewSourceConfig()
cfg.SourceID = sources[i]
cfg.From.Password = password
c.Assert(scheduler2.AddSourceCfg(*cfg), check.IsNil, check.Commentf("all sources: %v", sources))
c.Assert(scheduler2.AddSourceCfg(cfg), check.IsNil, check.Commentf("all sources: %v", sources))
wg.Add(1)
ctx1, cancel1 := context.WithCancel(ctx)
cancels = append(cancels, cancel1)
Expand Down Expand Up @@ -303,7 +303,7 @@ func testMockSchedulerForRelay(ctx context.Context, wg *sync.WaitGroup, c *check
cfg := config.NewSourceConfig()
cfg.SourceID = sources[i]
cfg.From.Password = password
c.Assert(scheduler2.AddSourceCfg(*cfg), check.IsNil, check.Commentf("all sources: %v", sources))
c.Assert(scheduler2.AddSourceCfg(cfg), check.IsNil, check.Commentf("all sources: %v", sources))
wg.Add(1)
ctx1, cancel1 := context.WithCancel(ctx)
cancels = append(cancels, cancel1)
Expand Down
8 changes: 4 additions & 4 deletions dm/worker/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ func (s *Server) Start() error {
}
if !bound.IsEmpty() {
log.L().Warn("worker has been assigned source before keepalive", zap.Stringer("bound", bound), zap.Bool("is deleted", bound.IsDeleted))
if err2 := s.enableHandleSubtasks(&sourceCfg, true); err2 != nil {
if err2 := s.enableHandleSubtasks(sourceCfg, true); err2 != nil {
return err2
}
log.L().Info("started to handle mysql source", zap.String("sourceCfg", sourceCfg.String()))
Expand Down Expand Up @@ -452,7 +452,7 @@ func (s *Server) observeSourceBound(ctx context.Context, rev int64) error {
}
log.L().Info("will recover observeSourceBound",
zap.String("relay source", cfg.SourceID))
return s.enableHandleSubtasks(&cfg, false)
return s.enableHandleSubtasks(cfg, false)
}()
if err2 != nil {
return err2
Expand Down Expand Up @@ -653,7 +653,7 @@ func (s *Server) operateSourceBound(bound ha.SourceBound) error {
if !ok {
return terror.ErrWorkerFailToGetSourceConfigFromEtcd.Generate(bound.Source)
}
return s.enableHandleSubtasks(&sourceCfg, true)
return s.enableHandleSubtasks(sourceCfg, true)
}

func (s *Server) enableHandleSubtasks(sourceCfg *config.SourceConfig, needLock bool) error {
Expand Down Expand Up @@ -704,7 +704,7 @@ func (s *Server) operateRelaySource(relaySource ha.RelaySource) error {
if !ok {
return terror.ErrWorkerFailToGetSourceConfigFromEtcd.Generate(relaySource.Source)
}
return s.enableRelay(&sourceCfg, true)
return s.enableRelay(sourceCfg, true)
}

func (s *Server) enableRelay(sourceCfg *config.SourceConfig, needLock bool) error {
Expand Down
4 changes: 2 additions & 2 deletions pkg/ha/bound.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,11 +185,11 @@ func GetLastSourceBounds(cli *clientv3.Client) (map[string]SourceBound, int64, e
// if source bound is empty, will return an empty sourceBound and an empty source config
// if source bound is not empty but sourceConfig is empty, will return an error
// if the source bound is different for over retryNum times, will return an error.
func GetSourceBoundConfig(cli *clientv3.Client, worker string) (SourceBound, config.SourceConfig, int64, error) {
func GetSourceBoundConfig(cli *clientv3.Client, worker string) (SourceBound, *config.SourceConfig, int64, error) {
var (
bound SourceBound
newBound SourceBound
cfg config.SourceConfig
cfg *config.SourceConfig
ok bool
retryNum = defaultGetSourceBoundConfigRetry
)
Expand Down
9 changes: 4 additions & 5 deletions pkg/ha/bound_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,10 +125,9 @@ func (t *testForEtcd) TestGetSourceBoundConfigEtcd(c *C) {
defer clearTestInfoOperation(c)

var (
worker = "dm-worker-1"
source = "mysql-replica-1"
bound = NewSourceBound(source, worker)
emptyCfg config.SourceConfig
worker = "dm-worker-1"
source = "mysql-replica-1"
bound = NewSourceBound(source, worker)
)
cfg, err := config.LoadFromFile(sourceSampleFile)
c.Assert(err, IsNil)
Expand All @@ -138,7 +137,7 @@ func (t *testForEtcd) TestGetSourceBoundConfigEtcd(c *C) {
c.Assert(err, IsNil)
c.Assert(rev1, Greater, int64(0))
c.Assert(bound1.IsEmpty(), IsTrue)
c.Assert(cfg1, DeepEquals, emptyCfg)
c.Assert(cfg1, IsNil)

rev2, err := PutSourceBound(etcdTestCli, bound)
c.Assert(err, IsNil)
Expand Down
2 changes: 1 addition & 1 deletion pkg/ha/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ func GetRelayConfig(cli *clientv3.Client, worker string) (*config.SourceConfig,
return nil, 0, terror.ErrConfigMissingForBound.Generate(source)
}

return &cfg, rev2, nil
return cfg, rev2, nil
}
return nil, 0, terror.ErrWorkerRelayConfigChanging.Generate(worker, source, newSource)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/terror/error_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -804,7 +804,7 @@ var (
ErrConfigEmptySourceID = New(codeConfigEmptySourceID, ClassConfig, ScopeInternal, LevelMedium, "empty source-id not valid", "Please check the `source-id` config in configuration file.")
ErrConfigTooLongSourceID = New(codeConfigTooLongSourceID, ClassConfig, ScopeInternal, LevelMedium, "too long source-id not valid", "Please check the `source-id` config in configuration file. The max source id length is 32.")
ErrConfigOnlineSchemeNotSupport = New(codeConfigOnlineSchemeNotSupport, ClassConfig, ScopeInternal, LevelMedium, "online scheme %s not supported", "Please check the `online-ddl-scheme` config in task configuration file. Only `ghost` and `pt` are currently supported.")
ErrConfigInvalidTimezone = New(codeConfigInvalidTimezone, ClassConfig, ScopeInternal, LevelMedium, "invalid timezone string: %s", "Please check the `timezone` config in task configuration file.")
ErrConfigInvalidTimezone = New(codeConfigInvalidTimezone, ClassConfig, ScopeInternal, LevelMedium, "invalid timezone string: %s", "Please check the `timezone` config in task configuration file.")
ErrConfigParseFlagSet = New(codeConfigParseFlagSet, ClassConfig, ScopeInternal, LevelMedium, "parse subtask config flag set", "")
ErrConfigDecryptDBPassword = New(codeConfigDecryptDBPassword, ClassConfig, ScopeInternal, LevelMedium, "decrypt DB password %s failed", "")
ErrConfigMetaInvalid = New(codeConfigMetaInvalid, ClassConfig, ScopeInternal, LevelMedium, "must specify `binlog-name` without GTID enabled for the source or specify `binlog-gtid` with GTID enabled for the source", "Please check the `meta` config in task configuration file.")
Expand Down
48 changes: 3 additions & 45 deletions pkg/utils/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,16 @@ import (
"fmt"
"regexp"
"strings"
"time"

"github.com/pingcap/errors"

"github.com/pingcap/dm/pkg/log"
"github.com/pingcap/dm/pkg/terror"

"github.com/pingcap/failpoint"
tmysql "github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb-tools/pkg/dbutil"
"github.com/pingcap/tidb-tools/pkg/filter"
router "github.com/pingcap/tidb-tools/pkg/table-router"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/timeutil"
"go.uber.org/zap"

"github.com/pingcap/dm/pkg/log"
"github.com/pingcap/dm/pkg/terror"
)

// TrimCtrlChars returns a slice of the string s with all leading
Expand Down Expand Up @@ -214,40 +209,3 @@ func NonRepeatStringsEqual(a, b []string) bool {
}
return true
}

// ParseTimeZone parse location info from time offset("+08:00") or location string ("Asia/Shanghai")
//
// Copy from https://github.com/pingcap/tidb/blob/263a47e85ce04f74ec80d1d35b426618bc89b5a3/sessionctx/variable/varsutil.go#L411
func ParseTimeZone(s string) (*time.Location, error) {
if strings.EqualFold(s, "SYSTEM") {
return timeutil.SystemLocation(), nil
}
loc, err := time.LoadLocation(s)
if err == nil {
return loc, nil
}
// The value can be given as a string indicating an offset from UTC, such as '+10:00' or '-6:00'.
// The time zone's value should in [-12:59,+14:00].
if strings.HasPrefix(s, "+") || strings.HasPrefix(s, "-") {
d, err := types.ParseDuration(nil, s[1:], 0)
if err == nil {
if s[0] == '-' {
if d.Duration > 12*time.Hour+59*time.Minute {
return nil, errors.Errorf("invalid time zone '%s'", s)
}
} else {
if d.Duration > 14*time.Hour {
return nil, errors.Errorf("invalid time zone '%s'", s)
}
}

ofst := int(d.Duration / time.Second)
if s[0] == '-' {
ofst = -ofst
}
return time.FixedZone("", ofst), nil
}
}

return nil, errors.Errorf("invalid time zone '%s'", s)
}
29 changes: 0 additions & 29 deletions pkg/utils/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"bytes"
"context"
"fmt"
"time"

"github.com/DATA-DOG/go-sqlmock"
. "github.com/pingcap/check"
Expand Down Expand Up @@ -257,31 +256,3 @@ func (s *testCommonSuite) TestNonRepeatStringsEqual(c *C) {
c.Assert(NonRepeatStringsEqual([]string{}, []string{"1"}), IsFalse)
c.Assert(NonRepeatStringsEqual([]string{"1", "2"}, []string{"2", "3"}), IsFalse)
}

func (s *testCommonSuite) TestParseTimeZone(c *C) {
cases := map[string]time.Duration{
"+00:00": time.Duration(0),
"+01:00": time.Hour,
"-08:03": -1 * (8*time.Hour + 3*time.Minute),
"-12:59": -1 * (12*time.Hour + 59*time.Minute),
"+12:59": 12*time.Hour + 59*time.Minute,
"Asia/Shanghai": 8 * time.Hour,
"UTC": time.Duration(0),
}
for k, v := range cases {
dur, err := ParseTimeZone(k)
c.Assert(err, IsNil)
c.Assert(dur, Equals, v)
}

badCases := []string{
"test",
"-13:00",
"+14:05",
}

for _, s := range badCases {
_, err := ParseTimeZone(s)
c.Assert(err, ErrorMatches, "invalid time zone.*")
}
}

0 comments on commit 4abff02

Please sign in to comment.