Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

Commit

Permalink
fix unit test
Browse files Browse the repository at this point in the history
  • Loading branch information
glorv committed May 11, 2021
1 parent f2218cb commit 744be9c
Show file tree
Hide file tree
Showing 15 changed files with 93 additions and 85 deletions.
12 changes: 6 additions & 6 deletions chaos/cases/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,16 +45,16 @@ func createSources(ctx context.Context, cli pb.MasterClient, cfg *config) error
return err
}

cfg1 := config2.NewSourceConfig()
cfg2 := config2.NewSourceConfig()
cfg3 := config2.NewSourceConfig()
if err = cfg1.ParseYaml(string(s1Content)); err != nil {
cfg1, err := config2.ParseYaml(string(s1Content))
if err != nil {
return err
}
if err = cfg2.ParseYaml(string(s2Content)); err != nil {
cfg2, err := config2.ParseYaml(string(s2Content))
if err != nil {
return err
}
if err = cfg3.ParseYaml(string(s3Content)); err != nil {
cfg3, err := config2.ParseYaml(string(s3Content))
if err != nil {
return err
}

Expand Down
29 changes: 20 additions & 9 deletions dm/config/source_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,13 @@ type SourceConfig struct {

// NewSourceConfig creates a new base config for upstream MySQL/MariaDB source.
func NewSourceConfig() *SourceConfig {
c := newSourceConfig()
c.adjust()
return c
}

// NewSourceConfig creates a new base config without adjust
func newSourceConfig() *SourceConfig {
c := &SourceConfig{
Purge: PurgeConfig{
Interval: 60 * 60,
Expand All @@ -94,7 +101,6 @@ func NewSourceConfig() *SourceConfig {
BackoffMax: Duration{DefaultBackoffMax},
},
}
c.adjust()
return c
}

Expand Down Expand Up @@ -140,12 +146,13 @@ func (c *SourceConfig) Parse(content string) error {
}

// ParseYaml parses flag definitions from the argument list, content should be yaml format.
func (c *SourceConfig) ParseYaml(content string) error {
if err := yaml.Unmarshal([]byte(content), c); err != nil {
return terror.ErrConfigYamlTransform.Delegate(err, "decode source config")
func ParseYaml(content string) (*SourceConfig, error) {
c := newSourceConfig()
if err := yaml.UnmarshalStrict([]byte(content), c); err != nil {
return nil, terror.ErrConfigYamlTransform.Delegate(err, "decode source config")
}
c.adjust()
return nil
return c, nil
}

// EncodeToml encodes config.
Expand Down Expand Up @@ -311,16 +318,20 @@ func (c *SourceConfig) AdjustServerID(ctx context.Context, db *sql.DB) error {
}

// LoadFromFile loads config from file.
func (c *SourceConfig) LoadFromFile(path string) error {
func LoadFromFile(path string) (*SourceConfig, error) {
c := newSourceConfig()
content, err := ioutil.ReadFile(path)
if err != nil {
return terror.ErrConfigReadCfgFromFile.Delegate(err, path)
return nil, terror.ErrConfigReadCfgFromFile.Delegate(err, path)
}
if err := yaml.UnmarshalStrict(content, c); err != nil {
return terror.ErrConfigYamlTransform.Delegate(err, "decode source config")
return nil, terror.ErrConfigYamlTransform.Delegate(err, "decode source config")
}
c.adjust()
return c.Verify()
if err = c.Verify(); err != nil {
return nil, err
}
return c, nil
}

func (c *SourceConfig) check(metaData *toml.MetaData, err error) error {
Expand Down
33 changes: 16 additions & 17 deletions dm/config/source_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,8 @@ import (
const sourceSampleFile = "../worker/source.yaml"

func (t *testConfig) TestConfig(c *C) {
cfg := NewSourceConfig()

c.Assert(cfg.LoadFromFile(sourceSampleFile), IsNil)
cfg, err := LoadFromFile(sourceSampleFile)
c.Assert(err, IsNil)
cfg.RelayDir = "./xx"
c.Assert(cfg.RelayDir, Equals, "./xx")
c.Assert(cfg.ServerID, Equals, uint32(101))
Expand Down Expand Up @@ -63,8 +62,8 @@ func (t *testConfig) TestConfig(c *C) {
// test update config file and reload
c.Assert(cfg.Parse(tomlStr), IsNil)
c.Assert(cfg.ServerID, Equals, uint32(100))
var cfg1 SourceConfig
c.Assert(cfg1.ParseYaml(yamlStr), IsNil)
cfg1, err := ParseYaml(yamlStr)
c.Assert(err, IsNil)
c.Assert(cfg1.ServerID, Equals, uint32(100))
cfg.Filters = []*bf.BinlogEventRule{}
cfg.Tracer = map[string]interface{}{}
Expand All @@ -73,8 +72,8 @@ func (t *testConfig) TestConfig(c *C) {
c.Assert(cfg2.Parse(originCfgStr), IsNil)
c.Assert(cfg2.ServerID, Equals, uint32(101))

var cfg3 SourceConfig
c.Assert(cfg3.ParseYaml(originCfgYamlStr), IsNil)
cfg3, err := ParseYaml(originCfgYamlStr)
c.Assert(err, IsNil)
c.Assert(cfg3.ServerID, Equals, uint32(101))

// test decrypt password
Expand Down Expand Up @@ -111,8 +110,8 @@ func (t *testConfig) TestConfig(c *C) {
c.Assert(clone4yaml, Matches, "(.|\n)*backoff-rollback: 5m(.|\n)*")
c.Assert(clone4yaml, Matches, "(.|\n)*backoff-max: 5m(.|\n)*")

var clone6 SourceConfig
c.Assert(clone6.ParseYaml(clone4yaml), IsNil)
clone6, err := ParseYaml(clone4yaml)
c.Assert(err, IsNil)
c.Assert(clone6, DeepEquals, *clone4)

// test invalid config
Expand All @@ -124,15 +123,15 @@ aaa: xxx
`)
err = ioutil.WriteFile(configFile, configContent, 0o644)
c.Assert(err, IsNil)
err = cfg.LoadFromFile(configFile)
_, err = LoadFromFile(configFile)
c.Assert(err, NotNil)
c.Assert(err, ErrorMatches, "(.|\n)*field aaa not found in type config.SourceConfig(.|\n)*")
}

func (t *testConfig) TestConfigVerify(c *C) {
newConfig := func() *SourceConfig {
cfg := NewSourceConfig()
c.Assert(cfg.LoadFromFile(sourceSampleFile), IsNil)
cfg, err := LoadFromFile(sourceSampleFile)
c.Assert(err, IsNil)
cfg.RelayDir = "./xx"
return cfg
}
Expand Down Expand Up @@ -255,12 +254,12 @@ func subtestFlavor(c *C, cfg *SourceConfig, sqlInfo, expectedFlavor, expectedErr
}

func (t *testConfig) TestAdjustFlavor(c *C) {
cfg := NewSourceConfig()
c.Assert(cfg.LoadFromFile(sourceSampleFile), IsNil)
cfg, err := LoadFromFile(sourceSampleFile)
c.Assert(err, IsNil)
cfg.RelayDir = "./xx"

cfg.Flavor = "mariadb"
err := cfg.AdjustFlavor(context.Background(), nil)
err = cfg.AdjustFlavor(context.Background(), nil)
c.Assert(err, IsNil)
c.Assert(cfg.Flavor, Equals, mysql.MariaDBFlavor)
cfg.Flavor = "MongoDB"
Expand All @@ -278,8 +277,8 @@ func (t *testConfig) TestAdjustServerID(c *C) {
}()
getAllServerIDFunc = getMockServerIDs

cfg := NewSourceConfig()
c.Assert(cfg.LoadFromFile(sourceSampleFile), IsNil)
cfg, err := LoadFromFile(sourceSampleFile)
c.Assert(err, IsNil)
cfg.RelayDir = "./xx"

c.Assert(cfg.AdjustServerID(context.Background(), nil), IsNil)
Expand Down
11 changes: 5 additions & 6 deletions dm/master/bootstrap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,13 +68,12 @@ func (t *testMaster) TestCollectSourceConfigFilesV1Import(c *C) {
}
password := os.Getenv("MYSQL_PSWD")

// load a valid source file.
cfg1 := config.NewSourceConfig()
cfg1, err := config.LoadFromFile("./source.yaml")
c.Assert(err, IsNil)
// fix empty map after marshal/unmarshal becomes nil
cfg1.From.Session = map[string]string{}
cfg1.Tracer = map[string]interface{}{}
cfg1.Filters = []*filter.BinlogEventRule{}
c.Assert(cfg1.LoadFromFile("./source.yaml"), IsNil)
cfg1.From.Host = host
cfg1.From.Port = port
cfg1.From.User = user
Expand Down Expand Up @@ -122,8 +121,8 @@ func (t *testMaster) TestWaitWorkersReadyV1Import(c *C) {
s.cfg.V1SourcesPath = c.MkDir()
c.Assert(s.scheduler.Start(ctx, etcdTestCli), IsNil)

cfg1 := config.NewSourceConfig()
c.Assert(cfg1.LoadFromFile("./source.yaml"), IsNil)
cfg1, err := config.LoadFromFile("./source.yaml")
c.Assert(err, IsNil)
cfg2 := cfg1.Clone()
cfg2.SourceID = "mysql-replica-02"
cfgs := map[string]config.SourceConfig{
Expand All @@ -132,7 +131,7 @@ func (t *testMaster) TestWaitWorkersReadyV1Import(c *C) {
}

// no worker registered, timeout.
err := s.waitWorkersReadyV1Import(tctx, cfgs)
err = s.waitWorkersReadyV1Import(tctx, cfgs)
c.Assert(err, ErrorMatches, ".*wait for DM-worker instances timeout.*")

// register one worker.
Expand Down
10 changes: 5 additions & 5 deletions dm/master/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ type Scheduler struct {
// - recover from etcd (calling `recoverSources`).
// delete:
// - remove source by user request (calling `RemoveSourceCfg`).
sourceCfgs map[string]config.SourceConfig
sourceCfgs map[string]*config.SourceConfig

// all subtask configs, task name -> source ID -> subtask config.
// add:
Expand Down Expand Up @@ -151,7 +151,7 @@ type Scheduler struct {
func NewScheduler(pLogger *log.Logger, securityCfg config.Security) *Scheduler {
return &Scheduler{
logger: pLogger.WithFields(zap.String("component", "scheduler")),
sourceCfgs: make(map[string]config.SourceConfig),
sourceCfgs: make(map[string]*config.SourceConfig),
subTaskCfgs: make(map[string]map[string]config.SubTaskConfig),
workers: make(map[string]*Worker),
bounds: make(map[string]*Worker),
Expand Down Expand Up @@ -255,7 +255,7 @@ func (s *Scheduler) CloseAllWorkers() {

// AddSourceCfg adds the upstream source config to the cluster.
// NOTE: please verify the config before call this.
func (s *Scheduler) AddSourceCfg(cfg config.SourceConfig) error {
func (s *Scheduler) AddSourceCfg(cfg *config.SourceConfig) error {
s.mu.Lock()
defer s.mu.Unlock()

Expand Down Expand Up @@ -380,7 +380,7 @@ func (s *Scheduler) GetSourceCfgByID(source string) *config.SourceConfig {
if !ok {
return nil
}
clone := cfg
clone := *cfg
return &clone
}

Expand Down Expand Up @@ -1745,7 +1745,7 @@ func (s *Scheduler) updateStatusForUnbound(source string) {

// reset resets the internal status.
func (s *Scheduler) reset() {
s.sourceCfgs = make(map[string]config.SourceConfig)
s.sourceCfgs = make(map[string]*config.SourceConfig)
s.subTaskCfgs = make(map[string]map[string]config.SubTaskConfig)
s.workers = make(map[string]*Worker)
s.bounds = make(map[string]*Worker)
Expand Down
32 changes: 16 additions & 16 deletions dm/master/scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,6 @@ func (t *testScheduler) testSchedulerProgress(c *C, restart int) {
taskName2 = "task-2"
workerInfo1 = ha.NewWorkerInfo(workerName1, workerAddr1)
workerInfo2 = ha.NewWorkerInfo(workerName2, workerAddr2)
sourceCfg1 config.SourceConfig
subtaskCfg1 config.SubTaskConfig
keepAliveTTL = int64(5) // NOTE: this should be >= minLeaseTTL, in second.

Expand All @@ -116,7 +115,8 @@ func (t *testScheduler) testSchedulerProgress(c *C, restart int) {
}
}
)
c.Assert(sourceCfg1.LoadFromFile(sourceSampleFile), IsNil)
sourceCfg1, err := config.LoadFromFile(sourceSampleFile)
c.Assert(err, IsNil)
sourceCfg1.SourceID = sourceID1
sourceCfg1.EnableRelay = true
sourceCfg2 := sourceCfg1
Expand Down Expand Up @@ -550,7 +550,7 @@ func (t *testScheduler) sourceCfgNotExist(c *C, s *Scheduler, source string) {
c.Assert(scm, HasLen, 0)
}

func (t *testScheduler) sourceCfgExist(c *C, s *Scheduler, expectCfg config.SourceConfig) {
func (t *testScheduler) sourceCfgExist(c *C, s *Scheduler, expectCfg *config.SourceConfig) {
cfgP := s.GetSourceCfgByID(expectCfg.SourceID)
c.Assert(cfgP, DeepEquals, &expectCfg)
scm, _, err := ha.GetSourceCfg(etcdTestCli, expectCfg.SourceID, 0)
Expand Down Expand Up @@ -708,11 +708,11 @@ func (t *testScheduler) TestRestartScheduler(c *C) {
workerAddr1 = "127.0.0.1:8262"
workerInfo1 = ha.NewWorkerInfo(workerName1, workerAddr1)
sourceBound1 = ha.NewSourceBound(sourceID1, workerName1)
sourceCfg1 config.SourceConfig
wg sync.WaitGroup
keepAliveTTL = int64(2) // NOTE: this should be >= minLeaseTTL, in second.
)
c.Assert(sourceCfg1.LoadFromFile(sourceSampleFile), IsNil)
sourceCfg1, err := config.LoadFromFile(sourceSampleFile)
c.Assert(err, IsNil)
sourceCfg1.SourceID = sourceID1

s := NewScheduler(&logger, config.Security{})
Expand Down Expand Up @@ -822,10 +822,10 @@ func (t *testScheduler) TestWatchWorkerEventEtcdCompact(c *C) {
workerAddr2 = "127.0.0.1:18262"
workerAddr3 = "127.0.0.1:18362"
workerAddr4 = "127.0.0.1:18462"
sourceCfg1 config.SourceConfig
keepAliveTTL = int64(2) // NOTE: this should be >= minLeaseTTL, in second.
)
c.Assert(sourceCfg1.LoadFromFile(sourceSampleFile), IsNil)
sourceCfg1, err := config.LoadFromFile(sourceSampleFile)
c.Assert(err, IsNil)
sourceCfg1.SourceID = sourceID1
sourceCfg2 := sourceCfg1
sourceCfg2.SourceID = sourceID2
Expand Down Expand Up @@ -884,7 +884,7 @@ func (t *testScheduler) TestWatchWorkerEventEtcdCompact(c *C) {

// step 4: trigger etcd compaction and check whether we can receive it through watcher
var startRev int64 = 1
_, err := etcdTestCli.Compact(ctx, rev)
_, err = etcdTestCli.Compact(ctx, rev)
c.Assert(err, IsNil)
workerEvCh := make(chan ha.WorkerEvent, 10)
workerErrCh := make(chan error, 10)
Expand Down Expand Up @@ -962,10 +962,10 @@ func (t *testScheduler) TestLastBound(c *C) {
workerName2 = "dm-worker-2"
workerName3 = "dm-worker-3"
workerName4 = "dm-worker-4"
sourceCfg1 config.SourceConfig
)

c.Assert(sourceCfg1.LoadFromFile(sourceSampleFile), IsNil)
sourceCfg1, err := config.LoadFromFile(sourceSampleFile)
c.Assert(err, IsNil)
sourceCfg1.SourceID = sourceID1
sourceCfg2 := sourceCfg1
sourceCfg2.SourceID = sourceID2
Expand Down Expand Up @@ -1052,8 +1052,8 @@ func (t *testScheduler) TestTransferSource(c *C) {
s.workers[workerName2] = worker2
s.workers[workerName3] = worker3
s.workers[workerName4] = worker4
s.sourceCfgs[sourceID1] = config.SourceConfig{}
s.sourceCfgs[sourceID2] = config.SourceConfig{}
s.sourceCfgs[sourceID1] = &config.SourceConfig{}
s.sourceCfgs[sourceID2] = &config.SourceConfig{}

worker1.ToFree()
c.Assert(s.boundSourceToWorker(sourceID1, worker1), IsNil)
Expand All @@ -1075,7 +1075,7 @@ func (t *testScheduler) TestTransferSource(c *C) {
c.Assert(worker1.Stage(), Equals, WorkerFree)

// test valid transfer: source -> worker = unbound -> free
s.sourceCfgs[sourceID3] = config.SourceConfig{}
s.sourceCfgs[sourceID3] = &config.SourceConfig{}
s.unbounds[sourceID3] = struct{}{}
c.Assert(s.TransferSource(sourceID3, workerName3), IsNil)
c.Assert(s.bounds[sourceID3], DeepEquals, worker3)
Expand All @@ -1095,7 +1095,7 @@ func (t *testScheduler) TestTransferSource(c *C) {
c.Assert(s.bounds[sourceID1], DeepEquals, worker4)

// test invalid transfer: source -> worker = unbound -> bound
s.sourceCfgs[sourceID4] = config.SourceConfig{}
s.sourceCfgs[sourceID4] = &config.SourceConfig{}
s.unbounds[sourceID4] = struct{}{}
c.Assert(s.TransferSource(sourceID4, workerName3), NotNil)
c.Assert(s.bounds[sourceID3], DeepEquals, worker3)
Expand Down Expand Up @@ -1147,8 +1147,8 @@ func (t *testScheduler) TestStartStopSource(c *C) {
s.workers[workerName2] = worker2
s.workers[workerName3] = worker3
s.workers[workerName4] = worker4
s.sourceCfgs[sourceID1] = config.SourceConfig{}
s.sourceCfgs[sourceID2] = config.SourceConfig{}
s.sourceCfgs[sourceID1] = &config.SourceConfig{}
s.sourceCfgs[sourceID2] = &config.SourceConfig{}

worker1.ToFree()
c.Assert(s.boundSourceToWorker(sourceID1, worker1), IsNil)
Expand Down
Loading

0 comments on commit 744be9c

Please sign in to comment.