diff --git a/chaos/cases/source.go b/chaos/cases/source.go index 545be2bb01..46e2f11780 100644 --- a/chaos/cases/source.go +++ b/chaos/cases/source.go @@ -45,16 +45,16 @@ func createSources(ctx context.Context, cli pb.MasterClient, cfg *config) error return err } - cfg1 := config2.NewSourceConfig() - cfg2 := config2.NewSourceConfig() - cfg3 := config2.NewSourceConfig() - if err = cfg1.ParseYaml(string(s1Content)); err != nil { + cfg1, err := config2.ParseYaml(string(s1Content)) + if err != nil { return err } - if err = cfg2.ParseYaml(string(s2Content)); err != nil { + cfg2, err := config2.ParseYaml(string(s2Content)) + if err != nil { return err } - if err = cfg3.ParseYaml(string(s3Content)); err != nil { + cfg3, err := config2.ParseYaml(string(s3Content)) + if err != nil { return err } diff --git a/dm/config/source_config.go b/dm/config/source_config.go index f603fba8bb..f6e7027b25 100644 --- a/dm/config/source_config.go +++ b/dm/config/source_config.go @@ -82,6 +82,13 @@ type SourceConfig struct { // NewSourceConfig creates a new base config for upstream MySQL/MariaDB source. func NewSourceConfig() *SourceConfig { + c := newSourceConfig() + c.adjust() + return c +} + +// NewSourceConfig creates a new base config without adjust +func newSourceConfig() *SourceConfig { c := &SourceConfig{ Purge: PurgeConfig{ Interval: 60 * 60, @@ -94,7 +101,6 @@ func NewSourceConfig() *SourceConfig { BackoffMax: Duration{DefaultBackoffMax}, }, } - c.adjust() return c } @@ -140,12 +146,13 @@ func (c *SourceConfig) Parse(content string) error { } // ParseYaml parses flag definitions from the argument list, content should be yaml format. -func (c *SourceConfig) ParseYaml(content string) error { - if err := yaml.Unmarshal([]byte(content), c); err != nil { - return terror.ErrConfigYamlTransform.Delegate(err, "decode source config") +func ParseYaml(content string) (*SourceConfig, error) { + c := newSourceConfig() + if err := yaml.UnmarshalStrict([]byte(content), c); err != nil { + return nil, terror.ErrConfigYamlTransform.Delegate(err, "decode source config") } c.adjust() - return nil + return c, nil } // EncodeToml encodes config. @@ -311,16 +318,20 @@ func (c *SourceConfig) AdjustServerID(ctx context.Context, db *sql.DB) error { } // LoadFromFile loads config from file. -func (c *SourceConfig) LoadFromFile(path string) error { +func LoadFromFile(path string) (*SourceConfig, error) { + c := newSourceConfig() content, err := ioutil.ReadFile(path) if err != nil { - return terror.ErrConfigReadCfgFromFile.Delegate(err, path) + return nil, terror.ErrConfigReadCfgFromFile.Delegate(err, path) } if err := yaml.UnmarshalStrict(content, c); err != nil { - return terror.ErrConfigYamlTransform.Delegate(err, "decode source config") + return nil, terror.ErrConfigYamlTransform.Delegate(err, "decode source config") } c.adjust() - return c.Verify() + if err = c.Verify(); err != nil { + return nil, err + } + return c, nil } func (c *SourceConfig) check(metaData *toml.MetaData, err error) error { diff --git a/dm/config/source_config_test.go b/dm/config/source_config_test.go index d33580133d..a59523a931 100644 --- a/dm/config/source_config_test.go +++ b/dm/config/source_config_test.go @@ -32,9 +32,8 @@ import ( const sourceSampleFile = "../worker/source.yaml" func (t *testConfig) TestConfig(c *C) { - cfg := NewSourceConfig() - - c.Assert(cfg.LoadFromFile(sourceSampleFile), IsNil) + cfg, err := LoadFromFile(sourceSampleFile) + c.Assert(err, IsNil) cfg.RelayDir = "./xx" c.Assert(cfg.RelayDir, Equals, "./xx") c.Assert(cfg.ServerID, Equals, uint32(101)) @@ -63,8 +62,8 @@ func (t *testConfig) TestConfig(c *C) { // test update config file and reload c.Assert(cfg.Parse(tomlStr), IsNil) c.Assert(cfg.ServerID, Equals, uint32(100)) - var cfg1 SourceConfig - c.Assert(cfg1.ParseYaml(yamlStr), IsNil) + cfg1, err := ParseYaml(yamlStr) + c.Assert(err, IsNil) c.Assert(cfg1.ServerID, Equals, uint32(100)) cfg.Filters = []*bf.BinlogEventRule{} cfg.Tracer = map[string]interface{}{} @@ -73,8 +72,8 @@ func (t *testConfig) TestConfig(c *C) { c.Assert(cfg2.Parse(originCfgStr), IsNil) c.Assert(cfg2.ServerID, Equals, uint32(101)) - var cfg3 SourceConfig - c.Assert(cfg3.ParseYaml(originCfgYamlStr), IsNil) + cfg3, err := ParseYaml(originCfgYamlStr) + c.Assert(err, IsNil) c.Assert(cfg3.ServerID, Equals, uint32(101)) // test decrypt password @@ -111,8 +110,8 @@ func (t *testConfig) TestConfig(c *C) { c.Assert(clone4yaml, Matches, "(.|\n)*backoff-rollback: 5m(.|\n)*") c.Assert(clone4yaml, Matches, "(.|\n)*backoff-max: 5m(.|\n)*") - var clone6 SourceConfig - c.Assert(clone6.ParseYaml(clone4yaml), IsNil) + clone6, err := ParseYaml(clone4yaml) + c.Assert(err, IsNil) c.Assert(clone6, DeepEquals, *clone4) // test invalid config @@ -124,15 +123,15 @@ aaa: xxx `) err = ioutil.WriteFile(configFile, configContent, 0o644) c.Assert(err, IsNil) - err = cfg.LoadFromFile(configFile) + _, err = LoadFromFile(configFile) c.Assert(err, NotNil) c.Assert(err, ErrorMatches, "(.|\n)*field aaa not found in type config.SourceConfig(.|\n)*") } func (t *testConfig) TestConfigVerify(c *C) { newConfig := func() *SourceConfig { - cfg := NewSourceConfig() - c.Assert(cfg.LoadFromFile(sourceSampleFile), IsNil) + cfg, err := LoadFromFile(sourceSampleFile) + c.Assert(err, IsNil) cfg.RelayDir = "./xx" return cfg } @@ -255,12 +254,12 @@ func subtestFlavor(c *C, cfg *SourceConfig, sqlInfo, expectedFlavor, expectedErr } func (t *testConfig) TestAdjustFlavor(c *C) { - cfg := NewSourceConfig() - c.Assert(cfg.LoadFromFile(sourceSampleFile), IsNil) + cfg, err := LoadFromFile(sourceSampleFile) + c.Assert(err, IsNil) cfg.RelayDir = "./xx" cfg.Flavor = "mariadb" - err := cfg.AdjustFlavor(context.Background(), nil) + err = cfg.AdjustFlavor(context.Background(), nil) c.Assert(err, IsNil) c.Assert(cfg.Flavor, Equals, mysql.MariaDBFlavor) cfg.Flavor = "MongoDB" @@ -278,8 +277,8 @@ func (t *testConfig) TestAdjustServerID(c *C) { }() getAllServerIDFunc = getMockServerIDs - cfg := NewSourceConfig() - c.Assert(cfg.LoadFromFile(sourceSampleFile), IsNil) + cfg, err := LoadFromFile(sourceSampleFile) + c.Assert(err, IsNil) cfg.RelayDir = "./xx" c.Assert(cfg.AdjustServerID(context.Background(), nil), IsNil) diff --git a/dm/master/bootstrap_test.go b/dm/master/bootstrap_test.go index 66bf4b0b03..258d9fe3f7 100644 --- a/dm/master/bootstrap_test.go +++ b/dm/master/bootstrap_test.go @@ -68,13 +68,12 @@ func (t *testMaster) TestCollectSourceConfigFilesV1Import(c *C) { } password := os.Getenv("MYSQL_PSWD") - // load a valid source file. - cfg1 := config.NewSourceConfig() + cfg1, err := config.LoadFromFile("./source.yaml") + c.Assert(err, IsNil) // fix empty map after marshal/unmarshal becomes nil cfg1.From.Session = map[string]string{} cfg1.Tracer = map[string]interface{}{} cfg1.Filters = []*filter.BinlogEventRule{} - c.Assert(cfg1.LoadFromFile("./source.yaml"), IsNil) cfg1.From.Host = host cfg1.From.Port = port cfg1.From.User = user @@ -122,8 +121,8 @@ func (t *testMaster) TestWaitWorkersReadyV1Import(c *C) { s.cfg.V1SourcesPath = c.MkDir() c.Assert(s.scheduler.Start(ctx, etcdTestCli), IsNil) - cfg1 := config.NewSourceConfig() - c.Assert(cfg1.LoadFromFile("./source.yaml"), IsNil) + cfg1, err := config.LoadFromFile("./source.yaml") + c.Assert(err, IsNil) cfg2 := cfg1.Clone() cfg2.SourceID = "mysql-replica-02" cfgs := map[string]config.SourceConfig{ @@ -132,7 +131,7 @@ func (t *testMaster) TestWaitWorkersReadyV1Import(c *C) { } // no worker registered, timeout. - err := s.waitWorkersReadyV1Import(tctx, cfgs) + err = s.waitWorkersReadyV1Import(tctx, cfgs) c.Assert(err, ErrorMatches, ".*wait for DM-worker instances timeout.*") // register one worker. diff --git a/dm/master/scheduler/scheduler.go b/dm/master/scheduler/scheduler.go index eadedf69c5..0caf2acaad 100644 --- a/dm/master/scheduler/scheduler.go +++ b/dm/master/scheduler/scheduler.go @@ -76,7 +76,7 @@ type Scheduler struct { // - recover from etcd (calling `recoverSources`). // delete: // - remove source by user request (calling `RemoveSourceCfg`). - sourceCfgs map[string]config.SourceConfig + sourceCfgs map[string]*config.SourceConfig // all subtask configs, task name -> source ID -> subtask config. // add: @@ -151,7 +151,7 @@ type Scheduler struct { func NewScheduler(pLogger *log.Logger, securityCfg config.Security) *Scheduler { return &Scheduler{ logger: pLogger.WithFields(zap.String("component", "scheduler")), - sourceCfgs: make(map[string]config.SourceConfig), + sourceCfgs: make(map[string]*config.SourceConfig), subTaskCfgs: make(map[string]map[string]config.SubTaskConfig), workers: make(map[string]*Worker), bounds: make(map[string]*Worker), @@ -255,7 +255,7 @@ func (s *Scheduler) CloseAllWorkers() { // AddSourceCfg adds the upstream source config to the cluster. // NOTE: please verify the config before call this. -func (s *Scheduler) AddSourceCfg(cfg config.SourceConfig) error { +func (s *Scheduler) AddSourceCfg(cfg *config.SourceConfig) error { s.mu.Lock() defer s.mu.Unlock() @@ -380,7 +380,7 @@ func (s *Scheduler) GetSourceCfgByID(source string) *config.SourceConfig { if !ok { return nil } - clone := cfg + clone := *cfg return &clone } @@ -1745,7 +1745,7 @@ func (s *Scheduler) updateStatusForUnbound(source string) { // reset resets the internal status. func (s *Scheduler) reset() { - s.sourceCfgs = make(map[string]config.SourceConfig) + s.sourceCfgs = make(map[string]*config.SourceConfig) s.subTaskCfgs = make(map[string]map[string]config.SubTaskConfig) s.workers = make(map[string]*Worker) s.bounds = make(map[string]*Worker) diff --git a/dm/master/scheduler/scheduler_test.go b/dm/master/scheduler/scheduler_test.go index 75581797d4..0db2e17ddd 100644 --- a/dm/master/scheduler/scheduler_test.go +++ b/dm/master/scheduler/scheduler_test.go @@ -100,7 +100,6 @@ func (t *testScheduler) testSchedulerProgress(c *C, restart int) { taskName2 = "task-2" workerInfo1 = ha.NewWorkerInfo(workerName1, workerAddr1) workerInfo2 = ha.NewWorkerInfo(workerName2, workerAddr2) - sourceCfg1 config.SourceConfig subtaskCfg1 config.SubTaskConfig keepAliveTTL = int64(5) // NOTE: this should be >= minLeaseTTL, in second. @@ -116,7 +115,8 @@ func (t *testScheduler) testSchedulerProgress(c *C, restart int) { } } ) - c.Assert(sourceCfg1.LoadFromFile(sourceSampleFile), IsNil) + sourceCfg1, err := config.LoadFromFile(sourceSampleFile) + c.Assert(err, IsNil) sourceCfg1.SourceID = sourceID1 sourceCfg1.EnableRelay = true sourceCfg2 := sourceCfg1 @@ -550,7 +550,7 @@ func (t *testScheduler) sourceCfgNotExist(c *C, s *Scheduler, source string) { c.Assert(scm, HasLen, 0) } -func (t *testScheduler) sourceCfgExist(c *C, s *Scheduler, expectCfg config.SourceConfig) { +func (t *testScheduler) sourceCfgExist(c *C, s *Scheduler, expectCfg *config.SourceConfig) { cfgP := s.GetSourceCfgByID(expectCfg.SourceID) c.Assert(cfgP, DeepEquals, &expectCfg) scm, _, err := ha.GetSourceCfg(etcdTestCli, expectCfg.SourceID, 0) @@ -708,11 +708,11 @@ func (t *testScheduler) TestRestartScheduler(c *C) { workerAddr1 = "127.0.0.1:8262" workerInfo1 = ha.NewWorkerInfo(workerName1, workerAddr1) sourceBound1 = ha.NewSourceBound(sourceID1, workerName1) - sourceCfg1 config.SourceConfig wg sync.WaitGroup keepAliveTTL = int64(2) // NOTE: this should be >= minLeaseTTL, in second. ) - c.Assert(sourceCfg1.LoadFromFile(sourceSampleFile), IsNil) + sourceCfg1, err := config.LoadFromFile(sourceSampleFile) + c.Assert(err, IsNil) sourceCfg1.SourceID = sourceID1 s := NewScheduler(&logger, config.Security{}) @@ -822,10 +822,10 @@ func (t *testScheduler) TestWatchWorkerEventEtcdCompact(c *C) { workerAddr2 = "127.0.0.1:18262" workerAddr3 = "127.0.0.1:18362" workerAddr4 = "127.0.0.1:18462" - sourceCfg1 config.SourceConfig keepAliveTTL = int64(2) // NOTE: this should be >= minLeaseTTL, in second. ) - c.Assert(sourceCfg1.LoadFromFile(sourceSampleFile), IsNil) + sourceCfg1, err := config.LoadFromFile(sourceSampleFile) + c.Assert(err, IsNil) sourceCfg1.SourceID = sourceID1 sourceCfg2 := sourceCfg1 sourceCfg2.SourceID = sourceID2 @@ -884,7 +884,7 @@ func (t *testScheduler) TestWatchWorkerEventEtcdCompact(c *C) { // step 4: trigger etcd compaction and check whether we can receive it through watcher var startRev int64 = 1 - _, err := etcdTestCli.Compact(ctx, rev) + _, err = etcdTestCli.Compact(ctx, rev) c.Assert(err, IsNil) workerEvCh := make(chan ha.WorkerEvent, 10) workerErrCh := make(chan error, 10) @@ -962,10 +962,10 @@ func (t *testScheduler) TestLastBound(c *C) { workerName2 = "dm-worker-2" workerName3 = "dm-worker-3" workerName4 = "dm-worker-4" - sourceCfg1 config.SourceConfig ) - c.Assert(sourceCfg1.LoadFromFile(sourceSampleFile), IsNil) + sourceCfg1, err := config.LoadFromFile(sourceSampleFile) + c.Assert(err, IsNil) sourceCfg1.SourceID = sourceID1 sourceCfg2 := sourceCfg1 sourceCfg2.SourceID = sourceID2 @@ -1052,8 +1052,8 @@ func (t *testScheduler) TestTransferSource(c *C) { s.workers[workerName2] = worker2 s.workers[workerName3] = worker3 s.workers[workerName4] = worker4 - s.sourceCfgs[sourceID1] = config.SourceConfig{} - s.sourceCfgs[sourceID2] = config.SourceConfig{} + s.sourceCfgs[sourceID1] = &config.SourceConfig{} + s.sourceCfgs[sourceID2] = &config.SourceConfig{} worker1.ToFree() c.Assert(s.boundSourceToWorker(sourceID1, worker1), IsNil) @@ -1075,7 +1075,7 @@ func (t *testScheduler) TestTransferSource(c *C) { c.Assert(worker1.Stage(), Equals, WorkerFree) // test valid transfer: source -> worker = unbound -> free - s.sourceCfgs[sourceID3] = config.SourceConfig{} + s.sourceCfgs[sourceID3] = &config.SourceConfig{} s.unbounds[sourceID3] = struct{}{} c.Assert(s.TransferSource(sourceID3, workerName3), IsNil) c.Assert(s.bounds[sourceID3], DeepEquals, worker3) @@ -1095,7 +1095,7 @@ func (t *testScheduler) TestTransferSource(c *C) { c.Assert(s.bounds[sourceID1], DeepEquals, worker4) // test invalid transfer: source -> worker = unbound -> bound - s.sourceCfgs[sourceID4] = config.SourceConfig{} + s.sourceCfgs[sourceID4] = &config.SourceConfig{} s.unbounds[sourceID4] = struct{}{} c.Assert(s.TransferSource(sourceID4, workerName3), NotNil) c.Assert(s.bounds[sourceID3], DeepEquals, worker3) @@ -1147,8 +1147,8 @@ func (t *testScheduler) TestStartStopSource(c *C) { s.workers[workerName2] = worker2 s.workers[workerName3] = worker3 s.workers[workerName4] = worker4 - s.sourceCfgs[sourceID1] = config.SourceConfig{} - s.sourceCfgs[sourceID2] = config.SourceConfig{} + s.sourceCfgs[sourceID1] = &config.SourceConfig{} + s.sourceCfgs[sourceID2] = &config.SourceConfig{} worker1.ToFree() c.Assert(s.boundSourceToWorker(sourceID1, worker1), IsNil) diff --git a/dm/master/server.go b/dm/master/server.go index d4c5e7f743..08c4d673a3 100644 --- a/dm/master/server.go +++ b/dm/master/server.go @@ -1079,10 +1079,11 @@ func (s *Server) CheckTask(ctx context.Context, req *pb.CheckTaskRequest) (*pb.C func parseAndAdjustSourceConfig(ctx context.Context, contents []string) ([]*config.SourceConfig, error) { cfgs := make([]*config.SourceConfig, len(contents)) for i, content := range contents { - cfg := config.NewSourceConfig() - if err := cfg.ParseYaml(content); err != nil { + cfg, err := config.ParseYaml(content) + if err != nil { return cfgs, err } + cfg.From.Adjust() dbConfig := cfg.GenerateDBConfig() @@ -1112,8 +1113,8 @@ func parseAndAdjustSourceConfig(ctx context.Context, contents []string) ([]*conf func parseSourceConfig(contents []string) ([]*config.SourceConfig, error) { cfgs := make([]*config.SourceConfig, len(contents)) for i, content := range contents { - cfg := config.NewSourceConfig() - if err := cfg.ParseYaml(content); err != nil { + cfg, err := config.ParseYaml(content) + if err != nil { return cfgs, err } cfgs[i] = cfg diff --git a/dm/master/server_test.go b/dm/master/server_test.go index c81c0c3e0f..85021c28ee 100644 --- a/dm/master/server_test.go +++ b/dm/master/server_test.go @@ -1468,8 +1468,8 @@ func (t *testMaster) TestOperateSource(c *check.C) { s1.leader.Store(oneselfLeader) c.Assert(s1.Start(ctx), check.IsNil) defer s1.Close() - mysqlCfg := config.NewSourceConfig() - c.Assert(mysqlCfg.LoadFromFile("./source.yaml"), check.IsNil) + mysqlCfg, err := config.LoadFromFile("./source.yaml") + c.Assert(err, check.IsNil) mysqlCfg.From.Password = os.Getenv("MYSQL_PSWD") task, err := mysqlCfg.Yaml() c.Assert(err, check.IsNil) diff --git a/dm/worker/relay_test.go b/dm/worker/relay_test.go index 4b31463567..4bbec4a6b9 100644 --- a/dm/worker/relay_test.go +++ b/dm/worker/relay_test.go @@ -145,7 +145,7 @@ func (t *testRelay) TestRelay(c *C) { cfg.RelayDir = dir cfg.MetaDir = dir - relayHolder := NewRealRelayHolder(&cfg) + relayHolder := NewRealRelayHolder(cfg) c.Assert(relayHolder, NotNil) holder, ok := relayHolder.(*realRelayHolder) diff --git a/dm/worker/server_test.go b/dm/worker/server_test.go index 1d078a3b88..2b44f379f2 100644 --- a/dm/worker/server_test.go +++ b/dm/worker/server_test.go @@ -386,7 +386,7 @@ func (t *testServer) TestWatchSourceBoundEtcdCompact(c *C) { rev, err := ha.DeleteSourceBound(etcdCli, cfg.Name) c.Assert(err, IsNil) // step 2: start source at this worker - w, err := s.getOrStartWorker(&sourceCfg, true) + w, err := s.getOrStartWorker(sourceCfg, true) c.Assert(err, IsNil) c.Assert(w.EnableHandleSubtasks(), IsNil) // step 3: trigger etcd compaction and check whether we can receive it through watcher @@ -633,9 +633,8 @@ func checkRelayStatus(cli pb.WorkerClient, expect pb.Stage) bool { return status.SourceStatus.RelayStatus.Stage == expect } -func loadSourceConfigWithoutPassword(c *C) config.SourceConfig { - var sourceCfg config.SourceConfig - err := sourceCfg.LoadFromFile(sourceSampleFile) +func loadSourceConfigWithoutPassword(c *C) *config.SourceConfig { + sourceCfg, err := config.LoadFromFile(sourceSampleFile) c.Assert(err, IsNil) sourceCfg.From.Password = "" // no password set return sourceCfg diff --git a/pkg/ha/bound_test.go b/pkg/ha/bound_test.go index fa95f6c0b7..9dd8f714d2 100644 --- a/pkg/ha/bound_test.go +++ b/pkg/ha/bound_test.go @@ -128,10 +128,10 @@ func (t *testForEtcd) TestGetSourceBoundConfigEtcd(c *C) { worker = "dm-worker-1" source = "mysql-replica-1" bound = NewSourceBound(source, worker) - cfg config.SourceConfig emptyCfg config.SourceConfig ) - c.Assert(cfg.LoadFromFile(sourceSampleFile), IsNil) + cfg, err := config.LoadFromFile(sourceSampleFile) + c.Assert(err, IsNil) cfg.SourceID = source // no source bound and config bound1, cfg1, rev1, err := GetSourceBoundConfig(etcdTestCli, worker) diff --git a/pkg/ha/ops_test.go b/pkg/ha/ops_test.go index 3ad51caaff..80f129ae54 100644 --- a/pkg/ha/ops_test.go +++ b/pkg/ha/ops_test.go @@ -34,11 +34,11 @@ func (t *testForEtcd) TestOpsEtcd(c *C) { bound = NewSourceBound(source, worker) emptyStage Stage - sourceCfg config.SourceConfig subtaskCfg1 config.SubTaskConfig ) - c.Assert(sourceCfg.LoadFromFile(sourceSampleFile), IsNil) + sourceCfg, err := config.LoadFromFile(sourceSampleFile) + c.Assert(err, IsNil) sourceCfg.SourceID = source c.Assert(subtaskCfg1.DecodeFile(subTaskSampleFile, true), IsNil) subtaskCfg1.SourceID = source diff --git a/pkg/ha/relay_test.go b/pkg/ha/relay_test.go index 08d62aca46..05d7f9bb45 100644 --- a/pkg/ha/relay_test.go +++ b/pkg/ha/relay_test.go @@ -25,9 +25,9 @@ func (t *testForEtcd) TestGetRelayConfigEtcd(c *C) { var ( worker = "dm-worker-1" source = "mysql-replica-1" - cfg config.SourceConfig ) - c.Assert(cfg.LoadFromFile(sourceSampleFile), IsNil) + cfg, err := config.LoadFromFile(sourceSampleFile) + c.Assert(err, IsNil) cfg.SourceID = source // no relay source and config cfg1, rev1, err := GetRelayConfig(etcdTestCli, worker) diff --git a/pkg/ha/source.go b/pkg/ha/source.go index de0d7c661f..bb9cc49684 100644 --- a/pkg/ha/source.go +++ b/pkg/ha/source.go @@ -27,7 +27,7 @@ import ( // PutSourceCfg puts the config of the upstream source into etcd. // k/v: sourceID -> source config. -func PutSourceCfg(cli *clientv3.Client, cfg config.SourceConfig) (int64, error) { +func PutSourceCfg(cli *clientv3.Client, cfg *config.SourceConfig) (int64, error) { value, err := cfg.Toml() if err != nil { return 0, err @@ -42,12 +42,12 @@ func PutSourceCfg(cli *clientv3.Client, cfg config.SourceConfig) (int64, error) // if the source config for the sourceID not exist, return with `err == nil`. // if the source name is "", it will return all source configs as a map{sourceID: config}. // if the source name is given, it will return a map{sourceID: config} whose length is 1. -func GetSourceCfg(cli *clientv3.Client, source string, rev int64) (map[string]config.SourceConfig, int64, error) { +func GetSourceCfg(cli *clientv3.Client, source string, rev int64) (map[string]*config.SourceConfig, int64, error) { ctx, cancel := context.WithTimeout(cli.Ctx(), etcdutil.DefaultRequestTimeout) defer cancel() var ( - scm = make(map[string]config.SourceConfig) + scm = make(map[string]*config.SourceConfig) resp *clientv3.GetResponse err error ) @@ -77,8 +77,8 @@ func deleteSourceCfgOp(source string) clientv3.Op { return clientv3.OpDelete(common.UpstreamConfigKeyAdapter.Encode(source)) } -func sourceCfgFromResp(source string, resp *clientv3.GetResponse) (map[string]config.SourceConfig, error) { - scm := make(map[string]config.SourceConfig) +func sourceCfgFromResp(source string, resp *clientv3.GetResponse) (map[string]*config.SourceConfig, error) { + scm := make(map[string]*config.SourceConfig) if resp.Count == 0 { return scm, nil } else if source != "" && resp.Count > 1 { @@ -92,7 +92,7 @@ func sourceCfgFromResp(source string, resp *clientv3.GetResponse) (map[string]co if err != nil { return scm, terror.ErrConfigEtcdParse.Delegate(err, kv.Key) } - scm[cfg.SourceID] = cfg + scm[cfg.SourceID] = &cfg } return scm, nil } diff --git a/pkg/ha/source_test.go b/pkg/ha/source_test.go index af7568c1e3..44d4da4704 100644 --- a/pkg/ha/source_test.go +++ b/pkg/ha/source_test.go @@ -52,9 +52,8 @@ var _ = Suite(&testForEtcd{}) func (t *testForEtcd) TestSourceEtcd(c *C) { defer clearTestInfoOperation(c) - var cfg config.SourceConfig - - c.Assert(cfg.LoadFromFile(sourceSampleFile), IsNil) + cfg, err := config.LoadFromFile(sourceSampleFile) + c.Assert(err, IsNil) source := cfg.SourceID cfgExtra := cfg cfgExtra.SourceID = "mysql-replica-2"