Skip to content

Commit

Permalink
dmctl(dm): add config task-template (#3991)
Browse files Browse the repository at this point in the history
ref #3583
  • Loading branch information
Ehco1996 authored Jan 18, 2022
1 parent d9a3628 commit 4bfa080
Show file tree
Hide file tree
Showing 16 changed files with 780 additions and 509 deletions.
6 changes: 3 additions & 3 deletions dm/dm/config/task_converters.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func TaskConfigToSubTaskConfigs(c *TaskConfig, sources map[string]DBConfig) ([]*

// OpenAPITaskToSubTaskConfigs generates sub task configs by openapi.Task.
func OpenAPITaskToSubTaskConfigs(task *openapi.Task, toDBCfg *DBConfig, sourceCfgMap map[string]*SourceConfig) (
[]SubTaskConfig, error) {
[]*SubTaskConfig, error) {
// source name -> migrate rule list
tableMigrateRuleMap := make(map[string][]openapi.TaskTableMigrateRule)
for _, rule := range task.TableMigrateRule {
Expand All @@ -137,7 +137,7 @@ func OpenAPITaskToSubTaskConfigs(task *openapi.Task, toDBCfg *DBConfig, sourceCf
}
}
// start to generate sub task configs
subTaskCfgList := make([]SubTaskConfig, len(task.SourceConfig.SourceConf))
subTaskCfgList := make([]*SubTaskConfig, len(task.SourceConfig.SourceConf))
for i, sourceCfg := range task.SourceConfig.SourceConf {
// precheck source config
_, exist := sourceCfgMap[sourceCfg.SourceName]
Expand Down Expand Up @@ -246,7 +246,7 @@ func OpenAPITaskToSubTaskConfigs(task *openapi.Task, toDBCfg *DBConfig, sourceCf
if err := subTaskCfg.Adjust(true); err != nil {
return nil, terror.Annotatef(err, "source name %s", sourceCfg.SourceName)
}
subTaskCfgList[i] = *subTaskCfg
subTaskCfgList[i] = subTaskCfg
}
return subTaskCfgList, nil
}
Expand Down
6 changes: 3 additions & 3 deletions dm/dm/config/task_converters_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ func testNoShardSubTaskConfigsToOpenAPITask(c *check.C) {
// prepare sub task config
subTaskConfigMap := make(map[string]map[string]SubTaskConfig)
subTaskConfigMap[task.Name] = make(map[string]SubTaskConfig)
subTaskConfigMap[task.Name][source1Name] = subTaskConfigList[0]
subTaskConfigMap[task.Name][source1Name] = *subTaskConfigList[0]

taskList := SubTaskConfigsToOpenAPITask(subTaskConfigMap)
c.Assert(taskList, check.HasLen, 1)
Expand Down Expand Up @@ -309,8 +309,8 @@ func testShardAndFilterSubTaskConfigsToOpenAPITask(c *check.C) {
// prepare sub task config
subTaskConfigMap := make(map[string]map[string]SubTaskConfig)
subTaskConfigMap[task.Name] = make(map[string]SubTaskConfig)
subTaskConfigMap[task.Name][source1Name] = subTaskConfigList[0]
subTaskConfigMap[task.Name][source2Name] = subTaskConfigList[1]
subTaskConfigMap[task.Name][source1Name] = *subTaskConfigList[0]
subTaskConfigMap[task.Name][source2Name] = *subTaskConfigList[1]

taskList := SubTaskConfigsToOpenAPITask(subTaskConfigMap)
c.Assert(taskList, check.HasLen, 1)
Expand Down
20 changes: 20 additions & 0 deletions dm/dm/ctl/master/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,31 @@ func NewConfigCmd() *cobra.Command {
newConfigWorkerCmd(),
newExportCfgsCmd(),
newImportCfgsCmd(),
newConfigTaskTemplateCmd(),
)
cmd.PersistentFlags().StringP("path", "p", "", "specify the file path to export/import`")
return cmd
}

func newConfigTaskTemplateCmd() *cobra.Command {
cmd := &cobra.Command{
Use: "task-template [task-name]",
Short: "show task template which is created by WebUI with task config format",
RunE: func(cmd *cobra.Command, args []string) error {
if len(args) == 0 || len(args) > 1 {
return cmd.Help()
}
name := args[0]
output, err := cmd.Flags().GetString("path")
if err != nil {
return err
}
return sendGetConfigRequest(pb.CfgType_TaskTemplateType, name, output)
},
}
return cmd
}

func newConfigTaskCmd() *cobra.Command {
cmd := &cobra.Command{
Use: "task [task-name]",
Expand Down
26 changes: 11 additions & 15 deletions dm/dm/master/openapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -441,11 +441,7 @@ func (s *Server) DMAPIStartTask(c *gin.Context) {
return
}
// check subtask config
subTaskConfigPList := make([]*config.SubTaskConfig, len(subTaskConfigList))
for i := range subTaskConfigList {
subTaskConfigPList[i] = &subTaskConfigList[i]
}
msg, err := checker.CheckSyncConfigFunc(newCtx, subTaskConfigPList,
msg, err := checker.CheckSyncConfigFunc(newCtx, subTaskConfigList,
common.DefaultErrorCnt, common.DefaultWarnCnt)
if err != nil {
_ = c.Error(terror.WithClass(err, terror.ClassDMMaster))
Expand All @@ -456,21 +452,21 @@ func (s *Server) DMAPIStartTask(c *gin.Context) {
log.L().Warn("openapi pre-check warning before start task", zap.String("warning", msg))
}
// specify only start task on partial sources
var needStartSubTaskList []config.SubTaskConfig
var needStartSubTaskList []*config.SubTaskConfig
if req.SourceNameList != nil {
// source name -> sub task config
subTaskCfgM := make(map[string]*config.SubTaskConfig, len(subTaskConfigList))
for idx := range subTaskConfigList {
cfg := subTaskConfigList[idx]
subTaskCfgM[cfg.SourceID] = &cfg
subTaskCfgM[cfg.SourceID] = cfg
}
for _, sourceName := range *req.SourceNameList {
subTaskCfg, ok := subTaskCfgM[sourceName]
if !ok {
_ = c.Error(terror.ErrOpenAPITaskSourceNotFound.Generatef("source name %s", sourceName))
return
}
needStartSubTaskList = append(needStartSubTaskList, *subTaskCfg)
needStartSubTaskList = append(needStartSubTaskList, subTaskCfg)
}
} else {
needStartSubTaskList = subTaskConfigList
Expand All @@ -495,7 +491,7 @@ func (s *Server) DMAPIStartTask(c *gin.Context) {
return
}
}
err = s.scheduler.AddSubTasks(latched, needStartSubTaskList...)
err = s.scheduler.AddSubTasks(latched, subtaskCfgPointersToInstances(needStartSubTaskList...)...)
if err != nil {
_ = c.Error(err)
return
Expand Down Expand Up @@ -788,7 +784,7 @@ func (s *Server) DMAPIOperateTableStructure(c *gin.Context, taskName string, sou
}
}

// DMAPIImportTaskTemplate create task_config_template url is: (POST /api/v1/task/templates/import).
// DMAPIImportTaskTemplate create task_config_template url is: (POST /api/v1/tasks/templates/import).
func (s *Server) DMAPIImportTaskTemplate(c *gin.Context) {
var req openapi.TaskTemplateRequest
if err := c.Bind(&req); err != nil {
Expand Down Expand Up @@ -818,7 +814,7 @@ func (s *Server) DMAPIImportTaskTemplate(c *gin.Context) {
c.IndentedJSON(http.StatusAccepted, resp)
}

// DMAPICreateTaskTemplate create task_config_template url is: (POST /api/task/templates).
// DMAPICreateTaskTemplate create task_config_template url is: (POST /api/tasks/templates).
func (s *Server) DMAPICreateTaskTemplate(c *gin.Context) {
task := &openapi.Task{}
if err := c.Bind(task); err != nil {
Expand All @@ -843,7 +839,7 @@ func (s *Server) DMAPICreateTaskTemplate(c *gin.Context) {
c.IndentedJSON(http.StatusCreated, task)
}

// DMAPIGetTaskTemplateList get task_config_template list url is: (GET /api/v1/task/templates).
// DMAPIGetTaskTemplateList get task_config_template list url is: (GET /api/v1/tasks/templates).
func (s *Server) DMAPIGetTaskTemplateList(c *gin.Context) {
TaskConfigList, err := ha.GetAllOpenAPITaskTemplate(s.etcdClient)
if err != nil {
Expand All @@ -858,7 +854,7 @@ func (s *Server) DMAPIGetTaskTemplateList(c *gin.Context) {
c.IndentedJSON(http.StatusOK, resp)
}

// DMAPIDeleteTaskTemplate delete task_config_template url is: (DELETE /api/v1/task/templates/{task-name}).
// DMAPIDeleteTaskTemplate delete task_config_template url is: (DELETE /api/v1/tasks/templates/{task-name}).
func (s *Server) DMAPIDeleteTaskTemplate(c *gin.Context, taskName string) {
if err := ha.DeleteOpenAPITaskTemplate(s.etcdClient, taskName); err != nil {
_ = c.Error(err)
Expand All @@ -867,7 +863,7 @@ func (s *Server) DMAPIDeleteTaskTemplate(c *gin.Context, taskName string) {
c.Status(http.StatusNoContent)
}

// DMAPIGetTaskTemplate get task_config_template url is: (GET /api/v1/task/templates/{task-name}).
// DMAPIGetTaskTemplate get task_config_template url is: (GET /api/v1/tasks/templates/{task-name}).
func (s *Server) DMAPIGetTaskTemplate(c *gin.Context, taskName string) {
task, err := ha.GetOpenAPITaskTemplate(s.etcdClient, taskName)
if err != nil {
Expand All @@ -881,7 +877,7 @@ func (s *Server) DMAPIGetTaskTemplate(c *gin.Context, taskName string) {
c.IndentedJSON(http.StatusOK, task)
}

// DMAPUpdateTaskTemplate update task_config_template url is: (PUT /api/v1/task/templates/{task-name}).
// DMAPUpdateTaskTemplate update task_config_template url is: (PUT /api/v1/tasks/templates/{task-name}).
func (s *Server) DMAPUpdateTaskTemplate(c *gin.Context, taskName string) {
task := &openapi.Task{}
if err := c.Bind(task); err != nil {
Expand Down
4 changes: 2 additions & 2 deletions dm/dm/master/openapi_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -497,7 +497,7 @@ func (t *openAPISuite) TestTaskAPI(c *check.C) {
c.Assert(resultTaskList.Data[0].Name, check.Equals, task.Name)

// test batch import task config
taskBatchImportURL := "/api/v1/task/templates/import"
taskBatchImportURL := "/api/v1/tasks/templates/import"
req := openapi.TaskTemplateRequest{Overwrite: false}
result = testutil.NewRequest().Post(taskBatchImportURL).WithJsonBody(req).GoWithHTTPHandler(t.testT, s.openapiHandles)
c.Assert(result.Code(), check.Equals, http.StatusAccepted)
Expand Down Expand Up @@ -707,7 +707,7 @@ func (t *openAPISuite) TestTaskTemplatesAPI(c *check.C) {
c.Assert(result.Code(), check.Equals, http.StatusCreated)

// create task config template
url := "/api/v1/task/templates"
url := "/api/v1/tasks/templates"

task, err := fixtures.GenNoShardOpenAPITaskForTest()
c.Assert(err, check.IsNil)
Expand Down
68 changes: 54 additions & 14 deletions dm/dm/master/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ import (
"github.com/pingcap/tiflow/dm/pkg/cputil"
"github.com/pingcap/tiflow/dm/pkg/election"
"github.com/pingcap/tiflow/dm/pkg/etcdutil"
"github.com/pingcap/tiflow/dm/pkg/ha"
"github.com/pingcap/tiflow/dm/pkg/log"
"github.com/pingcap/tiflow/dm/pkg/terror"
"github.com/pingcap/tiflow/dm/pkg/utils"
Expand Down Expand Up @@ -2158,29 +2159,68 @@ func (s *Server) GetCfg(ctx context.Context, req *pb.GetCfgRequest) (*pb.GetCfgR
if shouldRet {
return resp2, err2
}
// For the get-config command, you want to filter out fields that are not easily readable by humans,
// such as SSLXXBytes field in `Security` struct

formartAndSortTaskString := func(subCfgList []*config.SubTaskConfig) string {
sort.Slice(subCfgList, func(i, j int) bool {
return subCfgList[i].SourceID < subCfgList[j].SourceID
})
// For the get-config command, we want to filter out fields that are not easily readable by humans,
// such as SSLXXBytes field in `Security` struct
taskCfg := config.SubTaskConfigsToTaskConfig(subCfgList...)
taskCfg.TargetDB.Password = "******"
if taskCfg.TargetDB.Security != nil {
taskCfg.TargetDB.Security.ClearSSLBytesData()
}
return taskCfg.String()
}
switch req.Type {
case pb.CfgType_TaskTemplateType:
task, err := ha.GetOpenAPITaskTemplate(s.etcdClient, req.Name)
if err != nil {
resp2.Msg = err.Error()
// nolint:nilerr
return resp2, nil
}
if task == nil {
resp2.Msg = "task not found"
// nolint:nilerr
return resp2, nil
}
toDBCfg := config.GetTargetDBCfgFromOpenAPITask(task)
if adjustDBErr := adjustTargetDB(ctx, toDBCfg); adjustDBErr != nil {
if adjustDBErr != nil {
resp2.Msg = adjustDBErr.Error()
// nolint:nilerr
return resp2, nil
}
}
sourceCfgMap := make(map[string]*config.SourceConfig)
for _, cfg := range task.SourceConfig.SourceConf {
if sourceCfg := s.scheduler.GetSourceCfgByID(cfg.SourceName); sourceCfg != nil {
sourceCfgMap[cfg.SourceName] = sourceCfg
} else {
resp2.Msg = fmt.Sprintf("the source: %s of task not found", cfg.SourceName)
return resp2, nil
}
}
subTaskConfigList, err := config.OpenAPITaskToSubTaskConfigs(task, toDBCfg, sourceCfgMap)
if err != nil {
resp2.Msg = err.Error()
// nolint:nilerr
return resp2, nil
}
cfg = formartAndSortTaskString(subTaskConfigList)
case pb.CfgType_TaskType:
subCfgMap := s.scheduler.GetSubTaskCfgsByTask(req.Name)
if len(subCfgMap) == 0 {
resp2.Msg = "task not found"
return resp2, nil
}
subCfgList := make([]*config.SubTaskConfig, 0, len(subCfgMap))
subTaskConfigList := make([]*config.SubTaskConfig, 0, len(subCfgMap))
for _, subCfg := range subCfgMap {
subCfgList = append(subCfgList, subCfg)
}
sort.Slice(subCfgList, func(i, j int) bool {
return subCfgList[i].SourceID < subCfgList[j].SourceID
})

taskCfg := config.SubTaskConfigsToTaskConfig(subCfgList...)
taskCfg.TargetDB.Password = "******"
if taskCfg.TargetDB.Security != nil {
taskCfg.TargetDB.Security.ClearSSLBytesData()
subTaskConfigList = append(subTaskConfigList, subCfg)
}
cfg = taskCfg.String()
cfg = formartAndSortTaskString(subTaskConfigList)
case pb.CfgType_MasterType:
if req.Name == s.cfg.Name {
cfg, err2 = s.cfg.Toml()
Expand Down
18 changes: 16 additions & 2 deletions dm/dm/master/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ import (
"github.com/pingcap/tiflow/dm/dm/master/workerrpc"
"github.com/pingcap/tiflow/dm/dm/pb"
"github.com/pingcap/tiflow/dm/dm/pbmock"
"github.com/pingcap/tiflow/dm/openapi/fixtures"
"github.com/pingcap/tiflow/dm/pkg/conn"
"github.com/pingcap/tiflow/dm/pkg/cputil"
"github.com/pingcap/tiflow/dm/pkg/etcdutil"
Expand Down Expand Up @@ -2031,16 +2032,29 @@ func (t *testMaster) TestGetCfg(c *check.C) {
c.Assert(resp1.Result, check.IsTrue)
c.Assert(strings.Contains(resp1.Cfg, "name: test"), check.IsTrue)

// wrong task name
// not exist task name
taskName2 := "wrong"
req2 := &pb.GetCfgRequest{
Name: "haha",
Name: taskName2,
Type: pb.CfgType_TaskType,
}
resp2, err := server.GetCfg(context.Background(), req2)
c.Assert(err, check.IsNil)
c.Assert(resp2.Result, check.IsFalse)
c.Assert(resp2.Msg, check.Equals, "task not found")

// generate a template named `wrong`, test get this task template
openapiTask, err := fixtures.GenNoShardOpenAPITaskForTest()
c.Assert(err, check.IsNil)
openapiTask.Name = taskName2
c.Assert(ha.PutOpenAPITaskTemplate(t.etcdTestCli, openapiTask, true), check.IsNil)
c.Assert(failpoint.Enable("github.com/pingcap/tiflow/dm/dm/master/MockSkipAdjustTargetDB", `return(true)`), check.IsNil)
resp2, err = server.GetCfg(context.Background(), &pb.GetCfgRequest{Name: taskName2, Type: pb.CfgType_TaskTemplateType})
c.Assert(failpoint.Disable("github.com/pingcap/tiflow/dm/dm/master/MockSkipAdjustTargetDB"), check.IsNil)
c.Assert(err, check.IsNil)
c.Assert(resp2.Result, check.IsTrue)
c.Assert(strings.Contains(resp2.Cfg, fmt.Sprintf("name: %s", taskName2)), check.IsTrue)

// test restart master
server.scheduler.Close()
c.Assert(server.scheduler.Start(ctx, t.etcdTestCli), check.IsNil)
Expand Down
Loading

0 comments on commit 4bfa080

Please sign in to comment.