Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

scheduler: check invalid source before apply last bound (#1683) #1713

Merged
19 changes: 13 additions & 6 deletions dm/master/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -1240,6 +1240,7 @@ func (s *Scheduler) recoverWorkersBounds(cli *clientv3.Client) (int64, error) {
return 0, err
}

scm := s.sourceCfgs
boundsToTrigger := make([]ha.SourceBound, 0)
// 4. recover DM-worker info and status.
for name, info := range wim {
Expand All @@ -1253,12 +1254,17 @@ func (s *Scheduler) recoverWorkersBounds(cli *clientv3.Client) (int64, error) {
w.ToFree()
// set the stage as Bound and record the bound relationship if exists.
if bound, ok := sbm[name]; ok {
boundsToTrigger = append(boundsToTrigger, bound)
err2 = s.updateStatusForBound(w, bound)
if err2 != nil {
return 0, err2
// source bounds without source configuration should be deleted later
if _, ok := scm[bound.Source]; ok {
boundsToTrigger = append(boundsToTrigger, bound)
err2 = s.updateStatusForBound(w, bound)
if err2 != nil {
return 0, err2
}
delete(sbm, name)
} else {
s.logger.Warn("find source bound without config", zap.Stringer("bound", bound))
}
delete(sbm, name)
}
}
}
Expand Down Expand Up @@ -1581,7 +1587,8 @@ func (s *Scheduler) tryBoundForWorker(w *Worker) (bounded bool, err error) {

// tryBoundForSource tries to bound a source to a random Free worker.
// returns (true, nil) after bounded.
// called should update the s.unbounds.
// caller should update the s.unbounds.
// caller should make sure this source has source config.
func (s *Scheduler) tryBoundForSource(source string) (bool, error) {
var worker *Worker
relayWorkers := s.relayWorkers[source]
Expand Down
105 changes: 105 additions & 0 deletions dm/master/scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -981,6 +981,8 @@ func (t *testScheduler) TestLastBound(c *C) {
s.workers[workerName2] = worker2
s.workers[workerName3] = worker3
s.workers[workerName4] = worker4
s.sourceCfgs[sourceID1] = sourceCfg1
s.sourceCfgs[sourceID2] = sourceCfg2

s.lastBound[workerName1] = ha.SourceBound{Source: sourceID1}
s.lastBound[workerName2] = ha.SourceBound{Source: sourceID2}
Expand Down Expand Up @@ -1024,6 +1026,40 @@ func (t *testScheduler) TestLastBound(c *C) {
c.Assert(s.bounds[sourceID2], DeepEquals, worker2)
}

func (t *testScheduler) TestInvalidLastBound(c *C) {
defer clearTestInfoOperation(c)

var (
logger = log.L()
s = NewScheduler(&logger, config.Security{})
sourceID1 = "mysql-replica-1"
sourceID2 = "invalid-replica-1"
workerName1 = "dm-worker-1"
)

sourceCfg1, err := config.LoadFromFile(sourceSampleFile)
c.Assert(err, IsNil)
sourceCfg1.SourceID = sourceID1
sourceCfg2 := sourceCfg1
sourceCfg2.SourceID = sourceID2
worker1 := &Worker{baseInfo: ha.WorkerInfo{Name: workerName1}}

// step 1: start an empty scheduler without listening the worker event
s.started = true
s.etcdCli = etcdTestCli
s.workers[workerName1] = worker1
// sourceID2 doesn't have a source config and not in unbound
s.sourceCfgs[sourceID1] = sourceCfg1
s.lastBound[workerName1] = ha.SourceBound{Source: sourceID2}
s.unbounds[sourceID1] = struct{}{}
// step2: worker1 doesn't go to last bounded source, because last source doesn't have a source config (might be removed)
worker1.ToFree()
bounded, err := s.tryBoundForWorker(worker1)
c.Assert(err, IsNil)
c.Assert(bounded, IsTrue)
c.Assert(s.bounds[sourceID1], DeepEquals, worker1)
}

func (t *testScheduler) TestTransferSource(c *C) {
defer clearTestInfoOperation(c)

Expand Down Expand Up @@ -1267,3 +1303,72 @@ func (t *testScheduler) TestCloseAllWorkers(c *C) {
c.Assert(s.workers, HasLen, 3)
checkAllWorkersClosed(c, s, true)
}

func (t *testScheduler) TestStartSourcesWithoutSourceConfigsInEtcd(c *C) {
defer clearTestInfoOperation(c)

var (
logger = log.L()
s = NewScheduler(&logger, config.Security{})
sourceID1 = "mysql-replica-1"
sourceID2 = "mysql-replica-2"
workerName1 = "dm-worker-1"
workerName2 = "dm-worker-2"
workerAddr1 = "127.0.0.1:28362"
workerAddr2 = "127.0.0.1:28363"
wg sync.WaitGroup
keepaliveTTL = int64(60)
)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

s.started = true
s.etcdCli = etcdTestCli
// found source configs before bound
s.sourceCfgs[sourceID1] = &config.SourceConfig{}
s.sourceCfgs[sourceID2] = &config.SourceConfig{}
s.unbounds[sourceID1] = struct{}{}
s.unbounds[sourceID2] = struct{}{}
c.Assert(s.AddWorker(workerName1, workerAddr1), IsNil)
c.Assert(s.AddWorker(workerName2, workerAddr2), IsNil)

wg.Add(2)
go func() {
c.Assert(ha.KeepAlive(ctx, etcdTestCli, workerName1, keepaliveTTL), IsNil)
wg.Done()
}()
go func() {
c.Assert(ha.KeepAlive(ctx, etcdTestCli, workerName2, keepaliveTTL), IsNil)
wg.Done()
}()

s.workers[workerName1].stage = WorkerFree
s.workers[workerName2].stage = WorkerFree
bounded, err := s.tryBoundForSource(sourceID1)
c.Assert(err, IsNil)
c.Assert(bounded, IsTrue)
bounded, err = s.tryBoundForSource(sourceID2)
c.Assert(err, IsNil)
c.Assert(bounded, IsTrue)

s.started = false
sbm, _, err := ha.GetSourceBound(etcdTestCli, "")
c.Assert(err, IsNil)
c.Assert(sbm, HasLen, 2)
c.Assert(utils.WaitSomething(30, 100*time.Millisecond, func() bool {
kam, _, err2 := ha.GetKeepAliveWorkers(etcdTestCli)
if err2 != nil {
return false
}
return len(kam) == 2
}), IsTrue)
// there isn't any source config in etcd
c.Assert(s.Start(ctx, etcdTestCli), IsNil)
c.Assert(s.bounds, HasLen, 0)
sbm, _, err = ha.GetSourceBound(etcdTestCli, "")
c.Assert(err, IsNil)
c.Assert(sbm, HasLen, 0)
cancel()
wg.Wait()
}