Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

master/worker(dm): suppport update config when task not run #4760

Merged
merged 29 commits into from
Mar 10, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
dbe67dc
save work
Ehco1996 Mar 3, 2022
587efbb
address comment
Ehco1996 Mar 3, 2022
ff8bcc6
add more ut
Ehco1996 Mar 3, 2022
0be36f6
check relay worekrs
Ehco1996 Mar 4, 2022
f13457b
worker side check
Ehco1996 Mar 4, 2022
7cd1276
revcert about source enable
Ehco1996 Mar 4, 2022
a24ba25
just log error when update failed
Ehco1996 Mar 4, 2022
7c446a7
fix an test
Ehco1996 Mar 4, 2022
88c8b65
fix an panic when unit is nil
Ehco1996 Mar 4, 2022
6b90fd1
fix ut with mock unit
Ehco1996 Mar 4, 2022
c18db33
fix nil check
Ehco1996 Mar 4, 2022
64d59b1
fix comment
Ehco1996 Mar 7, 2022
7379798
address comment
Ehco1996 Mar 8, 2022
7864eee
fix a test
Ehco1996 Mar 8, 2022
e1a2d3e
fix CheckCanUpdateCfg
Ehco1996 Mar 8, 2022
7aa818a
address comment
Ehco1996 Mar 9, 2022
7749eac
revert GetRelayWorkers
Ehco1996 Mar 9, 2022
dd8c699
Merge branch 'master' into feature-#4484-update-cfg
Ehco1996 Mar 9, 2022
1e0082c
address comment
Ehco1996 Mar 9, 2022
6f1d1b2
fix ut
Ehco1996 Mar 9, 2022
385d49b
Merge branch 'master' into feature-#4484-update-cfg
Ehco1996 Mar 10, 2022
f80b6b9
Apply suggestions from code review
Ehco1996 Mar 10, 2022
3965843
address comment
Ehco1996 Mar 10, 2022
e2aa469
copy code from another branch
Ehco1996 Mar 10, 2022
2aeb520
revert test func move
Ehco1996 Mar 10, 2022
6074849
Merge branch 'master' into feature-#4484-update-cfg
Ehco1996 Mar 10, 2022
889cd62
Merge branch 'master' into feature-#4484-update-cfg
ti-chi-bot Mar 10, 2022
522800a
Merge branch 'master' into feature-#4484-update-cfg
ti-chi-bot Mar 10, 2022
26c5a55
Merge branch 'master' into feature-#4484-update-cfg
ti-chi-bot Mar 10, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions dm/_utils/terror_gen/errors_release.txt
Original file line number Diff line number Diff line change
Expand Up @@ -525,10 +525,12 @@ ErrSchedulerRelayStageInvalidUpdate,[code=46011:class=scheduler:scope=internal:l
ErrSchedulerRelayStageSourceNotExist,[code=46012:class=scheduler:scope=internal:level=medium], "Message: sources %v need to update expectant relay stage not exist"
ErrSchedulerMultiTask,[code=46013:class=scheduler:scope=internal:level=medium], "Message: the scheduler cannot perform multiple different tasks %v in one operation"
ErrSchedulerSubTaskExist,[code=46014:class=scheduler:scope=internal:level=medium], "Message: subtasks with name %s for sources %v already exist, Workaround: Please use `query-status` command to see tasks."
ErrSchedulerSubTaskNotExist,[code=46034:class=scheduler:scope=internal:level=medium], "Message: subtasks with name %s for sources %v not exist, Workaround: Please create this subtask first."
ErrSchedulerSubTaskStageInvalidUpdate,[code=46015:class=dm-master:scope=internal:level=medium], "Message: invalid new expectant subtask stage %s"
ErrSchedulerSubTaskOpTaskNotExist,[code=46016:class=dm-master:scope=internal:level=medium], "Message: subtasks with name %s need to be operate not exist, Workaround: Please use `query-status` command to see tasks."
ErrSchedulerSubTaskOpSourceNotExist,[code=46017:class=dm-master:scope=internal:level=medium], "Message: sources %v need to be operate not exist"
ErrSchedulerTaskNotExist,[code=46018:class=scheduler:scope=internal:level=medium], "Message: task with name %s not exist, Workaround: Please use `query-status` command to see tasks."
ErrSchedulerSubTaskCfgUpdate,[code=46035:class=scheduler:scope=internal:level=low], "Message: subtask with name %s source name %s can only update when no running tasks for now"
ErrSchedulerRequireRunningTaskInSyncUnit,[code=46019:class=scheduler:scope=internal:level=high], "Message: running tasks %v to be transferred on source %s should in sync unit, Workaround: Please use `pause-task [-s source ...] task` to pause them first."
ErrSchedulerRelayWorkersBusy,[code=46020:class=scheduler:scope=internal:level=high], "Message: these workers %s have started relay for sources %s respectively, Workaround: Please use `stop-relay` to stop them, or change your topology."
ErrSchedulerRelayWorkersWrongBound,[code=46021:class=scheduler:scope=internal:level=high], "Message: these workers %s have bound for another sources %s respectively, Workaround: Please `start-relay` on free or same source workers."
Expand Down
4 changes: 4 additions & 0 deletions dm/dm/config/source.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ enable-relay: false
# relay-binlog-gtid: ''
# relay-dir: ./relay_log

# whether this source is enabled (if a source not in enabled status it can not serve subtasks)
# the source cfg can only be updated if enable = false.
enable: true

#enable gtid in relay log unit
enable-gtid: false

Expand Down
4 changes: 4 additions & 0 deletions dm/dm/config/source_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ type PurgeConfig struct {

// SourceConfig is the configuration for source.
type SourceConfig struct {
Enable bool `yaml:"enable" toml:"enable" json:"enable"`
Ehco1996 marked this conversation as resolved.
Show resolved Hide resolved
EnableGTID bool `yaml:"enable-gtid" toml:"enable-gtid" json:"enable-gtid"`
AutoFixGTID bool `yaml:"auto-fix-gtid" toml:"auto-fix-gtid" json:"auto-fix-gtid"`
RelayDir string `yaml:"relay-dir" toml:"relay-dir" json:"relay-dir"`
Expand Down Expand Up @@ -105,6 +106,7 @@ func NewSourceConfig() *SourceConfig {
// NewSourceConfig creates a new base config without adjust.
func newSourceConfig() *SourceConfig {
c := &SourceConfig{
Enable: true,
Purge: PurgeConfig{
Interval: 60 * 60,
Expires: 0,
Expand Down Expand Up @@ -397,6 +399,7 @@ func (c *SourceConfig) YamlForDowngrade() (string, error) {
// This config is used for downgrade(config export) from a higher dmctl version.
// When we add any new config item into SourceConfig, we should update it also.
type SourceConfigForDowngrade struct {
Enable bool `yaml:"enable"`
EnableGTID bool `yaml:"enable-gtid"`
AutoFixGTID bool `yaml:"auto-fix-gtid"`
RelayDir string `yaml:"relay-dir"`
Expand All @@ -421,6 +424,7 @@ type SourceConfigForDowngrade struct {
// NewSourceConfigForDowngrade creates a new base config for downgrade.
func NewSourceConfigForDowngrade(sourceCfg *SourceConfig) *SourceConfigForDowngrade {
return &SourceConfigForDowngrade{
Enable: sourceCfg.Enable,
EnableGTID: sourceCfg.EnableGTID,
AutoFixGTID: sourceCfg.AutoFixGTID,
RelayDir: sourceCfg.RelayDir,
Expand Down
6 changes: 3 additions & 3 deletions dm/dm/master/openapi_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func (s *Server) updateSource(ctx context.Context, cfg *config.SourceConfig) err
// nolint:unparam
func (s *Server) deleteSource(ctx context.Context, sourceName string, force bool) error {
if force {
for _, taskName := range s.scheduler.GetTaskNameListBySourceName(sourceName) {
for _, taskName := range s.scheduler.GetTaskNameListBySourceName(sourceName, nil) {
if err := s.scheduler.RemoveSubTasks(taskName, sourceName); err != nil {
return err
}
Expand Down Expand Up @@ -108,7 +108,7 @@ func (s *Server) enableSource(ctx context.Context, sourceName, workerName string
if worker == nil {
return terror.ErrWorkerNoStart
}
taskNameList := s.scheduler.GetTaskNameListBySourceName(sourceName)
taskNameList := s.scheduler.GetTaskNameListBySourceName(sourceName, nil)
return s.scheduler.BatchOperateTaskOnWorker(ctx, worker, taskNameList, sourceName, pb.Stage_Running, true)
}

Expand All @@ -119,7 +119,7 @@ func (s *Server) disableSource(ctx context.Context, sourceName string) error {
// no need to stop task if the source is not running
return nil
}
taskNameList := s.scheduler.GetTaskNameListBySourceName(sourceName)
taskNameList := s.scheduler.GetTaskNameListBySourceName(sourceName, nil)
return s.scheduler.BatchOperateTaskOnWorker(ctx, worker, taskNameList, sourceName, pb.Stage_Stopped, true)
}

Expand Down
96 changes: 74 additions & 22 deletions dm/dm/master/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -394,7 +394,7 @@ func (s *Scheduler) addSource(cfg *config.SourceConfig) error {
}

// UpdateSourceCfg update the upstream source config to the cluster.
// please verify the config before call this.
lance6716 marked this conversation as resolved.
Show resolved Hide resolved
// now only support update a source which not have running subtasks.
func (s *Scheduler) UpdateSourceCfg(cfg *config.SourceConfig) error {
s.mu.Lock()
defer s.mu.Unlock()
Expand All @@ -409,32 +409,20 @@ func (s *Scheduler) UpdateSourceCfg(cfg *config.SourceConfig) error {
return terror.ErrSchedulerSourceCfgNotExist.Generate(cfg.SourceID)
}
// 2. check if tasks using this configuration are running
if tasks := s.GetTaskNameListBySourceName(cfg.SourceID); len(tasks) > 0 {
return terror.ErrSchedulerSourceOpTaskExist.Generate(cfg.SourceID, tasks)
runningStage := pb.Stage_Running
if tasks := s.GetTaskNameListBySourceName(cfg.SourceID, &runningStage); len(tasks) > 0 {
Ehco1996 marked this conversation as resolved.
Show resolved Hide resolved
return terror.ErrSchedulerSourceCfgUpdate.Generate(cfg.SourceID)
}
// 3. check this cfg is ok to update.
Ehco1996 marked this conversation as resolved.
Show resolved Hide resolved
if !checkSourceCfgCanUpdated(s.sourceCfgs[cfg.SourceID], cfg) {
return terror.ErrSchedulerSourceCfgUpdate.Generate()
}
// 4. put the config into etcd.
// 3. put the config into etcd.
_, err := ha.PutSourceCfg(s.etcdCli, cfg)
if err != nil {
return err
}
// 5. record the config in the scheduler.
// 4. record the config in the scheduler.
s.sourceCfgs[cfg.SourceID] = cfg
return nil
}

// currently the source cfg can only update relay-log related parts.
func checkSourceCfgCanUpdated(oldCfg, newCfg *config.SourceConfig) bool {
newCfgClone := newCfg.Clone()
newCfgClone.RelayBinLogName = oldCfg.RelayBinLogName
newCfgClone.RelayBinlogGTID = oldCfg.RelayBinlogGTID
newCfgClone.RelayDir = oldCfg.RelayDir
return newCfgClone.String() == oldCfg.String()
}

// RemoveSourceCfg removes the upstream source config in the cluster.
// when removing the upstream source config, it should also remove:
// - any existing relay stage.
Expand Down Expand Up @@ -1039,6 +1027,64 @@ func (s *Scheduler) RemoveSubTasks(task string, sources ...string) error {
return nil
}

// UpdateSubTasks update the information of one or more subtasks for one task.
// now only support update a task that not in running stage.
func (s *Scheduler) UpdateSubTasks(cfgs ...config.SubTaskConfig) error {
Copy link
Contributor Author

@Ehco1996 Ehco1996 Mar 3, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

need check if this field is safe to updated, also need to check current unit with this subtask check this in worker or master? if checked in worekr i need to added new rpc

Copy link
Contributor Author

@Ehco1996 Ehco1996 Mar 4, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

actully we can check this just on master side
no we need send request to worker to check the current task unit anyway.... so we better just do this check on worker side

s.mu.Lock()
defer s.mu.Unlock()

if !s.started.Load() {
return terror.ErrSchedulerNotStarted.Generate()
}

if len(cfgs) == 0 {
return nil // no subtasks need to add, this should not happen.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you can change the function parameter to (context, SubTaskConfig, ...SubTaskConfig) to tell the compiler there must be at least one SubTaskConfig.

OK to not change

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i can't find the example of how to use unnamed paramaters , how can i refer the unnamed stcfgs?

seems the only use of unnamed parameters is when you have to define a function with a specific
https://stackoverflow.com/questions/24000305/how-to-refer-to-an-unnamed-function-argument-in-go

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(context, ...SubTaskConfig) -> (context, SubTaskConfig, ...SubTaskConfig)

}

taskNamesM := make(map[string]struct{}, 1)

for _, cfg := range cfgs {
taskNamesM[cfg.Name] = struct{}{}
}
taskNames := strMapToSlice(taskNamesM)
Ehco1996 marked this conversation as resolved.
Show resolved Hide resolved
if len(taskNames) > 1 {
// only subtasks from one task supported now.
return terror.ErrSchedulerMultiTask.Generate(taskNames)
}

// check whether exists.
for _, cfg := range cfgs {
v, ok := s.subTaskCfgs.Load(cfg.Name)
Ehco1996 marked this conversation as resolved.
Show resolved Hide resolved
if !ok {
return terror.ErrSchedulerTaskNotExist.Generate(cfg.Name)
}
cfgM := v.(map[string]config.SubTaskConfig)
_, ok = cfgM[cfg.SourceID]
if !ok {
return terror.ErrSchedulerSubTaskNotExist.Generate(cfg.Name, cfg.SourceID)
}
}
// check whether in running stage
for _, cfg := range cfgs {
stage := s.GetExpectSubTaskStage(cfg.Name, cfg.SourceID)
Ehco1996 marked this conversation as resolved.
Show resolved Hide resolved
if stage.Expect == pb.Stage_Running {
return terror.ErrSchedulerSubTaskCfgUpdate.Generate(cfg.Name, cfg.SourceID)
}
}
// put the configs and stages into etcd.
_, err := ha.PutSubTaskCfgStage(s.etcdCli, cfgs, []ha.Stage{}, []ha.Stage{})
if err != nil {
return err
}
// 5. record the config and the expectant stage.
for _, cfg := range cfgs {
v, _ := s.subTaskCfgs.LoadOrStore(cfg.Name, map[string]config.SubTaskConfig{})
m := v.(map[string]config.SubTaskConfig)
m[cfg.SourceID] = cfg
}
return nil
}

// getSubTaskCfgByTaskSource gets subtask config by task name and source ID. Only used in tests.
func (s *Scheduler) getSubTaskCfgByTaskSource(task, source string) *config.SubTaskConfig {
v, ok := s.subTaskCfgs.Load(task)
Expand Down Expand Up @@ -1144,12 +1190,18 @@ func (s *Scheduler) GetSubTaskCfgs() map[string]map[string]config.SubTaskConfig
}

// GetTaskNameListBySourceName gets task name list by source name.
func (s *Scheduler) GetTaskNameListBySourceName(sourceName string) []string {
func (s *Scheduler) GetTaskNameListBySourceName(sourceName string, expectStage *pb.Stage) []string {
var taskNameList []string
s.subTaskCfgs.Range(func(k, v interface{}) bool {
s.expectSubTaskStages.Range(func(k, v interface{}) bool {
subtaskM := v.(map[string]ha.Stage)
subtaskStage, ok2 := subtaskM[sourceName]
if !ok2 {
return true
}
task := k.(string)
subtaskCfgMap := v.(map[string]config.SubTaskConfig)
if _, ok := subtaskCfgMap[sourceName]; ok {
if expectStage == nil {
taskNameList = append(taskNameList, task)
} else if subtaskStage.Expect == *expectStage {
taskNameList = append(taskNameList, task)
}
return true
Expand Down
68 changes: 68 additions & 0 deletions dm/dm/master/scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,74 @@ var _ = Suite(&testScheduler{})

var stageEmpty ha.Stage

func (t *testScheduler) TestUpdateSubTasks(c *C) {
Ehco1996 marked this conversation as resolved.
Show resolved Hide resolved
defer clearTestInfoOperation(c)

var (
logger = log.L()
s = NewScheduler(&logger, config.Security{})
sourceID1 = "mysql-replica-1"
taskName1 = "task-1"
workerName1 = "dm-worker-1"
workerAddr1 = "127.0.0.1:8262"
subtaskCfg1 config.SubTaskConfig
keepAliveTTL = int64(5)
)
sourceCfg1, err := config.LoadFromFile(sourceSampleFile)
c.Assert(err, IsNil)
sourceCfg1.SourceID = sourceID1
c.Assert(subtaskCfg1.DecodeFile(subTaskSampleFile, true), IsNil)
subtaskCfg1.SourceID = sourceID1
subtaskCfg1.Name = taskName1
c.Assert(subtaskCfg1.Adjust(true), IsNil)

// not started scheduler can't update
c.Assert(terror.ErrSchedulerNotStarted.Equal(s.UpdateSubTasks(subtaskCfg1)), IsTrue)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
c.Assert(s.Start(ctx, etcdTestCli), IsNil)

subtaskCfg2 := subtaskCfg1
subtaskCfg2.Name = "fake name"
// can't update subtask with different task name
c.Assert(terror.ErrSchedulerMultiTask.Equal(s.UpdateSubTasks(subtaskCfg1, subtaskCfg2)), IsTrue)

// can't update not added subtask
c.Assert(terror.ErrSchedulerTaskNotExist.Equal(s.UpdateSubTasks(subtaskCfg1)), IsTrue)

// start worker,add source and subtask
c.Assert(s.AddSourceCfg(sourceCfg1), IsNil)
ctx1, cancel1 := context.WithCancel(ctx)
defer cancel1()
c.Assert(s.AddWorker(workerName1, workerAddr1), IsNil)
go func() {
c.Assert(ha.KeepAlive(ctx1, etcdTestCli, workerName1, keepAliveTTL), IsNil)
}()
// wait for source1 bound to worker1.
c.Assert(utils.WaitSomething(30, 100*time.Millisecond, func() bool {
bounds := s.BoundSources()
return len(bounds) == 1 && bounds[0] == sourceID1
}), IsTrue)
c.Assert(s.AddSubTasks(false, pb.Stage_Running, subtaskCfg1), IsNil)

// can't update subtask not in scheduler
subtaskCfg2.Name = subtaskCfg1.Name
subtaskCfg2.SourceID = "fake source name"
c.Assert(terror.ErrSchedulerSubTaskNotExist.Equal(s.UpdateSubTasks(subtaskCfg2)), IsTrue)

// can't update subtask in running stage
c.Assert(terror.ErrSchedulerSubTaskCfgUpdate.Equal(s.UpdateSubTasks(subtaskCfg1)), IsTrue)

// pause task
c.Assert(s.UpdateExpectSubTaskStage(pb.Stage_Paused, taskName1, sourceID1), IsNil)
subtaskCfg1.Batch = 1000

// update success
c.Assert(s.UpdateSubTasks(subtaskCfg1), IsNil)
c.Assert(s.getSubTaskCfgByTaskSource(taskName1, sourceID1).Batch, Equals, subtaskCfg1.Batch)
}

func (t *testScheduler) TestScheduler(c *C) {
t.testSchedulerProgress(c, noRestart)
t.testSchedulerProgress(c, restartOnly)
Expand Down
60 changes: 47 additions & 13 deletions dm/dm/worker/source_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,19 +305,13 @@ func (w *SourceWorker) EnableRelay(startBySourceCfg bool) (err error) {

w.startedRelayBySourceCfg = startBySourceCfg

var sourceCfg *config.SourceConfig
Ehco1996 marked this conversation as resolved.
Show resolved Hide resolved
failpoint.Inject("MockGetSourceCfgFromETCD", func(_ failpoint.Value) {
failpoint.Goto("bypass")
})

if !w.startedRelayBySourceCfg {
// we need update worker source config from etcd first
// because the configuration of the relay part of the data source may be changed via scheduler.UpdateSourceCfg
sourceCfg, _, err = ha.GetRelayConfig(w.etcdClient, w.name)
if err != nil {
return err
}
w.cfg = sourceCfg
// we need update worker source config from etcd first
// because the configuration of the relay part of the data source may be changed via scheduler.UpdateSourceCfg
Ehco1996 marked this conversation as resolved.
Show resolved Hide resolved
if refreshErr := w.refreshSourceCfg(); refreshErr != nil {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think we only need to refresh when user want to enable-relay or restart task for source

return refreshErr
}

failpoint.Label("bypass")
Expand Down Expand Up @@ -593,9 +587,11 @@ func (w *SourceWorker) getRelayWithoutLock() relay.Process {
}

// UpdateSubTask update config for a sub task.
func (w *SourceWorker) UpdateSubTask(ctx context.Context, cfg *config.SubTaskConfig) error {
w.Lock()
defer w.Unlock()
func (w *SourceWorker) UpdateSubTask(ctx context.Context, cfg *config.SubTaskConfig, needLock bool) error {
if needLock {
w.Lock()
defer w.Unlock()
}

if w.closed.Load() {
return terror.ErrWorkerAlreadyClosed.Generate()
Expand Down Expand Up @@ -625,6 +621,21 @@ func (w *SourceWorker) OperateSubTask(name string, op pb.TaskOp) error {
}

w.l.Info("OperateSubTask start", zap.Stringer("op", op), zap.String("task", name))
failpoint.Inject("SkipRefreshFromETCDInUT", func(_ failpoint.Value) {
failpoint.Goto("bypassRefresh")
})
if op == pb.TaskOp_Resume || op == pb.TaskOp_AutoResume {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

when user want to restart or start a stopped task, use the newest cfg from etcd.

dynamic update will impl later

// we need update worker source config from etcd first
// because the configuration of source may be changed via scheduler.UpdateSourceCfg
if refreshErr := w.refreshSourceCfg(); refreshErr != nil {
return refreshErr
}
// we need update subtask and source config in case of this config is updated in master
if refreshErr := w.refreshSubTaskConfig(w.cfg.SourceID, name); refreshErr != nil {
return refreshErr
}
}
failpoint.Label("bypassRefresh")
var err error
switch op {
case pb.TaskOp_Delete:
Expand Down Expand Up @@ -1277,3 +1288,26 @@ func (w *SourceWorker) operateValidatorStage(stage ha.Stage) error {
}
return nil
}

func (w *SourceWorker) refreshSourceCfg() error {
sourceCfg, _, err := ha.GetRelayConfig(w.etcdClient, w.name)
if err != nil {
return err
}
w.cfg = sourceCfg
return nil
}

func (w *SourceWorker) refreshSubTaskConfig(sourceName, taskName string) error {
// TODO check -1 rev is the newest version
Ehco1996 marked this conversation as resolved.
Show resolved Hide resolved
tsm, _, err := ha.GetSubTaskCfg(w.etcdClient, sourceName, taskName, int64(-1))
Ehco1996 marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return terror.Annotate(err, "fail to get subtask config from etcd")
}
var subTaskCfg config.SubTaskConfig
var ok bool
if subTaskCfg, ok = tsm[taskName]; !ok {
return terror.ErrWorkerFailToGetSubtaskConfigFromEtcd.Generate(taskName)
}
return w.UpdateSubTask(w.ctx, &subTaskCfg, false)
}
Loading