From bcd1ba7912bfc6d2ec5d7f226b7ed7378e154bc3 Mon Sep 17 00:00:00 2001 From: Chunzhu Li Date: Wed, 12 May 2021 19:20:12 +0800 Subject: [PATCH 01/11] check invalid source before apply last bound --- dm/master/scheduler/scheduler.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/dm/master/scheduler/scheduler.go b/dm/master/scheduler/scheduler.go index 7c66cc4fca..aa6828a56c 100644 --- a/dm/master/scheduler/scheduler.go +++ b/dm/master/scheduler/scheduler.go @@ -1497,7 +1497,7 @@ func (s *Scheduler) handleWorkerOffline(ev ha.WorkerEvent, toLock bool) error { bounded, err := s.tryBoundForSource(bound.Source) if err != nil { return err - } else if !bounded { + } else if _, ok := s.sourceCfgs[bound.Source]; ok && !bounded { // 8. record the source as unbounded. s.unbounds[bound.Source] = struct{}{} } @@ -1516,6 +1516,8 @@ func (s *Scheduler) tryBoundForWorker(w *Worker) (bounded bool, err error) { source := s.lastBound[w.baseInfo.Name].Source if _, ok := s.unbounds[source]; !ok { source = "" + } else if _, ok := s.sourceCfgs[source]; !ok { + source = "" } // try to find its relay source (currently only one relay source) From c362b337cd6e31611c1b20386c74a3df0576e571 Mon Sep 17 00:00:00 2001 From: Chunzhu Li Date: Thu, 13 May 2021 12:06:21 +0800 Subject: [PATCH 02/11] fix bug --- dm/master/scheduler/scheduler.go | 22 ++++++++++++++++------ dm/master/scheduler/scheduler_test.go | 10 +++++++++- 2 files changed, 25 insertions(+), 7 deletions(-) diff --git a/dm/master/scheduler/scheduler.go b/dm/master/scheduler/scheduler.go index aa6828a56c..f891a682bb 100644 --- a/dm/master/scheduler/scheduler.go +++ b/dm/master/scheduler/scheduler.go @@ -1497,7 +1497,7 @@ func (s *Scheduler) handleWorkerOffline(ev ha.WorkerEvent, toLock bool) error { bounded, err := s.tryBoundForSource(bound.Source) if err != nil { return err - } else if _, ok := s.sourceCfgs[bound.Source]; ok && !bounded { + } else if !bounded { // 8. record the source as unbounded. s.unbounds[bound.Source] = struct{}{} } @@ -1550,11 +1550,14 @@ func (s *Scheduler) tryBoundForWorker(w *Worker) (bounded bool, err error) { // randomly pick one from unbounds if source == "" { - for source = range s.unbounds { - s.logger.Info("found unbound source when worker bound", - zap.String("worker", w.BaseInfo().Name), - zap.String("source", source)) - break // got a source. + for unboundedSource := range s.unbounds { + if _, ok := s.sourceCfgs[unboundedSource]; ok { + source = unboundedSource + s.logger.Info("found unbound source when worker bound", + zap.String("worker", w.BaseInfo().Name), + zap.String("source", source)) + break // got a source. + } } } @@ -1586,6 +1589,13 @@ func (s *Scheduler) tryBoundForWorker(w *Worker) (bounded bool, err error) { // called should update the s.unbounds. func (s *Scheduler) tryBoundForSource(source string) (bool, error) { var worker *Worker + // 0. check whether this source has source configuration. If not, we shouldn't bound this but just abandon it + // This usually happens when dm didn't handle the source compatibility correctly. For example, downgrade from a higher dm version + // We don't delete this source directly here, because we may bound it again after we fix the compatibility problem + if _, ok := s.sourceCfgs[source]; !ok { + s.logger.Warn("source configuration not found for the source to bound, add this source to unbounded", zap.String("source", source)) + return false, nil + } relayWorkers := s.relayWorkers[source] // 1. try to find a history worker in relay workers... if len(relayWorkers) > 0 { diff --git a/dm/master/scheduler/scheduler_test.go b/dm/master/scheduler/scheduler_test.go index 75581797d4..632fbe2a12 100644 --- a/dm/master/scheduler/scheduler_test.go +++ b/dm/master/scheduler/scheduler_test.go @@ -987,10 +987,18 @@ func (t *testScheduler) TestLastBound(c *C) { s.unbounds[sourceID1] = struct{}{} s.unbounds[sourceID2] = struct{}{} - // worker1 goes to last bounded source + // without source configurations, worker1.ToFree() bounded, err := s.tryBoundForWorker(worker1) c.Assert(err, IsNil) + c.Assert(bounded, IsFalse) + c.Assert(s.bounds, HasLen, 0) + + // worker1 goes to last bounded source + s.sourceCfgs[sourceID1] = sourceCfg1 + s.sourceCfgs[sourceID2] = sourceCfg2 + bounded, err = s.tryBoundForWorker(worker1) + c.Assert(err, IsNil) c.Assert(bounded, IsTrue) c.Assert(s.bounds[sourceID1], DeepEquals, worker1) From 70e971db65981b0fad1109a0e4d36cc293385177 Mon Sep 17 00:00:00 2001 From: Chunzhu Li Date: Thu, 13 May 2021 12:07:54 +0800 Subject: [PATCH 03/11] fix comment --- dm/master/scheduler/scheduler_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dm/master/scheduler/scheduler_test.go b/dm/master/scheduler/scheduler_test.go index 632fbe2a12..f440693241 100644 --- a/dm/master/scheduler/scheduler_test.go +++ b/dm/master/scheduler/scheduler_test.go @@ -987,7 +987,7 @@ func (t *testScheduler) TestLastBound(c *C) { s.unbounds[sourceID1] = struct{}{} s.unbounds[sourceID2] = struct{}{} - // without source configurations, + // without source configuration, we don't bind this source here worker1.ToFree() bounded, err := s.tryBoundForWorker(worker1) c.Assert(err, IsNil) From ea0eb4832543bebc04f76d2154edc3eb91c00bf2 Mon Sep 17 00:00:00 2001 From: Chunzhu Li Date: Thu, 13 May 2021 19:49:11 +0800 Subject: [PATCH 04/11] fix --- dm/master/scheduler/scheduler.go | 18 +++++++++++------- dm/master/scheduler/scheduler_test.go | 5 +++++ 2 files changed, 16 insertions(+), 7 deletions(-) diff --git a/dm/master/scheduler/scheduler.go b/dm/master/scheduler/scheduler.go index f891a682bb..63cb907a47 100644 --- a/dm/master/scheduler/scheduler.go +++ b/dm/master/scheduler/scheduler.go @@ -1240,6 +1240,7 @@ func (s *Scheduler) recoverWorkersBounds(cli *clientv3.Client) (int64, error) { return 0, err } + scm := s.sourceCfgs boundsToTrigger := make([]ha.SourceBound, 0) // 4. recover DM-worker info and status. for name, info := range wim { @@ -1253,12 +1254,15 @@ func (s *Scheduler) recoverWorkersBounds(cli *clientv3.Client) (int64, error) { w.ToFree() // set the stage as Bound and record the bound relationship if exists. if bound, ok := sbm[name]; ok { - boundsToTrigger = append(boundsToTrigger, bound) - err2 = s.updateStatusForBound(w, bound) - if err2 != nil { - return 0, err2 + // source bounds without source configuration should be deleted later + if _, ok := scm[bound.Source]; ok { + boundsToTrigger = append(boundsToTrigger, bound) + err2 = s.updateStatusForBound(w, bound) + if err2 != nil { + return 0, err2 + } + delete(sbm, name) } - delete(sbm, name) } } } @@ -1591,10 +1595,10 @@ func (s *Scheduler) tryBoundForSource(source string) (bool, error) { var worker *Worker // 0. check whether this source has source configuration. If not, we shouldn't bound this but just abandon it // This usually happens when dm didn't handle the source compatibility correctly. For example, downgrade from a higher dm version - // We don't delete this source directly here, because we may bound it again after we fix the compatibility problem + // We can just mark this source bounded to abandon this source. After we fix the compatibility problem dm-master will restart the scheduler or manually add this source if _, ok := s.sourceCfgs[source]; !ok { s.logger.Warn("source configuration not found for the source to bound, add this source to unbounded", zap.String("source", source)) - return false, nil + return true, nil } relayWorkers := s.relayWorkers[source] // 1. try to find a history worker in relay workers... diff --git a/dm/master/scheduler/scheduler_test.go b/dm/master/scheduler/scheduler_test.go index f440693241..febdef7e6e 100644 --- a/dm/master/scheduler/scheduler_test.go +++ b/dm/master/scheduler/scheduler_test.go @@ -993,6 +993,11 @@ func (t *testScheduler) TestLastBound(c *C) { c.Assert(err, IsNil) c.Assert(bounded, IsFalse) c.Assert(s.bounds, HasLen, 0) + // without source configuration, we return bounded true here, but no bound is added + bounded, err = s.tryBoundForSource(sourceID1) + c.Assert(err, IsNil) + c.Assert(bounded, IsTrue) + c.Assert(s.bounds, HasLen, 0) // worker1 goes to last bounded source s.sourceCfgs[sourceID1] = sourceCfg1 From da828698ea9b30c488db98a4ca35f676c9179529 Mon Sep 17 00:00:00 2001 From: Chunzhu Li Date: Thu, 13 May 2021 21:23:00 +0800 Subject: [PATCH 05/11] add ut --- dm/master/scheduler/scheduler.go | 2 + dm/master/scheduler/scheduler_test.go | 66 +++++++++++++++++++++++++++ 2 files changed, 68 insertions(+) diff --git a/dm/master/scheduler/scheduler.go b/dm/master/scheduler/scheduler.go index 63cb907a47..56ec69a26e 100644 --- a/dm/master/scheduler/scheduler.go +++ b/dm/master/scheduler/scheduler.go @@ -1262,6 +1262,8 @@ func (s *Scheduler) recoverWorkersBounds(cli *clientv3.Client) (int64, error) { return 0, err2 } delete(sbm, name) + } else { + s.logger.Warn("find source bound without config", zap.Stringer("bound", bound)) } } } diff --git a/dm/master/scheduler/scheduler_test.go b/dm/master/scheduler/scheduler_test.go index febdef7e6e..c1bc14602d 100644 --- a/dm/master/scheduler/scheduler_test.go +++ b/dm/master/scheduler/scheduler_test.go @@ -1280,3 +1280,69 @@ func (t *testScheduler) TestCloseAllWorkers(c *C) { c.Assert(s.workers, HasLen, 3) checkAllWorkersClosed(c, s, true) } + +func (t *testScheduler) TestStartSourcesWithoutSourceBounds(c *C) { + defer clearTestInfoOperation(c) + + var ( + logger = log.L() + s = NewScheduler(&logger, config.Security{}) + sourceID1 = "mysql-replica-1" + sourceID2 = "mysql-replica-2" + workerName1 = "dm-worker-1" + workerName2 = "dm-worker-2" + workerAddr1 = "127.0.0.1:28362" + workerAddr2 = "127.0.0.1:28363" + wg sync.WaitGroup + keepaliveTTL = int64(60) + ) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + s.started = true + s.etcdCli = etcdTestCli + s.sourceCfgs[sourceID1] = config.SourceConfig{} + s.sourceCfgs[sourceID2] = config.SourceConfig{} + s.unbounds[sourceID1] = struct{}{} + s.unbounds[sourceID2] = struct{}{} + c.Assert(s.AddWorker(workerName1, workerAddr1), IsNil) + c.Assert(s.AddWorker(workerName2, workerAddr2), IsNil) + + wg.Add(2) + go func() { + c.Assert(ha.KeepAlive(ctx, etcdTestCli, workerName1, keepaliveTTL), IsNil) + wg.Done() + }() + go func() { + c.Assert(ha.KeepAlive(ctx, etcdTestCli, workerName2, keepaliveTTL), IsNil) + wg.Done() + }() + + s.workers[workerName1].stage = WorkerFree + s.workers[workerName2].stage = WorkerFree + bounded, err := s.tryBoundForSource(sourceID1) + c.Assert(err, IsNil) + c.Assert(bounded, IsTrue) + bounded, err = s.tryBoundForSource(sourceID2) + c.Assert(err, IsNil) + c.Assert(bounded, IsTrue) + + s.started = false + sbm, _, err := ha.GetSourceBound(etcdTestCli, "") + c.Assert(err, IsNil) + c.Assert(sbm, HasLen, 2) + c.Assert(utils.WaitSomething(30, 100*time.Millisecond, func() bool { + kam, _, err := ha.GetKeepAliveWorkers(etcdTestCli) + if err != nil { + return false + } + return len(kam) == 2 + }), IsTrue) + + c.Assert(s.Start(ctx, etcdTestCli), IsNil) + c.Assert(s.bounds, HasLen, 0) + sbm, _, err = ha.GetSourceBound(etcdTestCli, "") + c.Assert(err, IsNil) + c.Assert(sbm, HasLen, 0) +} From 118db89e64d963bf8ff5981f08fdd56bb5f28af2 Mon Sep 17 00:00:00 2001 From: Chunzhu Li Date: Fri, 14 May 2021 11:32:42 +0800 Subject: [PATCH 06/11] fix lint --- dm/master/scheduler/scheduler_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dm/master/scheduler/scheduler_test.go b/dm/master/scheduler/scheduler_test.go index c1bc14602d..0cd3c4a477 100644 --- a/dm/master/scheduler/scheduler_test.go +++ b/dm/master/scheduler/scheduler_test.go @@ -1333,8 +1333,8 @@ func (t *testScheduler) TestStartSourcesWithoutSourceBounds(c *C) { c.Assert(err, IsNil) c.Assert(sbm, HasLen, 2) c.Assert(utils.WaitSomething(30, 100*time.Millisecond, func() bool { - kam, _, err := ha.GetKeepAliveWorkers(etcdTestCli) - if err != nil { + kam, _, err2 := ha.GetKeepAliveWorkers(etcdTestCli) + if err2 != nil { return false } return len(kam) == 2 From b87d6a1bcd6628823b30b509a940f3165391dafe Mon Sep 17 00:00:00 2001 From: Chunzhu Li Date: Fri, 14 May 2021 11:47:15 +0800 Subject: [PATCH 07/11] fix lint --- dm/master/scheduler/scheduler_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dm/master/scheduler/scheduler_test.go b/dm/master/scheduler/scheduler_test.go index 0cd3c4a477..1dda1eadb3 100644 --- a/dm/master/scheduler/scheduler_test.go +++ b/dm/master/scheduler/scheduler_test.go @@ -1302,8 +1302,8 @@ func (t *testScheduler) TestStartSourcesWithoutSourceBounds(c *C) { s.started = true s.etcdCli = etcdTestCli - s.sourceCfgs[sourceID1] = config.SourceConfig{} - s.sourceCfgs[sourceID2] = config.SourceConfig{} + s.sourceCfgs[sourceID1] = &config.SourceConfig{} + s.sourceCfgs[sourceID2] = &config.SourceConfig{} s.unbounds[sourceID1] = struct{}{} s.unbounds[sourceID2] = struct{}{} c.Assert(s.AddWorker(workerName1, workerAddr1), IsNil) From 659d930af178d609080629b2c208008916d12c77 Mon Sep 17 00:00:00 2001 From: Chunzhu Li Date: Tue, 18 May 2021 16:27:55 +0800 Subject: [PATCH 08/11] address comment --- dm/master/scheduler/scheduler.go | 15 ++----- dm/master/scheduler/scheduler_test.go | 56 +++++++++++++++++++-------- 2 files changed, 44 insertions(+), 27 deletions(-) diff --git a/dm/master/scheduler/scheduler.go b/dm/master/scheduler/scheduler.go index 56ec69a26e..8250bd1459 100644 --- a/dm/master/scheduler/scheduler.go +++ b/dm/master/scheduler/scheduler.go @@ -1556,9 +1556,8 @@ func (s *Scheduler) tryBoundForWorker(w *Worker) (bounded bool, err error) { // randomly pick one from unbounds if source == "" { - for unboundedSource := range s.unbounds { - if _, ok := s.sourceCfgs[unboundedSource]; ok { - source = unboundedSource + for source = range s.unbounds { + if _, ok := s.sourceCfgs[source]; ok { s.logger.Info("found unbound source when worker bound", zap.String("worker", w.BaseInfo().Name), zap.String("source", source)) @@ -1592,16 +1591,10 @@ func (s *Scheduler) tryBoundForWorker(w *Worker) (bounded bool, err error) { // tryBoundForSource tries to bound a source to a random Free worker. // returns (true, nil) after bounded. -// called should update the s.unbounds. +// caller should update the s.unbounds. +// caller should make sure this source has source config tryBoundForSource func (s *Scheduler) tryBoundForSource(source string) (bool, error) { var worker *Worker - // 0. check whether this source has source configuration. If not, we shouldn't bound this but just abandon it - // This usually happens when dm didn't handle the source compatibility correctly. For example, downgrade from a higher dm version - // We can just mark this source bounded to abandon this source. After we fix the compatibility problem dm-master will restart the scheduler or manually add this source - if _, ok := s.sourceCfgs[source]; !ok { - s.logger.Warn("source configuration not found for the source to bound, add this source to unbounded", zap.String("source", source)) - return true, nil - } relayWorkers := s.relayWorkers[source] // 1. try to find a history worker in relay workers... if len(relayWorkers) > 0 { diff --git a/dm/master/scheduler/scheduler_test.go b/dm/master/scheduler/scheduler_test.go index 1dda1eadb3..589d31af65 100644 --- a/dm/master/scheduler/scheduler_test.go +++ b/dm/master/scheduler/scheduler_test.go @@ -981,29 +981,18 @@ func (t *testScheduler) TestLastBound(c *C) { s.workers[workerName2] = worker2 s.workers[workerName3] = worker3 s.workers[workerName4] = worker4 + s.sourceCfgs[sourceID1] = sourceCfg1 + s.sourceCfgs[sourceID2] = sourceCfg2 s.lastBound[workerName1] = ha.SourceBound{Source: sourceID1} s.lastBound[workerName2] = ha.SourceBound{Source: sourceID2} s.unbounds[sourceID1] = struct{}{} s.unbounds[sourceID2] = struct{}{} - // without source configuration, we don't bind this source here + // worker1 goes to last bounded source worker1.ToFree() bounded, err := s.tryBoundForWorker(worker1) c.Assert(err, IsNil) - c.Assert(bounded, IsFalse) - c.Assert(s.bounds, HasLen, 0) - // without source configuration, we return bounded true here, but no bound is added - bounded, err = s.tryBoundForSource(sourceID1) - c.Assert(err, IsNil) - c.Assert(bounded, IsTrue) - c.Assert(s.bounds, HasLen, 0) - - // worker1 goes to last bounded source - s.sourceCfgs[sourceID1] = sourceCfg1 - s.sourceCfgs[sourceID2] = sourceCfg2 - bounded, err = s.tryBoundForWorker(worker1) - c.Assert(err, IsNil) c.Assert(bounded, IsTrue) c.Assert(s.bounds[sourceID1], DeepEquals, worker1) @@ -1037,6 +1026,40 @@ func (t *testScheduler) TestLastBound(c *C) { c.Assert(s.bounds[sourceID2], DeepEquals, worker2) } +func (t *testScheduler) TestInvalidLastBound(c *C) { + defer clearTestInfoOperation(c) + + var ( + logger = log.L() + s = NewScheduler(&logger, config.Security{}) + sourceID1 = "mysql-replica-1" + sourceID2 = "invalid-replica-1" + workerName1 = "dm-worker-1" + ) + + sourceCfg1, err := config.LoadFromFile(sourceSampleFile) + c.Assert(err, IsNil) + sourceCfg1.SourceID = sourceID1 + sourceCfg2 := sourceCfg1 + sourceCfg2.SourceID = sourceID2 + worker1 := &Worker{baseInfo: ha.WorkerInfo{Name: workerName1}} + + // step 1: start an empty scheduler without listening the worker event + s.started = true + s.etcdCli = etcdTestCli + s.workers[workerName1] = worker1 + // sourceID2 doesn't have a source config and not in unbound + s.sourceCfgs[sourceID1] = sourceCfg1 + s.lastBound[workerName1] = ha.SourceBound{Source: sourceID2} + s.unbounds[sourceID1] = struct{}{} + // step2: worker1 doesn't go to last bounded source, because last source doesn't have a source config (might be removed) + worker1.ToFree() + bounded, err := s.tryBoundForWorker(worker1) + c.Assert(err, IsNil) + c.Assert(bounded, IsTrue) + c.Assert(s.bounds[sourceID1], DeepEquals, worker1) +} + func (t *testScheduler) TestTransferSource(c *C) { defer clearTestInfoOperation(c) @@ -1281,7 +1304,7 @@ func (t *testScheduler) TestCloseAllWorkers(c *C) { checkAllWorkersClosed(c, s, true) } -func (t *testScheduler) TestStartSourcesWithoutSourceBounds(c *C) { +func (t *testScheduler) TestStartSourcesWithoutSourceConfigsInEtcd(c *C) { defer clearTestInfoOperation(c) var ( @@ -1302,6 +1325,7 @@ func (t *testScheduler) TestStartSourcesWithoutSourceBounds(c *C) { s.started = true s.etcdCli = etcdTestCli + // found source configs before bound s.sourceCfgs[sourceID1] = &config.SourceConfig{} s.sourceCfgs[sourceID2] = &config.SourceConfig{} s.unbounds[sourceID1] = struct{}{} @@ -1339,7 +1363,7 @@ func (t *testScheduler) TestStartSourcesWithoutSourceBounds(c *C) { } return len(kam) == 2 }), IsTrue) - + // there isn't any source config in etcd c.Assert(s.Start(ctx, etcdTestCli), IsNil) c.Assert(s.bounds, HasLen, 0) sbm, _, err = ha.GetSourceBound(etcdTestCli, "") From 55cc313c464cbacf3fb8d181c8b2767108d18180 Mon Sep 17 00:00:00 2001 From: Chunzhu Li Date: Tue, 18 May 2021 19:26:34 +0800 Subject: [PATCH 09/11] address comment --- dm/master/scheduler/scheduler.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dm/master/scheduler/scheduler.go b/dm/master/scheduler/scheduler.go index 8250bd1459..ff5d3a3576 100644 --- a/dm/master/scheduler/scheduler.go +++ b/dm/master/scheduler/scheduler.go @@ -1592,7 +1592,7 @@ func (s *Scheduler) tryBoundForWorker(w *Worker) (bounded bool, err error) { // tryBoundForSource tries to bound a source to a random Free worker. // returns (true, nil) after bounded. // caller should update the s.unbounds. -// caller should make sure this source has source config tryBoundForSource +// caller should make sure this source has source config. func (s *Scheduler) tryBoundForSource(source string) (bool, error) { var worker *Worker relayWorkers := s.relayWorkers[source] From 191b3662372447d704903ab6858279363594e0a2 Mon Sep 17 00:00:00 2001 From: Chunzhu Li Date: Tue, 18 May 2021 20:39:33 +0800 Subject: [PATCH 10/11] fix --- dm/master/scheduler/scheduler.go | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/dm/master/scheduler/scheduler.go b/dm/master/scheduler/scheduler.go index ff5d3a3576..046e12e0f3 100644 --- a/dm/master/scheduler/scheduler.go +++ b/dm/master/scheduler/scheduler.go @@ -1522,8 +1522,6 @@ func (s *Scheduler) tryBoundForWorker(w *Worker) (bounded bool, err error) { source := s.lastBound[w.baseInfo.Name].Source if _, ok := s.unbounds[source]; !ok { source = "" - } else if _, ok := s.sourceCfgs[source]; !ok { - source = "" } // try to find its relay source (currently only one relay source) @@ -1557,12 +1555,10 @@ func (s *Scheduler) tryBoundForWorker(w *Worker) (bounded bool, err error) { // randomly pick one from unbounds if source == "" { for source = range s.unbounds { - if _, ok := s.sourceCfgs[source]; ok { - s.logger.Info("found unbound source when worker bound", - zap.String("worker", w.BaseInfo().Name), - zap.String("source", source)) - break // got a source. - } + s.logger.Info("found unbound source when worker bound", + zap.String("worker", w.BaseInfo().Name), + zap.String("source", source)) + break // got a source. } } From 5da2e43feabf4b7284d759814f4dbe4ca929accd Mon Sep 17 00:00:00 2001 From: Chunzhu Li Date: Wed, 19 May 2021 12:36:49 +0800 Subject: [PATCH 11/11] fix test --- dm/master/scheduler/scheduler_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/dm/master/scheduler/scheduler_test.go b/dm/master/scheduler/scheduler_test.go index 589d31af65..6821f683b8 100644 --- a/dm/master/scheduler/scheduler_test.go +++ b/dm/master/scheduler/scheduler_test.go @@ -1369,4 +1369,6 @@ func (t *testScheduler) TestStartSourcesWithoutSourceConfigsInEtcd(c *C) { sbm, _, err = ha.GetSourceBound(etcdTestCli, "") c.Assert(err, IsNil) c.Assert(sbm, HasLen, 0) + cancel() + wg.Wait() }