Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

master/worker(dm): suppport update config when task not run #4760

Merged
merged 29 commits into from
Mar 10, 2022
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
dbe67dc
save work
Ehco1996 Mar 3, 2022
587efbb
address comment
Ehco1996 Mar 3, 2022
ff8bcc6
add more ut
Ehco1996 Mar 3, 2022
0be36f6
check relay worekrs
Ehco1996 Mar 4, 2022
f13457b
worker side check
Ehco1996 Mar 4, 2022
7cd1276
revcert about source enable
Ehco1996 Mar 4, 2022
a24ba25
just log error when update failed
Ehco1996 Mar 4, 2022
7c446a7
fix an test
Ehco1996 Mar 4, 2022
88c8b65
fix an panic when unit is nil
Ehco1996 Mar 4, 2022
6b90fd1
fix ut with mock unit
Ehco1996 Mar 4, 2022
c18db33
fix nil check
Ehco1996 Mar 4, 2022
64d59b1
fix comment
Ehco1996 Mar 7, 2022
7379798
address comment
Ehco1996 Mar 8, 2022
7864eee
fix a test
Ehco1996 Mar 8, 2022
e1a2d3e
fix CheckCanUpdateCfg
Ehco1996 Mar 8, 2022
7aa818a
address comment
Ehco1996 Mar 9, 2022
7749eac
revert GetRelayWorkers
Ehco1996 Mar 9, 2022
dd8c699
Merge branch 'master' into feature-#4484-update-cfg
Ehco1996 Mar 9, 2022
1e0082c
address comment
Ehco1996 Mar 9, 2022
6f1d1b2
fix ut
Ehco1996 Mar 9, 2022
385d49b
Merge branch 'master' into feature-#4484-update-cfg
Ehco1996 Mar 10, 2022
f80b6b9
Apply suggestions from code review
Ehco1996 Mar 10, 2022
3965843
address comment
Ehco1996 Mar 10, 2022
e2aa469
copy code from another branch
Ehco1996 Mar 10, 2022
2aeb520
revert test func move
Ehco1996 Mar 10, 2022
6074849
Merge branch 'master' into feature-#4484-update-cfg
Ehco1996 Mar 10, 2022
889cd62
Merge branch 'master' into feature-#4484-update-cfg
ti-chi-bot Mar 10, 2022
522800a
Merge branch 'master' into feature-#4484-update-cfg
ti-chi-bot Mar 10, 2022
26c5a55
Merge branch 'master' into feature-#4484-update-cfg
ti-chi-bot Mar 10, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion dm/_utils/terror_gen/errors_release.txt
Original file line number Diff line number Diff line change
Expand Up @@ -475,6 +475,7 @@ ErrWorkerNoStart,[code=40070:class=dm-worker:scope=internal:level=high], "Messag
ErrWorkerAlreadyStart,[code=40071:class=dm-worker:scope=internal:level=high], "Message: mysql source worker %s has already started with source %s, but get a request with source %s, Workaround: Please try restart this DM-worker"
ErrWorkerSourceNotMatch,[code=40072:class=dm-worker:scope=internal:level=high], "Message: source of request does not match with source in worker"
ErrWorkerWaitRelayCatchupGTID,[code=40078:class=dm-worker:scope=internal:level=high], "Message: cannot compare gtid between loader and relay, loader gtid: %s, relay gtid: %s"
ErrWorkerUpdateSubTaskConfig,[code=40081:class=dm-worker:scope=internal:level=high], "Message: can only update task config for limited filed and this task must in sync unit, current task: %s current unit: %s"
ErrWorkerFailToGetSubtaskConfigFromEtcd,[code=40073:class=dm-worker:scope=internal:level=medium], "Message: there is no relative subtask config for task %s in etcd"
ErrWorkerFailToGetSourceConfigFromEtcd,[code=40074:class=dm-worker:scope=internal:level=medium], "Message: there is no relative source config for source %s in etcd"
ErrWorkerDDLLockOpNotFound,[code=40075:class=dm-worker:scope=internal:level=high], "Message: missing shard DDL lock operation for shard DDL info (%s)"
Expand Down Expand Up @@ -525,17 +526,19 @@ ErrSchedulerRelayStageInvalidUpdate,[code=46011:class=scheduler:scope=internal:l
ErrSchedulerRelayStageSourceNotExist,[code=46012:class=scheduler:scope=internal:level=medium], "Message: sources %v need to update expectant relay stage not exist"
ErrSchedulerMultiTask,[code=46013:class=scheduler:scope=internal:level=medium], "Message: the scheduler cannot perform multiple different tasks %v in one operation"
ErrSchedulerSubTaskExist,[code=46014:class=scheduler:scope=internal:level=medium], "Message: subtasks with name %s for sources %v already exist, Workaround: Please use `query-status` command to see tasks."
ErrSchedulerSubTaskNotExist,[code=46034:class=scheduler:scope=internal:level=medium], "Message: subtasks with name %s for sources %v not exist, Workaround: Please create this subtask first."
ErrSchedulerSubTaskStageInvalidUpdate,[code=46015:class=dm-master:scope=internal:level=medium], "Message: invalid new expectant subtask stage %s"
ErrSchedulerSubTaskOpTaskNotExist,[code=46016:class=dm-master:scope=internal:level=medium], "Message: subtasks with name %s need to be operate not exist, Workaround: Please use `query-status` command to see tasks."
ErrSchedulerSubTaskOpSourceNotExist,[code=46017:class=dm-master:scope=internal:level=medium], "Message: sources %v need to be operate not exist"
ErrSchedulerTaskNotExist,[code=46018:class=scheduler:scope=internal:level=medium], "Message: task with name %s not exist, Workaround: Please use `query-status` command to see tasks."
ErrSchedulerSubTaskCfgUpdate,[code=46035:class=scheduler:scope=internal:level=low], "Message: subtask with name %s source name %s can only update when no running tasks for now"
ErrSchedulerRequireRunningTaskInSyncUnit,[code=46019:class=scheduler:scope=internal:level=high], "Message: running tasks %v to be transferred on source %s should in sync unit, Workaround: Please use `pause-task [-s source ...] task` to pause them first."
ErrSchedulerRelayWorkersBusy,[code=46020:class=scheduler:scope=internal:level=high], "Message: these workers %s have started relay for sources %s respectively, Workaround: Please use `stop-relay` to stop them, or change your topology."
ErrSchedulerRelayWorkersWrongBound,[code=46021:class=scheduler:scope=internal:level=high], "Message: these workers %s have bound for another sources %s respectively, Workaround: Please `start-relay` on free or same source workers."
ErrSchedulerRelayWorkersWrongRelay,[code=46022:class=scheduler:scope=internal:level=high], "Message: these workers %s have started relay for another sources %s respectively, Workaround: Please correct sources in `stop-relay`."
ErrSchedulerSourceOpRelayExist,[code=46023:class=scheduler:scope=internal:level=high], "Message: source with name %s need to operate has existing relay workers %s, Workaround: Please `stop-relay` first."
ErrSchedulerLatchInUse,[code=46024:class=scheduler:scope=internal:level=low], "Message: when %s, resource %s is in use by other client, Workaround: Please try again later"
ErrSchedulerSourceCfgUpdate,[code=46025:class=scheduler:scope=internal:level=low], "Message: source can only update relay-log related parts for now"
ErrSchedulerSourceCfgUpdate,[code=46025:class=scheduler:scope=internal:level=low], "Message: source can only update when no relay workers and no running tasks for now"
ErrSchedulerWrongWorkerInput,[code=46026:class=scheduler:scope=internal:level=medium], "Message: require DM master to modify worker [%s] with source [%s], but currently the worker is bound to source [%s]"
ErrSchedulerBoundDiffWithStartedRelay,[code=46027:class=scheduler:scope=internal:level=medium], "Message: require DM worker [%s] to be bound to source [%s], but it has been started relay for source [%s], Workaround: If you intend to bind the source with worker, you can stop-relay for current source."
ErrSchedulerStartRelayOnSpecified,[code=46028:class=scheduler:scope=internal:level=low], "Message: the source has `start-relay` with worker name for workers %v, so it can't `start-relay` without worker name now, Workaround: Please stop all relay workers first, or specify worker name for `start-relay`."
Expand Down
6 changes: 3 additions & 3 deletions dm/dm/master/openapi_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func (s *Server) updateSource(ctx context.Context, cfg *config.SourceConfig) err
// nolint:unparam
func (s *Server) deleteSource(ctx context.Context, sourceName string, force bool) error {
if force {
for _, taskName := range s.scheduler.GetTaskNameListBySourceName(sourceName) {
for _, taskName := range s.scheduler.GetTaskNameListBySourceName(sourceName, nil) {
if err := s.scheduler.RemoveSubTasks(taskName, sourceName); err != nil {
return err
}
Expand Down Expand Up @@ -108,7 +108,7 @@ func (s *Server) enableSource(ctx context.Context, sourceName, workerName string
if worker == nil {
return terror.ErrWorkerNoStart
}
taskNameList := s.scheduler.GetTaskNameListBySourceName(sourceName)
taskNameList := s.scheduler.GetTaskNameListBySourceName(sourceName, nil)
return s.scheduler.BatchOperateTaskOnWorker(ctx, worker, taskNameList, sourceName, pb.Stage_Running, true)
}

Expand All @@ -119,7 +119,7 @@ func (s *Server) disableSource(ctx context.Context, sourceName string) error {
// no need to stop task if the source is not running
return nil
}
taskNameList := s.scheduler.GetTaskNameListBySourceName(sourceName)
taskNameList := s.scheduler.GetTaskNameListBySourceName(sourceName, nil)
return s.scheduler.BatchOperateTaskOnWorker(ctx, worker, taskNameList, sourceName, pb.Stage_Stopped, true)
}

Expand Down
113 changes: 92 additions & 21 deletions dm/dm/master/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -394,7 +394,6 @@ func (s *Scheduler) addSource(cfg *config.SourceConfig) error {
}

// UpdateSourceCfg update the upstream source config to the cluster.
// please verify the config before call this.
lance6716 marked this conversation as resolved.
Show resolved Hide resolved
func (s *Scheduler) UpdateSourceCfg(cfg *config.SourceConfig) error {
s.mu.Lock()
defer s.mu.Unlock()
Expand All @@ -409,15 +408,20 @@ func (s *Scheduler) UpdateSourceCfg(cfg *config.SourceConfig) error {
return terror.ErrSchedulerSourceCfgNotExist.Generate(cfg.SourceID)
}
// 2. check if tasks using this configuration are running
if tasks := s.GetTaskNameListBySourceName(cfg.SourceID); len(tasks) > 0 {
return terror.ErrSchedulerSourceOpTaskExist.Generate(cfg.SourceID, tasks)
runningStage := pb.Stage_Running
if tasks := s.GetTaskNameListBySourceName(cfg.SourceID, &runningStage); len(tasks) > 0 {
Ehco1996 marked this conversation as resolved.
Show resolved Hide resolved
return terror.ErrSchedulerSourceCfgUpdate.Generate(cfg.SourceID)
}
// 3. check this cfg is ok to update.
Ehco1996 marked this conversation as resolved.
Show resolved Hide resolved
if !checkSourceCfgCanUpdated(s.sourceCfgs[cfg.SourceID], cfg) {
return terror.ErrSchedulerSourceCfgUpdate.Generate()
// 3. check if there is relay workers for this source
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I remember since v5.4, source config with enable-relay: true will not record a relay worker. Should we also allow update for this case?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

now use expectRelayStages to check if this source is enable relay, seems this can avoid this case

7aa818a

relayWorkers, err := s.getRelayWorkers(cfg.SourceID)
if err != nil {
return err
}
if len(relayWorkers) > 0 {
return terror.ErrSchedulerSourceCfgUpdate.Generate(cfg.SourceID)
}
// 4. put the config into etcd.
_, err := ha.PutSourceCfg(s.etcdCli, cfg)
_, err = ha.PutSourceCfg(s.etcdCli, cfg)
if err != nil {
return err
}
Expand All @@ -426,15 +430,6 @@ func (s *Scheduler) UpdateSourceCfg(cfg *config.SourceConfig) error {
return nil
}

// currently the source cfg can only update relay-log related parts.
func checkSourceCfgCanUpdated(oldCfg, newCfg *config.SourceConfig) bool {
newCfgClone := newCfg.Clone()
newCfgClone.RelayBinLogName = oldCfg.RelayBinLogName
newCfgClone.RelayBinlogGTID = oldCfg.RelayBinlogGTID
newCfgClone.RelayDir = oldCfg.RelayDir
return newCfgClone.String() == oldCfg.String()
}

// RemoveSourceCfg removes the upstream source config in the cluster.
// when removing the upstream source config, it should also remove:
// - any existing relay stage.
Expand Down Expand Up @@ -1039,6 +1034,73 @@ func (s *Scheduler) RemoveSubTasks(task string, sources ...string) error {
return nil
}

// UpdateSubTasks update the information of one or more subtasks for one task.
func (s *Scheduler) UpdateSubTasks(ctx context.Context, cfgs ...config.SubTaskConfig) error {
lance6716 marked this conversation as resolved.
Show resolved Hide resolved
if !s.started.Load() {
return terror.ErrSchedulerNotStarted.Generate()
}
if len(cfgs) == 0 {
return nil // no subtasks need to add, this should not happen.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you can change the function parameter to (context, SubTaskConfig, ...SubTaskConfig) to tell the compiler there must be at least one SubTaskConfig.

OK to not change

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i can't find the example of how to use unnamed paramaters , how can i refer the unnamed stcfgs?

seems the only use of unnamed parameters is when you have to define a function with a specific
https://stackoverflow.com/questions/24000305/how-to-refer-to-an-unnamed-function-argument-in-go

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(context, ...SubTaskConfig) -> (context, SubTaskConfig, ...SubTaskConfig)

}
taskNamesM := make(map[string]struct{}, 1)
for _, cfg := range cfgs {
taskNamesM[cfg.Name] = struct{}{}
}
taskNames := strMapToSlice(taskNamesM)
Ehco1996 marked this conversation as resolved.
Show resolved Hide resolved
if len(taskNames) > 1 {
// only subtasks from one task supported now.
return terror.ErrSchedulerMultiTask.Generate(taskNames)
}
// check whether exists.
for _, cfg := range cfgs {
v, ok := s.subTaskCfgs.Load(cfg.Name)
Ehco1996 marked this conversation as resolved.
Show resolved Hide resolved
if !ok {
return terror.ErrSchedulerTaskNotExist.Generate(cfg.Name)
}
cfgM := v.(map[string]config.SubTaskConfig)
_, ok = cfgM[cfg.SourceID]
if !ok {
return terror.ErrSchedulerSubTaskNotExist.Generate(cfg.Name, cfg.SourceID)
}
}
// check whether in running stage
for _, cfg := range cfgs {
stage := s.GetExpectSubTaskStage(cfg.Name, cfg.SourceID)
Ehco1996 marked this conversation as resolved.
Show resolved Hide resolved
if stage.Expect == pb.Stage_Running {
return terror.ErrSchedulerSubTaskCfgUpdate.Generate(cfg.Name, cfg.SourceID)
}
}

// check by workers todo batch
for _, cfg := range cfgs {
worker := s.GetWorkerBySource(cfg.SourceID)
if worker == nil {
return terror.ErrWorkerNoStart
Ehco1996 marked this conversation as resolved.
Show resolved Hide resolved
}
resp, err := worker.checkSubtasksCanUpdate(ctx, &cfg)
if err != nil {
return err
}
if !resp.CheckSubtasksCanUpdate.Success {
return terror.ErrSchedulerSubTaskCfgUpdate.Generatef("can not update because %s", resp.CheckSubtasksCanUpdate.Msg)
}
}
s.mu.Lock()
defer s.mu.Unlock()
D3Hunter marked this conversation as resolved.
Show resolved Hide resolved
// put the configs and stages into etcd.
_, err := ha.PutSubTaskCfgStage(s.etcdCli, cfgs, []ha.Stage{}, []ha.Stage{})
if err != nil {
return err
}
// record the config
for _, cfg := range cfgs {
v, _ := s.subTaskCfgs.LoadOrStore(cfg.Name, map[string]config.SubTaskConfig{})
m := v.(map[string]config.SubTaskConfig)
m[cfg.SourceID] = cfg
}
return nil
}

// getSubTaskCfgByTaskSource gets subtask config by task name and source ID. Only used in tests.
func (s *Scheduler) getSubTaskCfgByTaskSource(task, source string) *config.SubTaskConfig {
v, ok := s.subTaskCfgs.Load(task)
Expand Down Expand Up @@ -1144,12 +1206,18 @@ func (s *Scheduler) GetSubTaskCfgs() map[string]map[string]config.SubTaskConfig
}

// GetTaskNameListBySourceName gets task name list by source name.
func (s *Scheduler) GetTaskNameListBySourceName(sourceName string) []string {
func (s *Scheduler) GetTaskNameListBySourceName(sourceName string, expectStage *pb.Stage) []string {
var taskNameList []string
s.subTaskCfgs.Range(func(k, v interface{}) bool {
s.expectSubTaskStages.Range(func(k, v interface{}) bool {
subtaskM := v.(map[string]ha.Stage)
subtaskStage, ok2 := subtaskM[sourceName]
if !ok2 {
return true
}
task := k.(string)
subtaskCfgMap := v.(map[string]config.SubTaskConfig)
if _, ok := subtaskCfgMap[sourceName]; ok {
if expectStage == nil {
taskNameList = append(taskNameList, task)
} else if subtaskStage.Expect == *expectStage {
taskNameList = append(taskNameList, task)
}
return true
Expand Down Expand Up @@ -1490,11 +1558,14 @@ func (s *Scheduler) StopRelay(source string, workers []string) error {
func (s *Scheduler) GetRelayWorkers(source string) ([]*Worker, error) {
s.mu.RLock()
defer s.mu.RUnlock()

if !s.started.Load() {
return nil, terror.ErrSchedulerNotStarted.Generate()
}
return s.getRelayWorkers(source)
}

// getRelayWorkers get relay workers with no lock and check.
func (s *Scheduler) getRelayWorkers(source string) ([]*Worker, error) {
workers := s.relayWorkers[source]
ret := make([]*Worker, 0, len(workers))
for w := range workers {
Expand Down
105 changes: 97 additions & 8 deletions dm/dm/master/scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,103 @@ var _ = Suite(&testScheduler{})

var stageEmpty ha.Stage

func (t *testScheduler) TestUpdateSubTasksAndSourceCfg(c *C) {
defer clearTestInfoOperation(c)

var (
logger = log.L()
s = NewScheduler(&logger, config.Security{})
sourceID1 = "mysql-replica-1"
taskName1 = "task-1"
workerName1 = "dm-worker-1"
workerAddr1 = "127.0.0.1:8262"
subtaskCfg1 config.SubTaskConfig
keepAliveTTL = int64(5)
ctx = context.Background()
)
sourceCfg1, err := config.LoadFromFile(sourceSampleFile)
c.Assert(err, IsNil)
sourceCfg1.SourceID = sourceID1
c.Assert(subtaskCfg1.DecodeFile(subTaskSampleFile, true), IsNil)
subtaskCfg1.SourceID = sourceID1
subtaskCfg1.Name = taskName1
c.Assert(subtaskCfg1.Adjust(true), IsNil)

// not started scheduler can't update
c.Assert(terror.ErrSchedulerNotStarted.Equal(s.UpdateSubTasks(ctx, subtaskCfg1)), IsTrue)
c.Assert(terror.ErrSchedulerNotStarted.Equal(s.UpdateSourceCfg(sourceCfg1)), IsTrue)

// start the scheduler
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
c.Assert(s.Start(ctx, etcdTestCli), IsNil)

// can't update source when source not added
c.Assert(terror.ErrSchedulerSourceCfgNotExist.Equal(s.UpdateSourceCfg(sourceCfg1)), IsTrue)

subtaskCfg2 := subtaskCfg1
subtaskCfg2.Name = "fake name"
// can't update subtask with different task name
c.Assert(terror.ErrSchedulerMultiTask.Equal(s.UpdateSubTasks(ctx, subtaskCfg1, subtaskCfg2)), IsTrue)

// can't update not added subtask
c.Assert(terror.ErrSchedulerTaskNotExist.Equal(s.UpdateSubTasks(ctx, subtaskCfg1)), IsTrue)

// start worker,add source and subtask
c.Assert(s.AddSourceCfg(sourceCfg1), IsNil)
ctx1, cancel1 := context.WithCancel(ctx)
defer cancel1()
c.Assert(s.AddWorker(workerName1, workerAddr1), IsNil)
go func() {
c.Assert(ha.KeepAlive(ctx1, etcdTestCli, workerName1, keepAliveTTL), IsNil)
}()
// wait for source1 bound to worker1.
c.Assert(utils.WaitSomething(30, 100*time.Millisecond, func() bool {
bounds := s.BoundSources()
return len(bounds) == 1 && bounds[0] == sourceID1
}), IsTrue)
c.Assert(s.AddSubTasks(false, pb.Stage_Running, subtaskCfg1), IsNil)

// can't update subtask not in scheduler
subtaskCfg2.Name = subtaskCfg1.Name
subtaskCfg2.SourceID = "fake source name"
c.Assert(terror.ErrSchedulerSubTaskNotExist.Equal(s.UpdateSubTasks(ctx, subtaskCfg2)), IsTrue)

// can't update subtask in running stage
c.Assert(terror.ErrSchedulerSubTaskCfgUpdate.Equal(s.UpdateSubTasks(ctx, subtaskCfg1)), IsTrue)
// can't update source when there is running tasks
c.Assert(terror.ErrSchedulerSourceCfgUpdate.Equal(s.UpdateSourceCfg(sourceCfg1)), IsTrue)

// pause task
c.Assert(s.UpdateExpectSubTaskStage(pb.Stage_Paused, taskName1, sourceID1), IsNil)

// can't update source when there is a relay worker for this source
c.Assert(s.StartRelay(sourceID1, []string{workerName1}), IsNil)
c.Assert(terror.ErrSchedulerSourceCfgUpdate.Equal(s.UpdateSourceCfg(sourceCfg1)), IsTrue)
c.Assert(s.StopRelay(sourceID1, []string{workerName1}), IsNil)

// cant't updated when worker rpc error
c.Assert(failpoint.Enable("github.com/pingcap/tiflow/dm/dm/master/scheduler/operateCheckSubtasksCanUpdate", `return("error")`), IsNil)
c.Assert(s.UpdateSubTasks(ctx, subtaskCfg1), ErrorMatches, "query error")
c.Assert(failpoint.Disable("github.com/pingcap/tiflow/dm/dm/master/scheduler/operateCheckSubtasksCanUpdate"), IsNil)

// cant't updated when worker rpc check not pass
c.Assert(failpoint.Enable("github.com/pingcap/tiflow/dm/dm/master/scheduler/operateCheckSubtasksCanUpdate", `return("failed")`), IsNil)
c.Assert(terror.ErrSchedulerSubTaskCfgUpdate.Equal(s.UpdateSubTasks(ctx, subtaskCfg1)), IsTrue)
c.Assert(failpoint.Disable("github.com/pingcap/tiflow/dm/dm/master/scheduler/operateCheckSubtasksCanUpdate"), IsNil)

// update success
subtaskCfg1.Batch = 1000
c.Assert(failpoint.Enable("github.com/pingcap/tiflow/dm/dm/master/scheduler/operateCheckSubtasksCanUpdate", `return("success")`), IsNil)
c.Assert(s.UpdateSubTasks(ctx, subtaskCfg1), IsNil)
c.Assert(failpoint.Disable("github.com/pingcap/tiflow/dm/dm/master/scheduler/operateCheckSubtasksCanUpdate"), IsNil)
c.Assert(s.getSubTaskCfgByTaskSource(taskName1, sourceID1).Batch, Equals, subtaskCfg1.Batch)

sourceCfg1.MetaDir = "new meta"
c.Assert(s.UpdateSourceCfg(sourceCfg1), IsNil)
c.Assert(s.GetSourceCfgByID(sourceID1).MetaDir, Equals, sourceCfg1.MetaDir)
}

func (t *testScheduler) TestScheduler(c *C) {
t.testSchedulerProgress(c, noRestart)
t.testSchedulerProgress(c, restartOnly)
Expand Down Expand Up @@ -179,11 +276,6 @@ func (t *testScheduler) testSchedulerProgress(c *C, restart int) {
fake.SourceID = "not a source id"
c.Assert(terror.ErrSchedulerSourceCfgNotExist.Equal(s.UpdateSourceCfg(fake)), IsTrue)

// update field not related to relay-log will failed
fake2 := newCfg.Clone()
fake2.AutoFixGTID = !fake2.AutoFixGTID
c.Assert(terror.ErrSchedulerSourceCfgUpdate.Equal(s.UpdateSourceCfg(fake2)), IsTrue)

// one unbound source exist (because no free worker).
t.sourceBounds(c, s, []string{}, []string{sourceID1})
rebuildScheduler(ctx)
Expand Down Expand Up @@ -266,9 +358,6 @@ func (t *testScheduler) testSchedulerProgress(c *C, restart int) {
t.downstreamMetaExist(c, s, taskName1, subtaskCfg1.To, subtaskCfg1.MetaSchema)
t.downstreamMetaNotExist(c, s, taskName2)

// update source config when task already started will failed
c.Assert(terror.ErrSchedulerSourceOpTaskExist.Equal(s.UpdateSourceCfg(sourceCfg1)), IsTrue)

// try start a task with two sources, some sources not bound.
c.Assert(terror.ErrSchedulerSourcesUnbound.Equal(s.AddSubTasks(false, pb.Stage_Running, subtaskCfg21, subtaskCfg22)), IsTrue)
t.subTaskCfgNotExist(c, s, taskName2, sourceID1)
Expand Down
30 changes: 30 additions & 0 deletions dm/dm/master/scheduler/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,36 @@ func (w *Worker) queryStatus(ctx context.Context) (*workerrpc.Response, error) {
return w.SendRequest(ctx, req, rpcTimeOut)
}

func (w *Worker) checkSubtasksCanUpdate(ctx context.Context, cfg *config.SubTaskConfig) (*workerrpc.Response, error) {
rpcTimeOut := time.Second * 10 // we relay on ctx.Done() to cancel the rpc, so just set a very long timeout
tomlStr, err := cfg.Toml()
if err != nil {
return nil, err
}
req := &workerrpc.Request{
Type: workerrpc.CmdCheckSubtasksCanUpdate,
CheckSubtasksCanUpdate: &pb.CheckSubtasksCanUpdateRequest{SubtaskCfgTomlString: tomlStr},
}
failpoint.Inject("operateCheckSubtasksCanUpdate", func(v failpoint.Value) {
resp := &workerrpc.Response{
Type: workerrpc.CmdCheckSubtasksCanUpdate, CheckSubtasksCanUpdate: &pb.CheckSubtasksCanUpdateResponse{},
}
switch v.(string) {
case "success":
resp.CheckSubtasksCanUpdate.Success = true
failpoint.Return(resp, nil)
case "failed":
resp.CheckSubtasksCanUpdate.Success = false
resp.CheckSubtasksCanUpdate.Msg = "error happened"
failpoint.Return(resp, nil)
default:
failpoint.Return(nil, errors.New("query error"))
}
})

return w.SendRequest(ctx, req, rpcTimeOut)
}

// NewMockWorker is used in tests.
func NewMockWorker(cli workerrpc.Client) *Worker {
return &Worker{cli: cli}
Expand Down
Loading