Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dmctl(dm): add config task-template #3991

Merged
merged 20 commits into from
Jan 18, 2022
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions dm/dm/config/task_converters.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func TaskConfigToSubTaskConfigs(c *TaskConfig, sources map[string]DBConfig) ([]*

// OpenAPITaskToSubTaskConfigs generates sub task configs by openapi.Task.
func OpenAPITaskToSubTaskConfigs(task *openapi.Task, toDBCfg *DBConfig, sourceCfgMap map[string]*SourceConfig) (
[]SubTaskConfig, error) {
[]*SubTaskConfig, error) {
// source name -> migrate rule list
tableMigrateRuleMap := make(map[string][]openapi.TaskTableMigrateRule)
for _, rule := range task.TableMigrateRule {
Expand All @@ -136,7 +136,7 @@ func OpenAPITaskToSubTaskConfigs(task *openapi.Task, toDBCfg *DBConfig, sourceCf
}
}
// start to generate sub task configs
subTaskCfgList := make([]SubTaskConfig, len(task.SourceConfig.SourceConf))
subTaskCfgList := make([]*SubTaskConfig, len(task.SourceConfig.SourceConf))
for i, sourceCfg := range task.SourceConfig.SourceConf {
// precheck source config
_, exist := sourceCfgMap[sourceCfg.SourceName]
Expand Down Expand Up @@ -245,7 +245,7 @@ func OpenAPITaskToSubTaskConfigs(task *openapi.Task, toDBCfg *DBConfig, sourceCf
if err := subTaskCfg.Adjust(true); err != nil {
return nil, terror.Annotatef(err, "source name %s", sourceCfg.SourceName)
}
subTaskCfgList[i] = *subTaskCfg
subTaskCfgList[i] = subTaskCfg
}
return subTaskCfgList, nil
}
Expand Down
6 changes: 3 additions & 3 deletions dm/dm/config/task_converters_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ func testNoShardSubTaskConfigsToOpenAPITask(c *check.C) {
// prepare sub task config
subTaskConfigMap := make(map[string]map[string]SubTaskConfig)
subTaskConfigMap[task.Name] = make(map[string]SubTaskConfig)
subTaskConfigMap[task.Name][source1Name] = subTaskConfigList[0]
subTaskConfigMap[task.Name][source1Name] = *subTaskConfigList[0]

taskList := SubTaskConfigsToOpenAPITask(subTaskConfigMap)
c.Assert(taskList, check.HasLen, 1)
Expand Down Expand Up @@ -309,8 +309,8 @@ func testShardAndFilterSubTaskConfigsToOpenAPITask(c *check.C) {
// prepare sub task config
subTaskConfigMap := make(map[string]map[string]SubTaskConfig)
subTaskConfigMap[task.Name] = make(map[string]SubTaskConfig)
subTaskConfigMap[task.Name][source1Name] = subTaskConfigList[0]
subTaskConfigMap[task.Name][source2Name] = subTaskConfigList[1]
subTaskConfigMap[task.Name][source1Name] = *subTaskConfigList[0]
subTaskConfigMap[task.Name][source2Name] = *subTaskConfigList[1]

taskList := SubTaskConfigsToOpenAPITask(subTaskConfigMap)
c.Assert(taskList, check.HasLen, 1)
Expand Down
20 changes: 20 additions & 0 deletions dm/dm/ctl/master/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,31 @@ func NewConfigCmd() *cobra.Command {
newConfigWorkerCmd(),
newExportCfgsCmd(),
newImportCfgsCmd(),
newConfigTaskTemplateCmd(),
)
cmd.PersistentFlags().StringP("path", "p", "", "specify the file path to export/import`")
return cmd
}

func newConfigTaskTemplateCmd() *cobra.Command {
cmd := &cobra.Command{
Use: "task-template [task-name]",
Short: "show task template which is created by WebUI with task config format",
RunE: func(cmd *cobra.Command, args []string) error {
if len(args) == 0 || len(args) > 1 {
return cmd.Help()
}
name := args[0]
output, err := cmd.Flags().GetString("path")
if err != nil {
return err
}
return sendGetConfigRequest(pb.CfgType_TaskTemplateType, name, output)
},
}
return cmd
}

func newConfigTaskCmd() *cobra.Command {
cmd := &cobra.Command{
Use: "task [task-name]",
Expand Down
26 changes: 11 additions & 15 deletions dm/dm/master/openapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -441,11 +441,7 @@ func (s *Server) DMAPIStartTask(c *gin.Context) {
return
}
// check subtask config
subTaskConfigPList := make([]*config.SubTaskConfig, len(subTaskConfigList))
for i := range subTaskConfigList {
subTaskConfigPList[i] = &subTaskConfigList[i]
}
msg, err := checker.CheckSyncConfigFunc(newCtx, subTaskConfigPList,
msg, err := checker.CheckSyncConfigFunc(newCtx, subTaskConfigList,
common.DefaultErrorCnt, common.DefaultWarnCnt)
if err != nil {
_ = c.Error(terror.WithClass(err, terror.ClassDMMaster))
Expand All @@ -456,21 +452,21 @@ func (s *Server) DMAPIStartTask(c *gin.Context) {
log.L().Warn("openapi pre-check warning before start task", zap.String("warning", msg))
}
// specify only start task on partial sources
var needStartSubTaskList []config.SubTaskConfig
var needStartSubTaskList []*config.SubTaskConfig
if req.SourceNameList != nil {
// source name -> sub task config
subTaskCfgM := make(map[string]*config.SubTaskConfig, len(subTaskConfigList))
for idx := range subTaskConfigList {
cfg := subTaskConfigList[idx]
subTaskCfgM[cfg.SourceID] = &cfg
subTaskCfgM[cfg.SourceID] = cfg
}
for _, sourceName := range *req.SourceNameList {
subTaskCfg, ok := subTaskCfgM[sourceName]
if !ok {
_ = c.Error(terror.ErrOpenAPITaskSourceNotFound.Generatef("source name %s", sourceName))
return
}
needStartSubTaskList = append(needStartSubTaskList, *subTaskCfg)
needStartSubTaskList = append(needStartSubTaskList, subTaskCfg)
}
} else {
needStartSubTaskList = subTaskConfigList
Expand All @@ -495,7 +491,7 @@ func (s *Server) DMAPIStartTask(c *gin.Context) {
return
}
}
err = s.scheduler.AddSubTasks(latched, needStartSubTaskList...)
err = s.scheduler.AddSubTasks(latched, subtaskCfgPointersToInstances(needStartSubTaskList...)...)
if err != nil {
_ = c.Error(err)
return
Expand Down Expand Up @@ -788,7 +784,7 @@ func (s *Server) DMAPIOperateTableStructure(c *gin.Context, taskName string, sou
}
}

// DMAPIImportTaskTemplate create task_config_template url is: (POST /api/v1/task/templates/import).
// DMAPIImportTaskTemplate create task_config_template url is: (POST /api/v1/tasks/templates/import).
func (s *Server) DMAPIImportTaskTemplate(c *gin.Context) {
var req openapi.TaskTemplateRequest
if err := c.Bind(&req); err != nil {
Expand Down Expand Up @@ -818,7 +814,7 @@ func (s *Server) DMAPIImportTaskTemplate(c *gin.Context) {
c.IndentedJSON(http.StatusAccepted, resp)
}

// DMAPICreateTaskTemplate create task_config_template url is: (POST /api/task/templates).
// DMAPICreateTaskTemplate create task_config_template url is: (POST /api/tasks/templates).
func (s *Server) DMAPICreateTaskTemplate(c *gin.Context) {
task := &openapi.Task{}
if err := c.Bind(task); err != nil {
Expand All @@ -843,7 +839,7 @@ func (s *Server) DMAPICreateTaskTemplate(c *gin.Context) {
c.IndentedJSON(http.StatusCreated, task)
}

// DMAPIGetTaskTemplateList get task_config_template list url is: (GET /api/v1/task/templates).
// DMAPIGetTaskTemplateList get task_config_template list url is: (GET /api/v1/tasks/templates).
func (s *Server) DMAPIGetTaskTemplateList(c *gin.Context) {
TaskConfigList, err := ha.GetAllOpenAPITaskTemplate(s.etcdClient)
if err != nil {
Expand All @@ -858,7 +854,7 @@ func (s *Server) DMAPIGetTaskTemplateList(c *gin.Context) {
c.IndentedJSON(http.StatusOK, resp)
}

// DMAPIDeleteTaskTemplate delete task_config_template url is: (DELETE /api/v1/task/templates/{task-name}).
// DMAPIDeleteTaskTemplate delete task_config_template url is: (DELETE /api/v1/tasks/templates/{task-name}).
func (s *Server) DMAPIDeleteTaskTemplate(c *gin.Context, taskName string) {
if err := ha.DeleteOpenAPITaskTemplate(s.etcdClient, taskName); err != nil {
_ = c.Error(err)
Expand All @@ -867,7 +863,7 @@ func (s *Server) DMAPIDeleteTaskTemplate(c *gin.Context, taskName string) {
c.Status(http.StatusNoContent)
}

// DMAPIGetTaskTemplate get task_config_template url is: (GET /api/v1/task/templates/{task-name}).
// DMAPIGetTaskTemplate get task_config_template url is: (GET /api/v1/tasks/templates/{task-name}).
func (s *Server) DMAPIGetTaskTemplate(c *gin.Context, taskName string) {
task, err := ha.GetOpenAPITaskTemplate(s.etcdClient, taskName)
if err != nil {
Expand All @@ -881,7 +877,7 @@ func (s *Server) DMAPIGetTaskTemplate(c *gin.Context, taskName string) {
c.IndentedJSON(http.StatusOK, task)
}

// DMAPUpdateTaskTemplate update task_config_template url is: (PUT /api/v1/task/templates/{task-name}).
// DMAPUpdateTaskTemplate update task_config_template url is: (PUT /api/v1/tasks/templates/{task-name}).
func (s *Server) DMAPUpdateTaskTemplate(c *gin.Context, taskName string) {
task := &openapi.Task{}
if err := c.Bind(task); err != nil {
Expand Down
4 changes: 2 additions & 2 deletions dm/dm/master/openapi_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -497,7 +497,7 @@ func (t *openAPISuite) TestTaskAPI(c *check.C) {
c.Assert(resultTaskList.Data[0].Name, check.Equals, task.Name)

// test batch import task config
taskBatchImportURL := "/api/v1/task/templates/import"
taskBatchImportURL := "/api/v1/tasks/templates/import"
req := openapi.TaskTemplateRequest{Overwrite: false}
result = testutil.NewRequest().Post(taskBatchImportURL).WithJsonBody(req).GoWithHTTPHandler(t.testT, s.openapiHandles)
c.Assert(result.Code(), check.Equals, http.StatusAccepted)
Expand Down Expand Up @@ -707,7 +707,7 @@ func (t *openAPISuite) TestTaskTemplatesAPI(c *check.C) {
c.Assert(result.Code(), check.Equals, http.StatusCreated)

// create task config template
url := "/api/v1/task/templates"
url := "/api/v1/tasks/templates"

task, err := fixtures.GenNoShardOpenAPITaskForTest()
c.Assert(err, check.IsNil)
Expand Down
68 changes: 54 additions & 14 deletions dm/dm/master/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ import (
"github.com/pingcap/tiflow/dm/pkg/cputil"
"github.com/pingcap/tiflow/dm/pkg/election"
"github.com/pingcap/tiflow/dm/pkg/etcdutil"
"github.com/pingcap/tiflow/dm/pkg/ha"
"github.com/pingcap/tiflow/dm/pkg/log"
"github.com/pingcap/tiflow/dm/pkg/terror"
"github.com/pingcap/tiflow/dm/pkg/utils"
Expand Down Expand Up @@ -2158,29 +2159,68 @@ func (s *Server) GetCfg(ctx context.Context, req *pb.GetCfgRequest) (*pb.GetCfgR
if shouldRet {
return resp2, err2
}
// For the get-config command, you want to filter out fields that are not easily readable by humans,
// such as SSLXXBytes field in `Security` struct

formartAndSortTaskString := func(subCfgList []*config.SubTaskConfig) string {
sort.Slice(subCfgList, func(i, j int) bool {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can move this sort inside SubTaskConfigsToTaskConfig

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes ,but we still need sort for get task config, so left sort part in this func

return subCfgList[i].SourceID < subCfgList[j].SourceID
})
// For the get-config command, we want to filter out fields that are not easily readable by humans,
// such as SSLXXBytes field in `Security` struct
taskCfg := config.SubTaskConfigsToTaskConfig(subCfgList...)
taskCfg.TargetDB.Password = "******"
if taskCfg.TargetDB.Security != nil {
taskCfg.TargetDB.Security.ClearSSLBytesData()
}
return taskCfg.String()
}
switch req.Type {
case pb.CfgType_TaskTemplateType:
task, err := ha.GetOpenAPITaskTemplate(s.etcdClient, req.Name)
if err != nil {
resp2.Msg = err.Error()
// nolint:nilerr
return resp2, nil
}
if task == nil {
resp2.Msg = "task not found"
// nolint:nilerr
return resp2, nil
}
toDBCfg := config.GetTargetDBCfgFromOpenAPITask(task)
if adjustDBErr := adjustTargetDB(ctx, toDBCfg); adjustDBErr != nil {
if adjustDBErr != nil {
resp2.Msg = adjustDBErr.Error()
// nolint:nilerr
return resp2, nil
}
}
sourceCfgMap := make(map[string]*config.SourceConfig)
for _, cfg := range task.SourceConfig.SourceConf {
if sourceCfg := s.scheduler.GetSourceCfgByID(cfg.SourceName); sourceCfg != nil {
sourceCfgMap[cfg.SourceName] = sourceCfg
} else {
resp2.Msg = fmt.Sprintf("the source: %s of task not found", cfg.SourceName)
return resp2, nil
}
}
subTaskConfigList, err := config.OpenAPITaskToSubTaskConfigs(task, toDBCfg, sourceCfgMap)
lance6716 marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
resp2.Msg = err.Error()
// nolint:nilerr
return resp2, nil
}
cfg = formartAndSortTaskString(subTaskConfigList)
case pb.CfgType_TaskType:
subCfgMap := s.scheduler.GetSubTaskCfgsByTask(req.Name)
if len(subCfgMap) == 0 {
resp2.Msg = "task not found"
return resp2, nil
}
subCfgList := make([]*config.SubTaskConfig, 0, len(subCfgMap))
subTaskConfigList := make([]*config.SubTaskConfig, 0, len(subCfgMap))
for _, subCfg := range subCfgMap {
subCfgList = append(subCfgList, subCfg)
}
sort.Slice(subCfgList, func(i, j int) bool {
return subCfgList[i].SourceID < subCfgList[j].SourceID
})

taskCfg := config.SubTaskConfigsToTaskConfig(subCfgList...)
taskCfg.TargetDB.Password = "******"
if taskCfg.TargetDB.Security != nil {
taskCfg.TargetDB.Security.ClearSSLBytesData()
subTaskConfigList = append(subTaskConfigList, subCfg)
}
cfg = taskCfg.String()
cfg = formartAndSortTaskString(subTaskConfigList)
case pb.CfgType_MasterType:
if req.Name == s.cfg.Name {
cfg, err2 = s.cfg.Toml()
Expand Down
18 changes: 16 additions & 2 deletions dm/dm/master/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ import (
"github.com/pingcap/tiflow/dm/dm/master/workerrpc"
"github.com/pingcap/tiflow/dm/dm/pb"
"github.com/pingcap/tiflow/dm/dm/pbmock"
"github.com/pingcap/tiflow/dm/openapi/fixtures"
"github.com/pingcap/tiflow/dm/pkg/conn"
"github.com/pingcap/tiflow/dm/pkg/cputil"
"github.com/pingcap/tiflow/dm/pkg/etcdutil"
Expand Down Expand Up @@ -2031,16 +2032,29 @@ func (t *testMaster) TestGetCfg(c *check.C) {
c.Assert(resp1.Result, check.IsTrue)
c.Assert(strings.Contains(resp1.Cfg, "name: test"), check.IsTrue)

// wrong task name
// not exist task name
taskName2 := "wrong"
req2 := &pb.GetCfgRequest{
Name: "haha",
Name: taskName2,
Type: pb.CfgType_TaskType,
}
resp2, err := server.GetCfg(context.Background(), req2)
c.Assert(err, check.IsNil)
c.Assert(resp2.Result, check.IsFalse)
c.Assert(resp2.Msg, check.Equals, "task not found")

// generate a template named `wrong`, test get this task template
openapiTask, err := fixtures.GenNoShardOpenAPITaskForTest()
c.Assert(err, check.IsNil)
Ehco1996 marked this conversation as resolved.
Show resolved Hide resolved
openapiTask.Name = taskName2
c.Assert(ha.PutOpenAPITaskTemplate(t.etcdTestCli, openapiTask, true), check.IsNil)
c.Assert(failpoint.Enable("github.com/pingcap/tiflow/dm/dm/master/MockSkipAdjustTargetDB", `return(true)`), check.IsNil)
resp2, err = server.GetCfg(context.Background(), &pb.GetCfgRequest{Name: taskName2, Type: pb.CfgType_TaskTemplateType})
c.Assert(failpoint.Disable("github.com/pingcap/tiflow/dm/dm/master/MockSkipAdjustTargetDB"), check.IsNil)
c.Assert(err, check.IsNil)
c.Assert(resp2.Result, check.IsTrue)
c.Assert(strings.Contains(resp2.Cfg, fmt.Sprintf("name: %s", taskName2)), check.IsTrue)

// test restart master
server.scheduler.Close()
c.Assert(server.scheduler.Start(ctx, t.etcdTestCli), check.IsNil)
Expand Down
Loading