Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

Commit

Permalink
error when there's running task on that source
Browse files Browse the repository at this point in the history
  • Loading branch information
lance6716 committed Mar 12, 2021
1 parent e8f3da9 commit 30f87fb
Show file tree
Hide file tree
Showing 7 changed files with 47 additions and 7 deletions.
1 change: 1 addition & 0 deletions _utils/terror_gen/errors_release.txt
Original file line number Diff line number Diff line change
Expand Up @@ -490,6 +490,7 @@ ErrSchedulerSubTaskStageInvalidUpdate,[code=46015:class=dm-master:scope=internal
ErrSchedulerSubTaskOpTaskNotExist,[code=46016:class=dm-master:scope=internal:level=medium], "Message: subtasks with name %s need to be operate not exist, Workaround: Please use `query-status` command to see tasks."
ErrSchedulerSubTaskOpSourceNotExist,[code=46017:class=dm-master:scope=internal:level=medium], "Message: sources %v need to be operate not exist"
ErrSchedulerTaskNotExist,[code=46018:class=scheduler:scope=internal:level=medium], "Message: task with name %s not exist, Workaround: Please use `query-status` command to see tasks."
ErrSchedulerRequireNotRunning,[code=46019:class=scheduler:scope=internal:level=high], "Message: tasks %v on source %s should not be running, Workaround: Please use `pause-task [-s source ...] task` to pause them first"
ErrCtlGRPCCreateConn,[code=48001:class=dmctl:scope=internal:level=high], "Message: can not create grpc connection, Workaround: Please check your network connection."
ErrCtlInvalidTLSCfg,[code=48002:class=dmctl:scope=internal:level=medium], "Message: invalid TLS config, Workaround: Please check the `ssl-ca`, `ssl-cert` and `ssl-key` config in command line."
ErrNotSet,[code=50000:class=not-set:scope=not-set:level=high]
19 changes: 17 additions & 2 deletions dm/master/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,22 @@ func (s *Scheduler) TransferSource(source, worker string) error {
return err
}

// 4. replace the source bound
// 4. make sure no subtask on this source is expected to be running; otherwise refuse the transfer
var runningTasks []string
for task, subtaskM := range s.expectSubTaskStages {
subtaskStage, ok2 := subtaskM[source]
if !ok2 {
continue
}
if subtaskStage.Expect == pb.Stage_Running {
runningTasks = append(runningTasks, task)
}
}
if len(runningTasks) > 0 {
return terror.ErrSchedulerRequireNotRunning.Generate(runningTasks, source)
}

// 5. replace the source bound
failpoint.Inject("failToReplaceSourceBound", func(_ failpoint.Value) {
failpoint.Return(errors.New("failToPutSourceBound"))
})
Expand All @@ -405,7 +420,7 @@ func (s *Scheduler) TransferSource(source, worker string) error {
// we have checked w.stage is free, so there should not be an error
_ = s.updateStatusForBound(w, ha.NewSourceBound(source, worker))

// 5. try bound the old worker
// 6. try bound the old worker
_, err = s.tryBoundForWorker(oldWorker)
if err != nil {
s.logger.Warn("in transfer source, error when try bound the old worker", zap.Error(err))
Expand Down
9 changes: 7 additions & 2 deletions dm/master/scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1103,8 +1103,13 @@ func (t *testScheduler) TestTransferSource(c *C) {

// test that failing halfway won't leave the old worker unbound
c.Assert(failpoint.Enable("github.com/pingcap/dm/dm/master/scheduler/failToReplaceSourceBound", `return()`), IsNil)
//nolint:errcheck
defer failpoint.Disable("github.com/pingcap/dm/dm/master/scheduler/failToReplaceSourceBound")
c.Assert(s.TransferSource(sourceID1, workerName1), NotNil)
c.Assert(s.bounds[sourceID1], DeepEquals, worker4)
c.Assert(worker1.Stage(), Equals, WorkerFree)
c.Assert(failpoint.Disable("github.com/pingcap/dm/dm/master/scheduler/failToReplaceSourceBound"), IsNil)

// test can't transfer when there's any running task on the source
s.expectSubTaskStages["test"] = map[string]ha.Stage{sourceID1: {Expect: pb.Stage_Running}}
c.Assert(s.TransferSource(sourceID1, workerName1), NotNil)
c.Assert(s.bounds[sourceID1], DeepEquals, worker4)
c.Assert(worker1.Stage(), Equals, WorkerFree)
Expand Down
6 changes: 6 additions & 0 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2950,6 +2950,12 @@ description = ""
workaround = "Please use `query-status` command to see tasks."
tags = ["internal", "medium"]

[error.DM-scheduler-46019]
message = "tasks %v on source %s should not be running"
description = ""
workaround = "Please use `pause-task [-s source ...] task` to pause them first"
tags = ["internal", "high"]

[error.DM-dmctl-48001]
message = "can not create grpc connection"
description = ""
Expand Down
2 changes: 2 additions & 0 deletions pkg/terror/error_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -603,6 +603,7 @@ const (
codeSchedulerSubTaskOpTaskNotExist
codeSchedulerSubTaskOpSourceNotExist
codeSchedulerTaskNotExist
codeSchedulerRequireNotRunning
)

// dmctl error code
Expand Down Expand Up @@ -1192,6 +1193,7 @@ var (
ErrSchedulerSubTaskOpTaskNotExist = New(codeSchedulerSubTaskOpTaskNotExist, ClassDMMaster, ScopeInternal, LevelMedium, "subtasks with name %s need to be operate not exist", "Please use `query-status` command to see tasks.")
ErrSchedulerSubTaskOpSourceNotExist = New(codeSchedulerSubTaskOpSourceNotExist, ClassDMMaster, ScopeInternal, LevelMedium, "sources %v need to be operate not exist", "")
ErrSchedulerTaskNotExist = New(codeSchedulerTaskNotExist, ClassScheduler, ScopeInternal, LevelMedium, "task with name %s not exist", "Please use `query-status` command to see tasks.")
ErrSchedulerRequireNotRunning = New(codeSchedulerRequireNotRunning, ClassScheduler, ScopeInternal, LevelHigh, "tasks %v on source %s should not be running", "Please use `pause-task [-s source ...] task` to pause them first")

// dmctl
ErrCtlGRPCCreateConn = New(codeCtlGRPCCreateConn, ClassDMCtl, ScopeInternal, LevelHigh, "can not create grpc connection", "Please check your network connection.")
Expand Down
6 changes: 3 additions & 3 deletions tests/dmctl_basic/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,9 @@ function run() {
'"worker": "worker1"' 1 \
'"worker": "worker2"' 1

transfer_source_valid $SOURCE_ID1 worker1 # transfer to self
transfer_source_invalid $SOURCE_ID1 worker2

echo "pause_relay_success"
pause_relay_success
query_status_stopped_relay
Expand Down Expand Up @@ -184,9 +187,6 @@ function run() {
"\"stage\": \"Running\"" 4
# update_task_not_paused $TASK_CONF

transfer_source_valid $SOURCE_ID1 worker1 # transfer to self
transfer_source_invalid $SOURCE_ID1 worker2

echo "get_config"
get_config_wrong_arg
get_config_to_file
Expand Down
11 changes: 11 additions & 0 deletions tests/ha/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -90,10 +90,21 @@ function run() {

# manually transfer an existing source to a newly started worker
run_dm_worker $WORK_DIR/worker3 $WORKER3_PORT $cur/conf/dm-worker3.toml

# transfer-source is rejected while the task is running, so pause the task first, transfer, then resume it
check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER3_PORT
run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"transfer-source $SOURCE_ID1 worker3" \
"tasks \[test\] on source $SOURCE_ID1 should not be running" 1
run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"pause-task -s $SOURCE_ID1 test" \
"\"result\": true" 2
run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"transfer-source $SOURCE_ID1 worker3" \
"\"result\": true" 1
run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"resume-task -s $SOURCE_ID1 test" \
"\"result\": true" 2

run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"list-member --name worker3" \
Expand Down

0 comments on commit 30f87fb

Please sign in to comment.