Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
glorv committed May 11, 2021
1 parent 46f79c7 commit 28c0d22
Show file tree
Hide file tree
Showing 8 changed files with 17 additions and 88 deletions.
3 changes: 2 additions & 1 deletion cmd/dm-syncer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,14 @@ package main
import (
"flag"
"fmt"
"os"

"github.com/BurntSushi/toml"
"github.com/go-mysql-org/go-mysql/mysql"
"github.com/pingcap/errors"
bf "github.com/pingcap/tidb-tools/pkg/binlog-filter"
"github.com/pingcap/tidb-tools/pkg/filter"
router "github.com/pingcap/tidb-tools/pkg/table-router"
"os"

"github.com/pingcap/dm/dm/config"
"github.com/pingcap/dm/pkg/utils"
Expand Down
8 changes: 4 additions & 4 deletions dm/worker/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ func (s *Server) Start() error {
}
if !bound.IsEmpty() {
log.L().Warn("worker has been assigned source before keepalive", zap.Stringer("bound", bound), zap.Bool("is deleted", bound.IsDeleted))
if err2 := s.enableHandleSubtasks(&sourceCfg, true); err2 != nil {
if err2 := s.enableHandleSubtasks(sourceCfg, true); err2 != nil {
return err2
}
log.L().Info("started to handle mysql source", zap.String("sourceCfg", sourceCfg.String()))
Expand Down Expand Up @@ -452,7 +452,7 @@ func (s *Server) observeSourceBound(ctx context.Context, rev int64) error {
}
log.L().Info("will recover observeSourceBound",
zap.String("relay source", cfg.SourceID))
return s.enableHandleSubtasks(&cfg, false)
return s.enableHandleSubtasks(cfg, false)
}()
if err2 != nil {
return err2
Expand Down Expand Up @@ -653,7 +653,7 @@ func (s *Server) operateSourceBound(bound ha.SourceBound) error {
if !ok {
return terror.ErrWorkerFailToGetSourceConfigFromEtcd.Generate(bound.Source)
}
return s.enableHandleSubtasks(&sourceCfg, true)
return s.enableHandleSubtasks(sourceCfg, true)
}

func (s *Server) enableHandleSubtasks(sourceCfg *config.SourceConfig, needLock bool) error {
Expand Down Expand Up @@ -704,7 +704,7 @@ func (s *Server) operateRelaySource(relaySource ha.RelaySource) error {
if !ok {
return terror.ErrWorkerFailToGetSourceConfigFromEtcd.Generate(relaySource.Source)
}
return s.enableRelay(&sourceCfg, true)
return s.enableRelay(sourceCfg, true)
}

func (s *Server) enableRelay(sourceCfg *config.SourceConfig, needLock bool) error {
Expand Down
4 changes: 2 additions & 2 deletions pkg/ha/bound.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,11 +185,11 @@ func GetLastSourceBounds(cli *clientv3.Client) (map[string]SourceBound, int64, e
// if source bound is empty, will return an empty sourceBound and an empty source config
// if source bound is not empty but sourceConfig is empty, will return an error
// if the source bound is different for over retryNum times, will return an error.
func GetSourceBoundConfig(cli *clientv3.Client, worker string) (SourceBound, config.SourceConfig, int64, error) {
func GetSourceBoundConfig(cli *clientv3.Client, worker string) (SourceBound, *config.SourceConfig, int64, error) {
var (
bound SourceBound
newBound SourceBound
cfg config.SourceConfig
cfg *config.SourceConfig
ok bool
retryNum = defaultGetSourceBoundConfigRetry
)
Expand Down
9 changes: 4 additions & 5 deletions pkg/ha/bound_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,10 +125,9 @@ func (t *testForEtcd) TestGetSourceBoundConfigEtcd(c *C) {
defer clearTestInfoOperation(c)

var (
worker = "dm-worker-1"
source = "mysql-replica-1"
bound = NewSourceBound(source, worker)
emptyCfg config.SourceConfig
worker = "dm-worker-1"
source = "mysql-replica-1"
bound = NewSourceBound(source, worker)
)
cfg, err := config.LoadFromFile(sourceSampleFile)
c.Assert(err, IsNil)
Expand All @@ -138,7 +137,7 @@ func (t *testForEtcd) TestGetSourceBoundConfigEtcd(c *C) {
c.Assert(err, IsNil)
c.Assert(rev1, Greater, int64(0))
c.Assert(bound1.IsEmpty(), IsTrue)
c.Assert(cfg1, DeepEquals, emptyCfg)
c.Assert(cfg1, IsNil)

rev2, err := PutSourceBound(etcdTestCli, bound)
c.Assert(err, IsNil)
Expand Down
2 changes: 1 addition & 1 deletion pkg/ha/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ func GetRelayConfig(cli *clientv3.Client, worker string) (*config.SourceConfig,
return nil, 0, terror.ErrConfigMissingForBound.Generate(source)
}

return &cfg, rev2, nil
return cfg, rev2, nil
}
return nil, 0, terror.ErrWorkerRelayConfigChanging.Generate(worker, source, newSource)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/terror/error_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -804,7 +804,7 @@ var (
ErrConfigEmptySourceID = New(codeConfigEmptySourceID, ClassConfig, ScopeInternal, LevelMedium, "empty source-id not valid", "Please check the `source-id` config in configuration file.")
ErrConfigTooLongSourceID = New(codeConfigTooLongSourceID, ClassConfig, ScopeInternal, LevelMedium, "too long source-id not valid", "Please check the `source-id` config in configuration file. The max source id length is 32.")
ErrConfigOnlineSchemeNotSupport = New(codeConfigOnlineSchemeNotSupport, ClassConfig, ScopeInternal, LevelMedium, "online scheme %s not supported", "Please check the `online-ddl-scheme` config in task configuration file. Only `ghost` and `pt` are currently supported.")
ErrConfigInvalidTimezone = New(codeConfigInvalidTimezone, ClassConfig, ScopeInternal, LevelMedium, "invalid timezone string: %s", "Please check the `timezone` config in task configuration file.")
ErrConfigInvalidTimezone = New(codeConfigInvalidTimezone, ClassConfig, ScopeInternal, LevelMedium, "invalid timezone string: %s", "Please check the `timezone` config in task configuration file.")
ErrConfigParseFlagSet = New(codeConfigParseFlagSet, ClassConfig, ScopeInternal, LevelMedium, "parse subtask config flag set", "")
ErrConfigDecryptDBPassword = New(codeConfigDecryptDBPassword, ClassConfig, ScopeInternal, LevelMedium, "decrypt DB password %s failed", "")
ErrConfigMetaInvalid = New(codeConfigMetaInvalid, ClassConfig, ScopeInternal, LevelMedium, "must specify `binlog-name` without GTID enabled for the source or specify `binlog-gtid` with GTID enabled for the source", "Please check the `meta` config in task configuration file.")
Expand Down
48 changes: 3 additions & 45 deletions pkg/utils/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,16 @@ import (
"fmt"
"regexp"
"strings"
"time"

"github.com/pingcap/errors"

"github.com/pingcap/dm/pkg/log"
"github.com/pingcap/dm/pkg/terror"

"github.com/pingcap/failpoint"
tmysql "github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb-tools/pkg/dbutil"
"github.com/pingcap/tidb-tools/pkg/filter"
router "github.com/pingcap/tidb-tools/pkg/table-router"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/timeutil"
"go.uber.org/zap"

"github.com/pingcap/dm/pkg/log"
"github.com/pingcap/dm/pkg/terror"
)

// TrimCtrlChars returns a slice of the string s with all leading
Expand Down Expand Up @@ -214,40 +209,3 @@ func NonRepeatStringsEqual(a, b []string) bool {
}
return true
}

// ParseTimeZone parse location info from time offset("+08:00") or location string ("Asia/Shanghai")
//
// Copy from https://github.com/pingcap/tidb/blob/263a47e85ce04f74ec80d1d35b426618bc89b5a3/sessionctx/variable/varsutil.go#L411
func ParseTimeZone(s string) (*time.Location, error) {
if strings.EqualFold(s, "SYSTEM") {
return timeutil.SystemLocation(), nil
}
loc, err := time.LoadLocation(s)
if err == nil {
return loc, nil
}
// The value can be given as a string indicating an offset from UTC, such as '+10:00' or '-6:00'.
// The time zone's value should in [-12:59,+14:00].
if strings.HasPrefix(s, "+") || strings.HasPrefix(s, "-") {
d, err := types.ParseDuration(nil, s[1:], 0)
if err == nil {
if s[0] == '-' {
if d.Duration > 12*time.Hour+59*time.Minute {
return nil, errors.Errorf("invalid time zone '%s'", s)
}
} else {
if d.Duration > 14*time.Hour {
return nil, errors.Errorf("invalid time zone '%s'", s)
}
}

ofst := int(d.Duration / time.Second)
if s[0] == '-' {
ofst = -ofst
}
return time.FixedZone("", ofst), nil
}
}

return nil, errors.Errorf("invalid time zone '%s'", s)
}
29 changes: 0 additions & 29 deletions pkg/utils/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"bytes"
"context"
"fmt"
"time"

"github.com/DATA-DOG/go-sqlmock"
. "github.com/pingcap/check"
Expand Down Expand Up @@ -257,31 +256,3 @@ func (s *testCommonSuite) TestNonRepeatStringsEqual(c *C) {
c.Assert(NonRepeatStringsEqual([]string{}, []string{"1"}), IsFalse)
c.Assert(NonRepeatStringsEqual([]string{"1", "2"}, []string{"2", "3"}), IsFalse)
}

func (s *testCommonSuite) TestParseTimeZone(c *C) {
cases := map[string]time.Duration{
"+00:00": time.Duration(0),
"+01:00": time.Hour,
"-08:03": -1 * (8*time.Hour + 3*time.Minute),
"-12:59": -1 * (12*time.Hour + 59*time.Minute),
"+12:59": 12*time.Hour + 59*time.Minute,
"Asia/Shanghai": 8 * time.Hour,
"UTC": time.Duration(0),
}
for k, v := range cases {
dur, err := ParseTimeZone(k)
c.Assert(err, IsNil)
c.Assert(dur, Equals, v)
}

badCases := []string{
"test",
"-13:00",
"+14:05",
}

for _, s := range badCases {
_, err := ParseTimeZone(s)
c.Assert(err, ErrorMatches, "invalid time zone.*")
}
}

0 comments on commit 28c0d22

Please sign in to comment.