Skip to content

Commit

Permalink
address comment
Browse files Browse the repository at this point in the history
  • Loading branch information
Ehco1996 committed Mar 9, 2022
1 parent e1a2d3e commit 7aa818a
Show file tree
Hide file tree
Showing 7 changed files with 273 additions and 279 deletions.
2 changes: 1 addition & 1 deletion dm/_utils/terror_gen/errors_release.txt
Original file line number Diff line number Diff line change
Expand Up @@ -538,7 +538,7 @@ ErrSchedulerRelayWorkersWrongBound,[code=46021:class=scheduler:scope=internal:le
ErrSchedulerRelayWorkersWrongRelay,[code=46022:class=scheduler:scope=internal:level=high], "Message: these workers %s have started relay for another sources %s respectively, Workaround: Please correct sources in `stop-relay`."
ErrSchedulerSourceOpRelayExist,[code=46023:class=scheduler:scope=internal:level=high], "Message: source with name %s need to operate has existing relay workers %s, Workaround: Please `stop-relay` first."
ErrSchedulerLatchInUse,[code=46024:class=scheduler:scope=internal:level=low], "Message: when %s, resource %s is in use by other client, Workaround: Please try again later"
ErrSchedulerSourceCfgUpdate,[code=46025:class=scheduler:scope=internal:level=low], "Message: source can only update when no relay workers and no running tasks for now"
ErrSchedulerSourceCfgUpdate,[code=46025:class=scheduler:scope=internal:level=low], "Message: source can only update when not enable relay and no running tasks for now"
ErrSchedulerWrongWorkerInput,[code=46026:class=scheduler:scope=internal:level=medium], "Message: require DM master to modify worker [%s] with source [%s], but currently the worker is bound to source [%s]"
ErrSchedulerBoundDiffWithStartedRelay,[code=46027:class=scheduler:scope=internal:level=medium], "Message: require DM worker [%s] to be bound to source [%s], but it has been started relay for source [%s], Workaround: If you intend to bind the source with worker, you can stop-relay for current source."
ErrSchedulerStartRelayOnSpecified,[code=46028:class=scheduler:scope=internal:level=low], "Message: the source has `start-relay` with worker name for workers %v, so it can't `start-relay` without worker name now, Workaround: Please stop all relay workers first, or specify worker name for `start-relay`."
Expand Down
28 changes: 10 additions & 18 deletions dm/dm/master/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,6 @@ type Scheduler struct {
// a mirror of bounds whose element is not deleted when worker unbound. worker -> SourceBound
lastBound map[string]ha.SourceBound

// TODO: seems this memory status is useless.
// expectant relay stages for sources, source ID -> stage.
// add:
// - bound the source to a worker (at first time).
Expand Down Expand Up @@ -412,16 +411,12 @@ func (s *Scheduler) UpdateSourceCfg(cfg *config.SourceConfig) error {
if tasks := s.GetTaskNameListBySourceName(cfg.SourceID, &runningStage); len(tasks) > 0 {
return terror.ErrSchedulerSourceCfgUpdate.Generate(cfg.SourceID)
}
// 3. check if there is relay workers for this source
relayWorkers, err := s.getRelayWorkers(cfg.SourceID)
if err != nil {
return err
}
if len(relayWorkers) > 0 {
// 3. check if this source is enable relay
if _, ok := s.expectRelayStages[cfg.SourceID]; ok {
return terror.ErrSchedulerSourceCfgUpdate.Generate(cfg.SourceID)
}
// 4. put the config into etcd.
_, err = ha.PutSourceCfg(s.etcdCli, cfg)
_, err := ha.PutSourceCfg(s.etcdCli, cfg)
if err != nil {
return err
}
Expand Down Expand Up @@ -1048,10 +1043,9 @@ func (s *Scheduler) UpdateSubTasks(ctx context.Context, cfgs ...config.SubTaskCo
for _, cfg := range cfgs {
taskNamesM[cfg.Name] = struct{}{}
}
taskNames := strMapToSlice(taskNamesM)
if len(taskNames) > 1 {
if len(taskNamesM) > 1 {
// only subtasks from one task supported now.
return terror.ErrSchedulerMultiTask.Generate(taskNames)
return terror.ErrSchedulerMultiTask.Generate(strMapToSlice(taskNamesM))
}
// check whether exists.
for _, cfg := range cfgs {
Expand All @@ -1066,18 +1060,17 @@ func (s *Scheduler) UpdateSubTasks(ctx context.Context, cfgs ...config.SubTaskCo
}
}
// check whether in running stage
for _, cfg := range cfgs {
stage := s.GetExpectSubTaskStage(cfg.Name, cfg.SourceID)
if stage.Expect == pb.Stage_Running {
return terror.ErrSchedulerSubTaskCfgUpdate.Generate(cfg.Name, cfg.SourceID)
}
cfg := cfgs[0]
stage := s.GetExpectSubTaskStage(cfg.Name, cfg.SourceID)
if stage.Expect == pb.Stage_Running {
return terror.ErrSchedulerSubTaskCfgUpdate.Generate(cfg.Name, cfg.SourceID)
}

// check by workers todo batch
for _, cfg := range cfgs {
worker := s.bounds[cfg.SourceID]
if worker == nil {
return terror.ErrWorkerNoStart
return terror.ErrSchedulerSubTaskCfgUpdate.Generatef("this source: %s have not bound to worker", cfg.SourceID)
}
resp, err := worker.checkSubtasksCanUpdate(ctx, &cfg)
if err != nil {
Expand Down Expand Up @@ -1741,7 +1734,6 @@ func (s *Scheduler) UpdateExpectSubTaskStage(newStage pb.Stage, taskName string,

// GetExpectSubTaskStage returns the current expect subtask stage.
// If the stage not exists, an invalid stage is returned.
// This func is used for testing.
func (s *Scheduler) GetExpectSubTaskStage(task, source string) ha.Stage {
invalidStage := ha.NewSubTaskStage(pb.Stage_InvalidStage, source, task)

Expand Down
59 changes: 26 additions & 33 deletions dm/dm/pb/dmworker.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 7aa818a

Please sign in to comment.