Skip to content

Commit

Permalink
DM/Openapi: support start task by some conditions (#5349)
Browse files Browse the repository at this point in the history
close #5348
  • Loading branch information
WizardXiao authored May 16, 2022
1 parent 168062e commit da4924f
Show file tree
Hide file tree
Showing 15 changed files with 665 additions and 129 deletions.
2 changes: 1 addition & 1 deletion dm/_utils/terror_gen/errors_release.txt
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ ErrConfigReadCfgFromFile,[code=20018:class=config:scope=internal:level=medium],
ErrConfigNeedUniqueTaskName,[code=20019:class=config:scope=internal:level=medium], "Message: must specify a unique task name, Workaround: Please check the `name` config in task configuration file."
ErrConfigInvalidTaskMode,[code=20020:class=config:scope=internal:level=medium], "Message: please specify right task-mode, support `full`, `incremental`, `all`, Workaround: Please check the `task-mode` config in task configuration file."
ErrConfigNeedTargetDB,[code=20021:class=config:scope=internal:level=medium], "Message: must specify target-database, Workaround: Please check the `target-database` config in task configuration file."
ErrConfigMetadataNotSet,[code=20022:class=config:scope=internal:level=medium], "Message: mysql-instance(%d) must set meta for task-mode %s, Workaround: Please check the `meta` config in task configuration file."
ErrConfigMetadataNotSet,[code=20022:class=config:scope=internal:level=medium], "Message: mysql-instance(%s) must set meta for task-mode %s, Workaround: Please check the `meta` config in task configuration file."
ErrConfigRouteRuleNotFound,[code=20023:class=config:scope=internal:level=medium], "Message: mysql-instance(%d)'s route-rules %s not exist in routes, Workaround: Please check the `route-rules` config in task configuration file."
ErrConfigFilterRuleNotFound,[code=20024:class=config:scope=internal:level=medium], "Message: mysql-instance(%d)'s filter-rules %s not exist in filters, Workaround: Please check the `filter-rules` config in task configuration file."
ErrConfigColumnMappingNotFound,[code=20025:class=config:scope=internal:level=medium], "Message: mysql-instance(%d)'s column-mapping-rules %s not exist in column-mapping, Workaround: Please check the `column-mapping-rules` config in task configuration file."
Expand Down
2 changes: 1 addition & 1 deletion dm/dm/config/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -660,7 +660,7 @@ func (c *TaskConfig) adjust() error {
}
case ModeIncrement:
if inst.Meta == nil {
return terror.ErrConfigMetadataNotSet.Generate(i, c.TaskMode)
return terror.ErrConfigMetadataNotSet.Generate(inst.SourceID, c.TaskMode)
}
err := inst.Meta.Verify()
if err != nil {
Expand Down
37 changes: 35 additions & 2 deletions dm/dm/config/task_converters.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,10 +166,12 @@ func OpenAPITaskToSubTaskConfigs(task *openapi.Task, toDBCfg *DBConfig, sourceCf
}
subTaskCfg.Meta = meta
}
// check must set meta when mode is ModeIncrement

// if there is no meta for incremental task, we print a warning log
if subTaskCfg.Meta == nil && subTaskCfg.Mode == ModeIncrement {
return nil, terror.ErrConfigMetadataNotSet.Generate(i, ModeIncrement)
log.L().Warn("mysql-instance doesn't set meta for incremental mode, user should specify start_time to start task.", zap.String("sourceID", sourceCfg.SourceName))
}

// set shard config
if task.ShardMode != nil {
subTaskCfg.IsSharding = true
Expand Down Expand Up @@ -691,3 +693,34 @@ func genFilterRuleName(sourceName string, idx int) string {
// NOTE that we don't have user input filter rule name in sub task config, so we make one by ourself
return fmt.Sprintf("%s-filter-rule-%d", sourceName, idx)
}

func OpenAPIStartTaskReqToTaskCliArgs(req openapi.StartTaskRequest) (*TaskCliArgs, error) {
if req.StartTime == nil && req.SafeModeTimeDuration == nil {
return nil, nil
}
cliArgs := &TaskCliArgs{}
if req.StartTime != nil {
cliArgs.StartTime = *req.StartTime
}
if req.SafeModeTimeDuration != nil {
cliArgs.SafeModeDuration = *req.SafeModeTimeDuration
}

if err := cliArgs.Verify(); err != nil {
return nil, err
}
return cliArgs, nil
}

func OpenAPIStopTaskReqToTaskCliArgs(req openapi.StopTaskRequest) (*TaskCliArgs, error) {
if req.TimeoutDuration == nil {
return nil, nil
}
cliArgs := &TaskCliArgs{
WaitTimeOnStop: *req.TimeoutDuration,
}
if err := cliArgs.Verify(); err != nil {
return nil, err
}
return cliArgs, nil
}
58 changes: 54 additions & 4 deletions dm/dm/master/openapi_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,14 @@ import (
"fmt"
"strings"

clientv3 "go.etcd.io/etcd/client/v3"

"github.com/pingcap/log"
"go.uber.org/zap"

"github.com/pingcap/tiflow/dm/dm/master/scheduler"
"github.com/pingcap/tiflow/dm/pkg/ha"

"github.com/pingcap/tiflow/dm/checker"
dmcommon "github.com/pingcap/tiflow/dm/dm/common"
"github.com/pingcap/tiflow/dm/dm/config"
Expand Down Expand Up @@ -377,7 +382,7 @@ func (s *Server) checkOpenAPITaskBeforeOperate(ctx context.Context, task *openap
if sourceCfg := s.scheduler.GetSourceCfgByID(cfg.SourceName); sourceCfg != nil {
sourceCfgMap[cfg.SourceName] = sourceCfg
} else {
return nil, "", terror.ErrSchedulerSourceCfgNotExist.Generate(sourceCfg.SourceID)
return nil, "", terror.ErrSchedulerSourceCfgNotExist.Generate(cfg.SourceName)
}
}
// generate sub task configs
Expand Down Expand Up @@ -666,6 +671,10 @@ func (s *Server) startTask(ctx context.Context, taskName string, req openapi.Sta
if !ok {
return terror.ErrSchedulerSourceCfgNotExist.Generate(sourceName)
}
// start task check. incremental task need to specify meta or start time
if subTaskCfg.Meta == nil && subTaskCfg.Mode == config.ModeIncrement && req.StartTime == nil {
return terror.ErrConfigMetadataNotSet.Generate(sourceName, config.ModeIncrement)
}
cfg := s.scheduler.GetSourceCfgByID(sourceName)
if cfg == nil {
return terror.ErrSchedulerSourceCfgNotExist.Generate(sourceName)
Expand All @@ -679,10 +688,14 @@ func (s *Server) startTask(ctx context.Context, taskName string, req openapi.Sta
return nil
}

// TODO(ehco) support other start args after https://github.com/pingcap/tiflow/pull/4601 merged
var (
release scheduler.ReleaseFunc
err error
)
// removeMeta
if req.RemoveMeta != nil && *req.RemoveMeta {
// use same latch for remove-meta and start-task
release, err := s.scheduler.AcquireSubtaskLatch(taskName)
release, err = s.scheduler.AcquireSubtaskLatch(taskName)
if err != nil {
return terror.ErrSchedulerLatchInUse.Generate("RemoveMeta", taskName)
}
Expand All @@ -693,8 +706,21 @@ func (s *Server) startTask(ctx context.Context, taskName string, req openapi.Sta
if err != nil {
return terror.Annotate(err, "while removing metadata")
}
}

// handle task cli args
cliArgs, err := config.OpenAPIStartTaskReqToTaskCliArgs(req)
if err != nil {
return terror.Annotate(err, "while converting task command line arguments")
}

if err = handleCliArgs(s.etcdClient, taskName, *req.SourceNameList, cliArgs); err != nil {
return err
}
if release != nil {
release()
}

return s.scheduler.UpdateExpectSubTaskStage(pb.Stage_Running, taskName, *req.SourceNameList...)
}

Expand All @@ -705,10 +731,34 @@ func (s *Server) stopTask(ctx context.Context, taskName string, req openapi.Stop
sourceNameList := openapi.SourceNameList(s.getTaskSourceNameList(taskName))
req.SourceNameList = &sourceNameList
}
// TODO(ehco): support stop req after https://github.com/pingcap/tiflow/pull/4601 merged
// handle task cli args
cliArgs, err := config.OpenAPIStopTaskReqToTaskCliArgs(req)
if err != nil {
return terror.Annotate(err, "while converting task command line arguments")
}
if err = handleCliArgs(s.etcdClient, taskName, *req.SourceNameList, cliArgs); err != nil {
return err
}
return s.scheduler.UpdateExpectSubTaskStage(pb.Stage_Stopped, taskName, *req.SourceNameList...)
}

// handleCliArgs handles cli args.
// it will try to delete args if cli args is nil.
func handleCliArgs(cli *clientv3.Client, taskName string, sources []string, cliArgs *config.TaskCliArgs) error {
if cliArgs == nil {
err := ha.DeleteTaskCliArgs(cli, taskName, sources)
if err != nil {
return terror.Annotate(err, "while removing task command line arguments")
}
} else {
err := ha.PutTaskCliArgs(cli, taskName, sources, *cliArgs)
if err != nil {
return terror.Annotate(err, "while putting task command line arguments")
}
}
return nil
}

// nolint:unparam
func (s *Server) convertTaskConfig(ctx context.Context, req openapi.ConverterTaskRequest) (*openapi.Task, *config.TaskConfig, error) {
if req.TaskConfigFile != nil {
Expand Down
20 changes: 20 additions & 0 deletions dm/dm/master/openapi_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"context"
"testing"

"github.com/pingcap/tiflow/dm/pkg/ha"

"github.com/pingcap/failpoint"
"github.com/pingcap/tiflow/dm/checker"
"github.com/pingcap/tiflow/dm/dm/config"
Expand Down Expand Up @@ -372,6 +374,24 @@ func (s *OpenAPIControllerSuite) TestTaskController() {
// stop success
s.Nil(server.stopTask(ctx, s.testTask.Name, openapi.StopTaskRequest{}))
s.Equal(server.scheduler.GetExpectSubTaskStage(s.testTask.Name, s.testSource.SourceName).Expect, pb.Stage_Stopped)

// start with cli args
startTime := "2022-05-05 12:12:12"
safeModeTimeDuration := "10s"
req = openapi.StartTaskRequest{
StartTime: &startTime,
SafeModeTimeDuration: &safeModeTimeDuration,
}
s.Nil(server.startTask(ctx, s.testTask.Name, req))
taskCliConf, err := ha.GetTaskCliArgs(server.etcdClient, s.testTask.Name, s.testSource.SourceName)
s.Nil(err)
s.NotNil(taskCliConf)
s.Equal(startTime, taskCliConf.StartTime)
s.Equal(safeModeTimeDuration, taskCliConf.SafeModeDuration)

// stop success
s.Nil(server.stopTask(ctx, s.testTask.Name, openapi.StopTaskRequest{}))
s.Equal(server.scheduler.GetExpectSubTaskStage(s.testTask.Name, s.testSource.SourceName).Expect, pb.Stage_Stopped)
}

// delete
Expand Down
2 changes: 1 addition & 1 deletion dm/errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -929,7 +929,7 @@ workaround = "Please check the `target-database` config in task configuration fi
tags = ["internal", "medium"]

[error.DM-config-20022]
message = "mysql-instance(%d) must set meta for task-mode %s"
message = "mysql-instance(%s) must set meta for task-mode %s"
description = ""
workaround = "Please check the `meta` config in task configuration file."
tags = ["internal", "medium"]
Expand Down
Loading

0 comments on commit da4924f

Please sign in to comment.