diff --git a/dm/chaos/cases/task.go b/dm/chaos/cases/task.go index edecde77fda..c6d39a7a275 100644 --- a/dm/chaos/cases/task.go +++ b/dm/chaos/cases/task.go @@ -183,7 +183,7 @@ func (t *task) run() error { func (t *task) stopPreviousTask() error { t.logger.Info("stopping previous task") resp, err := t.cli.OperateTask(t.ctx, &pb.OperateTaskRequest{ - Op: pb.TaskOp_Stop, + Op: pb.TaskOp_Delete, Name: t.taskCfg.Name, }) if err != nil { diff --git a/dm/dm/ctl/master/stop_task.go b/dm/dm/ctl/master/stop_task.go index 1edd46b93af..a35484668f7 100644 --- a/dm/dm/ctl/master/stop_task.go +++ b/dm/dm/ctl/master/stop_task.go @@ -32,5 +32,5 @@ func NewStopTaskCmd() *cobra.Command { // stopTaskFunc does stop task request. func stopTaskFunc(cmd *cobra.Command, _ []string) (err error) { - return operateTaskFunc(pb.TaskOp_Stop, cmd) + return operateTaskFunc(pb.TaskOp_Delete, cmd) } diff --git a/dm/dm/master/bootstrap.go b/dm/dm/master/bootstrap.go index 9c090843387..f7fa2e310d7 100644 --- a/dm/dm/master/bootstrap.go +++ b/dm/dm/master/bootstrap.go @@ -350,12 +350,6 @@ func (s *Server) upgradeDBSchemaV1Import(tctx *tcontext.Context, cfgs map[string } // createSubtaskV1Import tries to create subtasks with the specified stage. -// NOTE: now we only have two different APIs to: -// - create a new (running) subtask. -// - update the subtask to the specified stage. -// in other words, if we want to create a `Paused` task, -// we need to create a `Running` one first and then `Pause` it. -// this is not a big problem now, but if needed we can refine it later. // NOTE: we do not stopping previous subtasks if any later one failed (because some side effects may have taken), // and let the user to check & fix the problem. // TODO(csuzhangxc): merge subtask configs to support `get-task-config`. @@ -377,8 +371,7 @@ outerLoop: tctx.Logger.Warn("skip to create subtask because only support to create subtasks with Running/Paused stage now", zap.Stringer("stage", stage)) continue } - // create and update subtasks one by one (this should be quick enough because only updating etcd). - err = s.scheduler.AddSubTasks(false, *cfg2) + err = s.scheduler.AddSubTasks(false, stage, *cfg2) if err != nil { if terror.ErrSchedulerSubTaskExist.Equal(err) { err = nil // reset error @@ -387,12 +380,6 @@ outerLoop: break outerLoop } } - if stage == pb.Stage_Paused { // no more operation needed for `Running`. - err = s.scheduler.UpdateExpectSubTaskStage(stage, taskName, sourceID) - if err != nil { - break outerLoop - } - } } } return err diff --git a/dm/dm/master/openapi_controller.go b/dm/dm/master/openapi_controller.go new file mode 100644 index 00000000000..1b9dc03a349 --- /dev/null +++ b/dm/dm/master/openapi_controller.go @@ -0,0 +1,284 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. +// +// MVC for dm-master's openapi server +// Model(data in etcd): source of truth +// View(openapi_view): do some inner work such as validate, filter, prepare parameters/response and call controller to update model. +// Controller(openapi_controller): call model func to update data. + +package master + +import ( + "context" + + "github.com/pingcap/tiflow/dm/checker" + "github.com/pingcap/tiflow/dm/dm/config" + "github.com/pingcap/tiflow/dm/dm/pb" + "github.com/pingcap/tiflow/dm/openapi" + "github.com/pingcap/tiflow/dm/pkg/terror" +) + +func (s *Server) getSourceStatusListFromWorker(ctx context.Context, sourceName string, specifiedSource bool) ([]openapi.SourceStatus, error) { + workerStatusList := s.getStatusFromWorkers(ctx, []string{sourceName}, "", specifiedSource) + sourceStatusList := make([]openapi.SourceStatus, len(workerStatusList)) + for i, workerStatus := range workerStatusList { + if workerStatus == nil { + // this should not happen unless the rpc in the worker server has been modified + return nil, terror.ErrOpenAPICommonError.New("worker's query-status response is nil") + } + sourceStatus := openapi.SourceStatus{SourceName: sourceName, WorkerName: workerStatus.SourceStatus.Worker} + if !workerStatus.Result { + sourceStatus.ErrorMsg = &workerStatus.Msg + } else if relayStatus := workerStatus.SourceStatus.GetRelayStatus(); relayStatus != nil { + sourceStatus.RelayStatus = &openapi.RelayStatus{ + MasterBinlog: relayStatus.MasterBinlog, + MasterBinlogGtid: relayStatus.MasterBinlogGtid, + RelayBinlogGtid: relayStatus.RelayBinlogGtid, + RelayCatchUpMaster: relayStatus.RelayCatchUpMaster, + RelayDir: relayStatus.RelaySubDir, + Stage: relayStatus.Stage.String(), + } + } + sourceStatusList[i] = sourceStatus + } + return sourceStatusList, nil +} + +// nolint:unparam +func (s *Server) createSource(ctx context.Context, cfg *config.SourceConfig) error { + return s.scheduler.AddSourceCfg(cfg) +} + +// nolint:unparam,unused +func (s *Server) updateSource(ctx context.Context, cfg *config.SourceConfig) error { + // TODO(ehco) no caller now , will implement later + return nil +} + +// nolint:unparam +func (s *Server) deleteSource(ctx context.Context, sourceName string, force bool) error { + if force { + for _, taskName := range s.scheduler.GetTaskNameListBySourceName(sourceName) { + if err := s.scheduler.RemoveSubTasks(taskName, sourceName); err != nil { + return err + } + } + } + return s.scheduler.RemoveSourceCfg(sourceName) +} + +// nolint:unparam,unused +func (s *Server) getSource(ctx context.Context, sourceName string) (openapiSource openapi.Source, err error) { + sourceCfg := s.scheduler.GetSourceCfgByID(sourceName) + if sourceCfg == nil { + return openapiSource, terror.ErrSchedulerSourceCfgNotExist.Generate(sourceName) + } + openapiSource = config.SourceCfgToOpenAPISource(sourceCfg) + return openapiSource, nil +} + +func (s *Server) getSourceStatus(ctx context.Context, sourceName string) ([]openapi.SourceStatus, error) { + return s.getSourceStatusListFromWorker(ctx, sourceName, true) +} + +// nolint:unparam +func (s *Server) listSource(ctx context.Context, req interface{}) []openapi.Source { + // TODO(ehco) implement filter later + sourceM := s.scheduler.GetSourceCfgs() + openapiSourceList := make([]openapi.Source, 0, len(sourceM)) + for _, source := range sourceM { + openapiSourceList = append(openapiSourceList, config.SourceCfgToOpenAPISource(source)) + } + return openapiSourceList +} + +// nolint:unparam,unused +func (s *Server) enableSource(ctx context.Context, sourceName, workerName string) error { + worker := s.scheduler.GetWorkerBySource(sourceName) + if worker == nil { + return terror.ErrWorkerNoStart + } + taskNameList := s.scheduler.GetTaskNameListBySourceName(sourceName) + return s.scheduler.BatchOperateTaskOnWorker(ctx, worker, taskNameList, sourceName, pb.Stage_Running, true) +} + +// nolint:unused +func (s *Server) disableSource(ctx context.Context, sourceName string) error { + worker := s.scheduler.GetWorkerBySource(sourceName) + if worker == nil { + // no need to stop task if the source is not running + return nil + } + taskNameList := s.scheduler.GetTaskNameListBySourceName(sourceName) + return s.scheduler.BatchOperateTaskOnWorker(ctx, worker, taskNameList, sourceName, pb.Stage_Stopped, true) +} + +func (s *Server) transferSource(ctx context.Context, sourceName, workerName string) error { + return s.scheduler.TransferSource(ctx, sourceName, workerName) +} + +func (s *Server) checkTask(ctx context.Context, subtaskCfgList []*config.SubTaskConfig, errCnt, warnCnt int64) (string, error) { + return checker.CheckSyncConfigFunc(ctx, subtaskCfgList, errCnt, warnCnt) +} + +// nolint:unparam,unused +func (s *Server) createTask(ctx context.Context, subtaskCfgList []*config.SubTaskConfig) error { + return s.scheduler.AddSubTasks(false, pb.Stage_Stopped, subtaskCfgPointersToInstances(subtaskCfgList...)...) +} + +// nolint:unused +func (s *Server) updateTask(ctx context.Context, taskCfg *config.TaskConfig) error { + // TODO(ehco) no caller now , will implement later + return nil +} + +// nolint:unparam,unused +func (s *Server) deleteTask(ctx context.Context, taskName string) error { + sourceNameList := s.getTaskSourceNameList(taskName) + return s.scheduler.RemoveSubTasks(taskName, sourceNameList...) +} + +// nolint:unused +func (s *Server) getTask(ctx context.Context, taskName string) error { + // TODO(ehco) no caller now , will implement later + return nil +} + +func (s *Server) getTaskStatus(ctx context.Context, taskName string, sourceNameList []string) ([]openapi.SubTaskStatus, error) { + workerStatusList := s.getStatusFromWorkers(ctx, sourceNameList, taskName, true) + subTaskStatusList := make([]openapi.SubTaskStatus, 0, len(workerStatusList)) + for _, workerStatus := range workerStatusList { + if workerStatus == nil || workerStatus.SourceStatus == nil { + // this should not happen unless the rpc in the worker server has been modified + return nil, terror.ErrOpenAPICommonError.New("worker's query-status response is nil") + } + sourceStatus := workerStatus.SourceStatus + openapiSubTaskStatus := openapi.SubTaskStatus{ + Name: taskName, + SourceName: sourceStatus.GetSource(), + WorkerName: sourceStatus.GetWorker(), + } + if !workerStatus.Result { + openapiSubTaskStatus.ErrorMsg = &workerStatus.Msg + subTaskStatusList = append(subTaskStatusList, openapiSubTaskStatus) + continue + } + if len(workerStatus.SubTaskStatus) == 0 { + // this should not happen unless the rpc in the worker server has been modified + return nil, terror.ErrOpenAPICommonError.New("worker's query-status response is nil") + } + subTaskStatus := workerStatus.SubTaskStatus[0] + if subTaskStatus == nil { + // this should not happen unless the rpc in the worker server has been modified + return nil, terror.ErrOpenAPICommonError.New("worker's query-status response is nil") + } + openapiSubTaskStatus.Stage = subTaskStatus.GetStage().String() + openapiSubTaskStatus.Unit = subTaskStatus.GetUnit().String() + openapiSubTaskStatus.UnresolvedDdlLockId = &subTaskStatus.UnresolvedDDLLockID + // add load status + if loadS := subTaskStatus.GetLoad(); loadS != nil { + openapiSubTaskStatus.LoadStatus = &openapi.LoadStatus{ + FinishedBytes: loadS.FinishedBytes, + MetaBinlog: loadS.MetaBinlog, + MetaBinlogGtid: loadS.MetaBinlogGTID, + Progress: loadS.Progress, + TotalBytes: loadS.TotalBytes, + } + } + // add sync status + if syncerS := subTaskStatus.GetSync(); syncerS != nil { + openapiSubTaskStatus.SyncStatus = &openapi.SyncStatus{ + BinlogType: syncerS.GetBinlogType(), + BlockingDdls: syncerS.GetBlockingDDLs(), + MasterBinlog: syncerS.GetMasterBinlog(), + MasterBinlogGtid: syncerS.GetMasterBinlogGtid(), + RecentTps: syncerS.RecentTps, + SecondsBehindMaster: syncerS.SecondsBehindMaster, + Synced: syncerS.Synced, + SyncerBinlog: syncerS.SyncerBinlog, + SyncerBinlogGtid: syncerS.SyncerBinlogGtid, + TotalEvents: syncerS.TotalEvents, + TotalTps: syncerS.TotalTps, + } + if unResolvedGroups := syncerS.GetUnresolvedGroups(); len(unResolvedGroups) > 0 { + openapiSubTaskStatus.SyncStatus.UnresolvedGroups = make([]openapi.ShardingGroup, len(unResolvedGroups)) + for i, unResolvedGroup := range unResolvedGroups { + openapiSubTaskStatus.SyncStatus.UnresolvedGroups[i] = openapi.ShardingGroup{ + DdlList: unResolvedGroup.DDLs, + FirstLocation: unResolvedGroup.FirstLocation, + Synced: unResolvedGroup.Synced, + Target: unResolvedGroup.Target, + Unsynced: unResolvedGroup.Unsynced, + } + } + } + } + // add dump status + if dumpS := subTaskStatus.GetDump(); dumpS != nil { + openapiSubTaskStatus.DumpStatus = &openapi.DumpStatus{ + CompletedTables: dumpS.CompletedTables, + EstimateTotalRows: dumpS.EstimateTotalRows, + FinishedBytes: dumpS.FinishedBytes, + FinishedRows: dumpS.FinishedRows, + TotalTables: dumpS.TotalTables, + } + } + subTaskStatusList = append(subTaskStatusList, openapiSubTaskStatus) + } + return subTaskStatusList, nil +} + +// nolint:unparam,unused +func (s *Server) listTask(ctx context.Context, req interface{}) []openapi.Task { + // TODO(ehco) implement filter later + subTaskConfigMap := s.scheduler.GetSubTaskCfgs() + return config.SubTaskConfigsToOpenAPITask(subTaskConfigMap) +} + +// nolint:unparam,unused +func (s *Server) startTask(ctx context.Context, taskName string, sourceNameList []string, removeMeta bool, req interface{}) error { + // TODO(ehco) merge start-task req + subTaskConfigM := s.scheduler.GetSubTaskCfgsByTask(taskName) + needStartSubTaskList := make([]*config.SubTaskConfig, 0, len(subTaskConfigM)) + for _, sourceName := range sourceNameList { + subTaskCfg, ok := subTaskConfigM[sourceName] + if !ok { + return terror.ErrSchedulerSourceCfgNotExist.Generate(sourceName) + } + needStartSubTaskList = append(needStartSubTaskList, subTaskCfg) + } + if len(needStartSubTaskList) == 0 { + return nil + } + if removeMeta { + // use same latch for remove-meta and start-task + release, err := s.scheduler.AcquireSubtaskLatch(taskName) + if err != nil { + return terror.ErrSchedulerLatchInUse.Generate("RemoveMeta", taskName) + } + defer release() + metaSchema := needStartSubTaskList[0].MetaSchema + targetDB := needStartSubTaskList[0].To + err = s.removeMetaData(ctx, taskName, metaSchema, &targetDB) + if err != nil { + return terror.Annotate(err, "while removing metadata") + } + release() + } + return s.scheduler.UpdateExpectSubTaskStage(pb.Stage_Running, taskName, sourceNameList...) +} + +// nolint:unparam,unused +func (s *Server) stopTask(ctx context.Context, taskName string, sourceNameList []string) error { + return s.scheduler.UpdateExpectSubTaskStage(pb.Stage_Stopped, taskName, sourceNameList...) +} diff --git a/dm/dm/master/openapi.go b/dm/dm/master/openapi_view.go similarity index 72% rename from dm/dm/master/openapi.go rename to dm/dm/master/openapi_view.go index 0581bda261d..efb9f2272bf 100644 --- a/dm/dm/master/openapi.go +++ b/dm/dm/master/openapi_view.go @@ -16,7 +16,6 @@ package master import ( - "context" "encoding/json" "fmt" "net/http" @@ -25,10 +24,8 @@ import ( "github.com/gin-gonic/gin" "go.uber.org/zap" - "github.com/pingcap/tiflow/dm/checker" "github.com/pingcap/tiflow/dm/dm/config" "github.com/pingcap/tiflow/dm/dm/ctl/common" - "github.com/pingcap/tiflow/dm/dm/master/scheduler" "github.com/pingcap/tiflow/dm/dm/master/workerrpc" "github.com/pingcap/tiflow/dm/dm/pb" "github.com/pingcap/tiflow/dm/openapi" @@ -72,7 +69,7 @@ func (s *Server) InitOpenAPIHandles() error { } // disables swagger server name validation. it seems to work poorly swagger.Servers = nil - + gin.SetMode(gin.ReleaseMode) r := gin.New() // middlewares r.Use(gin.Recovery()) @@ -84,7 +81,6 @@ func (s *Server) InitOpenAPIHandles() error { // register handlers openapi.RegisterHandlers(r, s) s.openapiHandles = r - gin.SetMode(gin.ReleaseMode) return nil } @@ -178,11 +174,14 @@ func (s *Server) DMAPICreateSource(c *gin.Context) { return } cfg := config.OpenAPISourceToSourceCfg(createSourceReq) - if err := checkAndAdjustSourceConfigFunc(c.Request.Context(), cfg); err != nil { + + ctx := c.Request.Context() + if err := checkAndAdjustSourceConfigFunc(ctx, cfg); err != nil { _ = c.Error(err) return } - if err := s.scheduler.AddSourceCfg(cfg); err != nil { + // TODO support specify worker name + if err := s.createSource(ctx, cfg); err != nil { _ = c.Error(err) return } @@ -191,16 +190,13 @@ func (s *Server) DMAPICreateSource(c *gin.Context) { // DMAPIGetSourceList url is:(GET /api/v1/sources). func (s *Server) DMAPIGetSourceList(c *gin.Context, params openapi.DMAPIGetSourceListParams) { - sourceMap := s.scheduler.GetSourceCfgs() - sourceList := []openapi.Source{} - for key := range sourceMap { - sourceList = append(sourceList, config.SourceCfgToOpenAPISource((sourceMap[key]))) - } + ctx := c.Request.Context() + // todo support filter + sourceList := s.listSource(ctx, nil) // fill status if params.WithStatus != nil && *params.WithStatus { - nexCtx := c.Request.Context() for idx := range sourceList { - sourceStatusList, err := s.getSourceStatusListFromWorker(nexCtx, sourceList[idx].SourceName, true) + sourceStatusList, err := s.getSourceStatus(ctx, sourceList[idx].SourceName) if err != nil { _ = c.Error(err) return @@ -214,22 +210,13 @@ func (s *Server) DMAPIGetSourceList(c *gin.Context, params openapi.DMAPIGetSourc // DMAPIDeleteSource url is:(DELETE /api/v1/sources). func (s *Server) DMAPIDeleteSource(c *gin.Context, sourceName string, params openapi.DMAPIDeleteSourceParams) { + ctx := c.Request.Context() + var force bool // force means delete source and stop all task of this source if params.Force != nil && *params.Force { - // TODO(ehco) stop task concurrently - newCtx := c.Request.Context() - for _, taskName := range s.scheduler.GetTaskNameListBySourceName(sourceName) { - if _, err := s.OperateTask(newCtx, &pb.OperateTaskRequest{ - Op: pb.TaskOp_Stop, - Name: taskName, - Sources: []string{sourceName}, - }); err != nil { - _ = c.Error(terror.ErrOpenAPICommonError.Delegate(err, "failed to stop source related task %s", taskName)) - return - } - } + force = *params.Force } - if err := s.scheduler.RemoveSourceCfg(sourceName); err != nil { + if err := s.deleteSource(ctx, sourceName, force); err != nil { _ = c.Error(err) return } @@ -341,32 +328,6 @@ func (s *Server) DMAPIGetSourceTableList(c *gin.Context, sourceName string, sche c.IndentedJSON(http.StatusOK, tableList) } -func (s *Server) getSourceStatusListFromWorker(ctx context.Context, sourceName string, specifiedSource bool) ([]openapi.SourceStatus, error) { - workerStatusList := s.getStatusFromWorkers(ctx, []string{sourceName}, "", specifiedSource) - sourceStatusList := make([]openapi.SourceStatus, len(workerStatusList)) - for i, workerStatus := range workerStatusList { - if workerStatus == nil { - // this should not happen unless the rpc in the worker server has been modified - return nil, terror.ErrOpenAPICommonError.New("worker's query-status response is nil") - } - sourceStatus := openapi.SourceStatus{SourceName: sourceName, WorkerName: workerStatus.SourceStatus.Worker} - if !workerStatus.Result { - sourceStatus.ErrorMsg = &workerStatus.Msg - } else if relayStatus := workerStatus.SourceStatus.GetRelayStatus(); relayStatus != nil { - sourceStatus.RelayStatus = &openapi.RelayStatus{ - MasterBinlog: relayStatus.MasterBinlog, - MasterBinlogGtid: relayStatus.MasterBinlogGtid, - RelayBinlogGtid: relayStatus.RelayBinlogGtid, - RelayCatchUpMaster: relayStatus.RelayCatchUpMaster, - RelayDir: relayStatus.RelaySubDir, - Stage: relayStatus.Stage.String(), - } - } - sourceStatusList[i] = sourceStatus - } - return sourceStatusList, nil -} - // DMAPIGetSourceStatus url is: (GET /api/v1/sources/{source-id}/status). func (s *Server) DMAPIGetSourceStatus(c *gin.Context, sourceName string) { sourceCfg := s.scheduler.GetSourceCfgByID(sourceName) @@ -383,7 +344,7 @@ func (s *Server) DMAPIGetSourceStatus(c *gin.Context, sourceName string) { c.IndentedJSON(http.StatusOK, resp) return } - sourceStatusList, err := s.getSourceStatusListFromWorker(c.Request.Context(), sourceName, true) + sourceStatusList, err := s.getSourceStatus(c.Request.Context(), sourceName) if err != nil { _ = c.Error(err) return @@ -400,7 +361,7 @@ func (s *Server) DMAPITransferSource(c *gin.Context, sourceName string) { _ = c.Error(err) return } - if err := s.scheduler.TransferSource(c.Request.Context(), sourceName, req.WorkerName); err != nil { + if err := s.transferSource(c.Request.Context(), sourceName, req.WorkerName); err != nil { _ = c.Error(err) } } @@ -441,8 +402,7 @@ func (s *Server) DMAPIStartTask(c *gin.Context) { return } // check subtask config - msg, err := checker.CheckSyncConfigFunc(newCtx, subTaskConfigList, - common.DefaultErrorCnt, common.DefaultWarnCnt) + msg, err := s.checkTask(newCtx, subTaskConfigList, common.DefaultErrorCnt, common.DefaultWarnCnt) if err != nil { _ = c.Error(terror.WithClass(err, terror.ClassDMMaster)) return @@ -451,70 +411,44 @@ func (s *Server) DMAPIStartTask(c *gin.Context) { // TODO: return warning msg with http.StatusCreated and task together log.L().Warn("openapi pre-check warning before start task", zap.String("warning", msg)) } + + if createErr := s.createTask(newCtx, subTaskConfigList); createErr != nil { + _ = c.Error(terror.WithClass(createErr, terror.ClassDMMaster)) + return + } + var sourceNameList []string // specify only start task on partial sources - var needStartSubTaskList []*config.SubTaskConfig if req.SourceNameList != nil { - // source name -> sub task config - subTaskCfgM := make(map[string]*config.SubTaskConfig, len(subTaskConfigList)) - for idx := range subTaskConfigList { - cfg := subTaskConfigList[idx] - subTaskCfgM[cfg.SourceID] = cfg - } - for _, sourceName := range *req.SourceNameList { - subTaskCfg, ok := subTaskCfgM[sourceName] - if !ok { - _ = c.Error(terror.ErrOpenAPITaskSourceNotFound.Generatef("source name %s", sourceName)) - return - } - needStartSubTaskList = append(needStartSubTaskList, subTaskCfg) - } + sourceNameList = *req.SourceNameList } else { - needStartSubTaskList = subTaskConfigList - } - // end all pre-check, start to create task - var ( - latched = false - release scheduler.ReleaseFunc - ) - if req.RemoveMeta { - // use same latch for remove-meta and start-task - release, err = s.scheduler.AcquireSubtaskLatch(task.Name) - if err != nil { - _ = c.Error(terror.ErrSchedulerLatchInUse.Generate("RemoveMeta", task.Name)) - return - } - defer release() - latched = true - err = s.removeMetaData(newCtx, task.Name, *task.MetaSchema, toDBCfg) - if err != nil { - _ = c.Error(terror.Annotate(err, "while removing metadata")) - return - } + sourceNameList = s.getTaskSourceNameList(task.Name) } - err = s.scheduler.AddSubTasks(latched, subtaskCfgPointersToInstances(needStartSubTaskList...)...) - if err != nil { - _ = c.Error(err) + if startErr := s.startTask(newCtx, task.Name, sourceNameList, req.RemoveMeta, nil); startErr != nil { + _ = c.Error(terror.WithClass(startErr, terror.ClassDMMaster)) return } - if release != nil { - release() - } c.IndentedJSON(http.StatusCreated, task) } // DMAPIDeleteTask url is:(DELETE /api/v1/tasks). func (s *Server) DMAPIDeleteTask(c *gin.Context, taskName string, params openapi.DMAPIDeleteTaskParams) { - var sourceList []string + var sourceNameList []string if params.SourceNameList != nil { - sourceList = *params.SourceNameList + sourceNameList = *params.SourceNameList } else { - sourceList = s.getTaskResources(taskName) + sourceNameList = s.getTaskSourceNameList(taskName) } - if len(sourceList) == 0 { + if len(sourceNameList) == 0 { _ = c.Error(terror.ErrSchedulerTaskNotExist.Generate(taskName)) return } - if err := s.scheduler.RemoveSubTasks(taskName, sourceList...); err != nil { + + ctx := c.Request.Context() + if err := s.stopTask(ctx, taskName, sourceNameList); err != nil { + _ = c.Error(err) + return + } + if err := s.deleteTask(ctx, taskName); err != nil { _ = c.Error(err) return } @@ -523,25 +457,13 @@ func (s *Server) DMAPIDeleteTask(c *gin.Context, taskName string, params openapi // DMAPIGetTaskList url is:(GET /api/v1/tasks). func (s *Server) DMAPIGetTaskList(c *gin.Context, params openapi.DMAPIGetTaskListParams) { - // get sub task config by task name task name->source name->subtask config - subTaskConfigMap := s.scheduler.GetSubTaskCfgs() - taskList := config.SubTaskConfigsToOpenAPITask(subTaskConfigMap) - + ctx := c.Request.Context() + taskList := s.listTask(ctx, nil) // fill status if params.WithStatus != nil && *params.WithStatus { - // get source list for all task - sourceNameM := make(map[string]struct{}) // use map to avoid duplicate source name - for _, task := range taskList { - for _, sourceConf := range task.SourceConfig.SourceConf { - sourceNameM[sourceConf.SourceName] = struct{}{} - } - } - sourceList := utils.SetToSlice(sourceNameM) - // get status from workers - workerStatusList := s.getStatusFromWorkers(c.Request.Context(), sourceList, "", false) // fill status for every task for idx := range taskList { - subTaskStatusList, err := getOpenAPISubtaskStatusByTaskName(taskList[idx].Name, workerStatusList) + subTaskStatusList, err := s.getTaskStatus(ctx, taskList[idx].Name, s.getTaskSourceNameList(taskList[idx].Name)) if err != nil { _ = c.Error(err) return @@ -555,24 +477,19 @@ func (s *Server) DMAPIGetTaskList(c *gin.Context, params openapi.DMAPIGetTaskLis // DMAPIGetTaskStatus url is:(GET /api/v1/tasks/{task-name}/status). func (s *Server) DMAPIGetTaskStatus(c *gin.Context, taskName string, params openapi.DMAPIGetTaskStatusParams) { - // 1. get task source list from scheduler - var ( - sourceList []string - specifiedSource bool - ) - if params.SourceNameList == nil { - sourceList = s.getTaskResources(taskName) + var sourceNameList []string + // specify only start task on partial sources + if params.SourceNameList != nil { + sourceNameList = *params.SourceNameList } else { - sourceList = *params.SourceNameList - specifiedSource = true + sourceNameList = s.getTaskSourceNameList(taskName) } - if len(sourceList) == 0 { + if len(sourceNameList) == 0 { _ = c.Error(terror.ErrSchedulerTaskNotExist.Generate(taskName)) return } // 2. get status from workers - workerStatusList := s.getStatusFromWorkers(c.Request.Context(), sourceList, taskName, specifiedSource) - subTaskStatusList, err := getOpenAPISubtaskStatusByTaskName(taskName, workerStatusList) + subTaskStatusList, err := s.getTaskStatus(c.Request.Context(), taskName, sourceNameList) if err != nil { _ = c.Error(err) return @@ -583,30 +500,32 @@ func (s *Server) DMAPIGetTaskStatus(c *gin.Context, taskName string, params open // DMAPIPauseTask pause task url is: (POST /api/v1/tasks/{task-name}/pause). func (s *Server) DMAPIPauseTask(c *gin.Context, taskName string) { - var sourceName openapi.SchemaNameList - if err := c.Bind(&sourceName); err != nil { + var sourceNameList openapi.SchemaNameList + if err := c.Bind(&sourceNameList); err != nil { _ = c.Error(err) return } - if len(sourceName) == 0 { - sourceName = s.getTaskResources(taskName) + if len(sourceNameList) == 0 { + sourceNameList = s.getTaskSourceNameList(taskName) } - if err := s.scheduler.UpdateExpectSubTaskStage(pb.Stage_Paused, taskName, sourceName...); err != nil { + ctx := c.Request.Context() + if err := s.stopTask(ctx, taskName, sourceNameList); err != nil { _ = c.Error(err) } } // DMAPIResumeTask resume task url is: (POST /api/v1/tasks/{task-name}/resume). func (s *Server) DMAPIResumeTask(c *gin.Context, taskName string) { - var sourceName openapi.SchemaNameList - if err := c.Bind(&sourceName); err != nil { + var sourceNameList openapi.SchemaNameList + if err := c.Bind(&sourceNameList); err != nil { _ = c.Error(err) return } - if len(sourceName) == 0 { - sourceName = s.getTaskResources(taskName) + if len(sourceNameList) == 0 { + sourceNameList = s.getTaskSourceNameList(taskName) } - if err := s.scheduler.UpdateExpectSubTaskStage(pb.Stage_Running, taskName, sourceName...); err != nil { + ctx := c.Request.Context() + if err := s.startTask(ctx, taskName, sourceNameList, false, nil); err != nil { _ = c.Error(err) } } @@ -926,89 +845,3 @@ func terrorHTTPErrorHandler() gin.HandlerFunc { c.IndentedJSON(http.StatusBadRequest, openapi.ErrorWithMessage{ErrorMsg: msg, ErrorCode: code}) } } - -func getOpenAPISubtaskStatusByTaskName(taskName string, - workerStatusList []*pb.QueryStatusResponse) ([]openapi.SubTaskStatus, error) { - subTaskStatusList := make([]openapi.SubTaskStatus, 0, len(workerStatusList)) - for _, workerStatus := range workerStatusList { - if workerStatus == nil || workerStatus.SourceStatus == nil { - // this should not happen unless the rpc in the worker server has been modified - return nil, terror.ErrOpenAPICommonError.New("worker's query-status response is nil") - } - sourceStatus := workerStatus.SourceStatus - openapiSubTaskStatus := openapi.SubTaskStatus{ - Name: taskName, - SourceName: sourceStatus.GetSource(), - WorkerName: sourceStatus.GetWorker(), - } - if !workerStatus.Result { - openapiSubTaskStatus.ErrorMsg = &workerStatus.Msg - subTaskStatusList = append(subTaskStatusList, openapiSubTaskStatus) - continue - } - // find right task name - var subTaskStatus *pb.SubTaskStatus - for _, cfg := range workerStatus.SubTaskStatus { - if cfg.Name == taskName { - subTaskStatus = cfg - } - } - if subTaskStatus == nil { - // not find - continue - } - openapiSubTaskStatus.Stage = subTaskStatus.GetStage().String() - openapiSubTaskStatus.Unit = subTaskStatus.GetUnit().String() - openapiSubTaskStatus.UnresolvedDdlLockId = &subTaskStatus.UnresolvedDDLLockID - // add load status - if loadS := subTaskStatus.GetLoad(); loadS != nil { - openapiSubTaskStatus.LoadStatus = &openapi.LoadStatus{ - FinishedBytes: loadS.FinishedBytes, - MetaBinlog: loadS.MetaBinlog, - MetaBinlogGtid: loadS.MetaBinlogGTID, - Progress: loadS.Progress, - TotalBytes: loadS.TotalBytes, - } - } - // add sync status - if syncerS := subTaskStatus.GetSync(); syncerS != nil { - openapiSubTaskStatus.SyncStatus = &openapi.SyncStatus{ - BinlogType: syncerS.GetBinlogType(), - BlockingDdls: syncerS.GetBlockingDDLs(), - MasterBinlog: syncerS.GetMasterBinlog(), - MasterBinlogGtid: syncerS.GetMasterBinlogGtid(), - RecentTps: syncerS.RecentTps, - SecondsBehindMaster: syncerS.SecondsBehindMaster, - Synced: syncerS.Synced, - SyncerBinlog: syncerS.SyncerBinlog, - SyncerBinlogGtid: syncerS.SyncerBinlogGtid, - TotalEvents: syncerS.TotalEvents, - TotalTps: syncerS.TotalTps, - } - if unResolvedGroups := syncerS.GetUnresolvedGroups(); len(unResolvedGroups) > 0 { - openapiSubTaskStatus.SyncStatus.UnresolvedGroups = make([]openapi.ShardingGroup, len(unResolvedGroups)) - for i, unResolvedGroup := range unResolvedGroups { - openapiSubTaskStatus.SyncStatus.UnresolvedGroups[i] = openapi.ShardingGroup{ - DdlList: unResolvedGroup.DDLs, - FirstLocation: unResolvedGroup.FirstLocation, - Synced: unResolvedGroup.Synced, - Target: unResolvedGroup.Target, - Unsynced: unResolvedGroup.Unsynced, - } - } - } - } - // add dump status - if dumpS := subTaskStatus.GetDump(); dumpS != nil { - openapiSubTaskStatus.DumpStatus = &openapi.DumpStatus{ - CompletedTables: dumpS.CompletedTables, - EstimateTotalRows: dumpS.EstimateTotalRows, - FinishedBytes: dumpS.FinishedBytes, - FinishedRows: dumpS.FinishedRows, - TotalTables: dumpS.TotalTables, - } - } - subTaskStatusList = append(subTaskStatusList, openapiSubTaskStatus) - } - return subTaskStatusList, nil -} diff --git a/dm/dm/master/openapi_test.go b/dm/dm/master/openapi_view_test.go similarity index 97% rename from dm/dm/master/openapi_test.go rename to dm/dm/master/openapi_view_test.go index 453fee955c1..4ca476ba39e 100644 --- a/dm/dm/master/openapi_test.go +++ b/dm/dm/master/openapi_view_test.go @@ -27,6 +27,7 @@ import ( "github.com/golang/mock/gomock" "github.com/pingcap/check" "github.com/pingcap/failpoint" + "github.com/stretchr/testify/require" "github.com/tikv/pd/pkg/tempurl" "go.etcd.io/etcd/clientv3" "go.etcd.io/etcd/integration" @@ -165,7 +166,7 @@ func (t *openAPISuite) TestOpenAPIWillNotStartInDefaultConfig(c *check.C) { func (t *openAPISuite) TestSourceAPI(c *check.C) { ctx, cancel := context.WithCancel(context.Background()) - s := setupServer(ctx, c) + s := setupTestServer(ctx, t.testT) defer func() { cancel() s.Close() @@ -274,7 +275,7 @@ func (t *openAPISuite) TestSourceAPI(c *check.C) { func (t *openAPISuite) TestRelayAPI(c *check.C) { ctx, cancel := context.WithCancel(context.Background()) - s := setupServer(ctx, c) + s := setupTestServer(ctx, t.testT) ctrl := gomock.NewController(c) defer func() { cancel() @@ -434,7 +435,7 @@ func (t *openAPISuite) TestRelayAPI(c *check.C) { func (t *openAPISuite) TestTaskAPI(c *check.C) { ctx, cancel := context.WithCancel(context.Background()) - s := setupServer(ctx, c) + s := setupTestServer(ctx, t.testT) c.Assert(failpoint.Enable("github.com/pingcap/tiflow/dm/dm/master/MockSkipAdjustTargetDB", `return(true)`), check.IsNil) checker.CheckSyncConfigFunc = mockCheckSyncConfig ctrl := gomock.NewController(c) @@ -536,7 +537,7 @@ func (t *openAPISuite) TestTaskAPI(c *check.C) { pauseTaskURL := fmt.Sprintf("%s/%s/pause", taskURL, task.Name) result = testutil.NewRequest().Post(pauseTaskURL).GoWithHTTPHandler(t.testT, s.openapiHandles) c.Assert(result.Code(), check.Equals, http.StatusOK) - c.Assert(s.scheduler.GetExpectSubTaskStage(task.Name, source1Name).Expect, check.Equals, pb.Stage_Paused) + c.Assert(s.scheduler.GetExpectSubTaskStage(task.Name, source1Name).Expect, check.Equals, pb.Stage_Stopped) resumeTaskURL := fmt.Sprintf("%s/%s/resume", taskURL, task.Name) result = testutil.NewRequest().Post(resumeTaskURL).GoWithHTTPHandler(t.testT, s.openapiHandles) @@ -618,7 +619,7 @@ func (t *openAPISuite) TestTaskAPI(c *check.C) { func (t *openAPISuite) TestClusterAPI(c *check.C) { ctx1, cancel1 := context.WithCancel(context.Background()) - s1 := setupServer(ctx1, c) + s1 := setupTestServer(ctx1, t.testT) defer func() { cancel1() s1.Close() @@ -714,7 +715,7 @@ func (t *openAPISuite) TestClusterAPI(c *check.C) { func (t *openAPISuite) TestTaskTemplatesAPI(c *check.C) { ctx, cancel := context.WithCancel(context.Background()) - s := setupServer(ctx, c) + s := setupTestServer(ctx, t.testT) c.Assert(failpoint.Enable("github.com/pingcap/tiflow/dm/dm/master/MockSkipAdjustTargetDB", `return(true)`), check.IsNil) checker.CheckSyncConfigFunc = mockCheckSyncConfig defer func() { @@ -818,12 +819,13 @@ func (t *openAPISuite) TestTaskTemplatesAPI(c *check.C) { c.Assert(resultTaskList.Total, check.Equals, 0) } -func setupServer(ctx context.Context, c *check.C) *Server { +func setupTestServer(ctx context.Context, t *testing.T) *Server { + t.Helper() // create a new cluster cfg1 := NewConfig() - c.Assert(cfg1.Parse([]string{"-config=./dm-master.toml"}), check.IsNil) + require.Nil(t, cfg1.Parse([]string{"-config=./dm-master.toml"})) cfg1.Name = "dm-master-1" - cfg1.DataDir = c.MkDir() + cfg1.DataDir = t.TempDir() cfg1.MasterAddr = tempurl.Alloc()[len("http://"):] cfg1.PeerUrls = tempurl.Alloc() cfg1.AdvertisePeerUrls = cfg1.PeerUrls @@ -832,11 +834,11 @@ func setupServer(ctx context.Context, c *check.C) *Server { cfg1.OpenAPI = true s1 := NewServer(cfg1) - c.Assert(s1.Start(ctx), check.IsNil) + require.NoError(t, s1.Start(ctx)) // wait the first one become the leader - c.Assert(utils.WaitSomething(30, 100*time.Millisecond, func() bool { + require.True(t, utils.WaitSomething(30, 100*time.Millisecond, func() bool { return s1.election.IsLeader() && s1.scheduler.Started() - }), check.IsTrue) + })) return s1 } @@ -883,26 +885,6 @@ func mockTaskQueryStatus( Source: sourceName, }, SubTaskStatus: []*pb.SubTaskStatus{ - { - Stage: pb.Stage_Running, - Name: taskName, - Status: &pb.SubTaskStatus_Sync{ - Sync: &pb.SyncStatus{ - TotalEvents: 0, - TotalTps: 0, - RecentTps: 0, - MasterBinlog: "", - MasterBinlogGtid: "", - SyncerBinlog: "", - SyncerBinlogGtid: "", - BlockingDDLs: nil, - UnresolvedGroups: nil, - Synced: false, - BinlogType: "", - SecondsBehindMaster: 0, - }, - }, - }, { Stage: pb.Stage_Running, Name: taskName, diff --git a/dm/dm/master/scheduler/scheduler.go b/dm/dm/master/scheduler/scheduler.go index 0eded7196da..8f5de662757 100644 --- a/dm/dm/master/scheduler/scheduler.go +++ b/dm/dm/master/scheduler/scheduler.go @@ -738,14 +738,14 @@ func (s *Scheduler) TransferSource(ctx context.Context, source, worker string) e } } // pause running tasks - if batchPauseErr := s.batchOperateTaskOnWorker(ctx, oldWorker, runningTasks, source, pb.Stage_Paused, true); batchPauseErr != nil { + if batchPauseErr := s.BatchOperateTaskOnWorker(ctx, oldWorker, runningTasks, source, pb.Stage_Paused, true); batchPauseErr != nil { return batchPauseErr } // we need resume tasks that we just paused, we use another goroutine to do this because if error happens // just logging this message and let user handle it manually defer func() { go func() { - if err := s.batchOperateTaskOnWorker(context.Background(), w, runningTasks, source, pb.Stage_Running, false); err != nil { + if err := s.BatchOperateTaskOnWorker(context.Background(), w, runningTasks, source, pb.Stage_Running, false); err != nil { s.logger.Warn( "auto resume task failed", zap.Any("tasks", runningTasks), zap.String("source", source), zap.String("worker", worker), zap.Error(err)) @@ -779,8 +779,8 @@ func (s *Scheduler) TransferSource(ctx context.Context, source, worker string) e return nil } -// batchOperateTaskOnWorker batch operate tasks in one worker and use query-status to make sure all tasks are in expected stage if needWait=true. -func (s *Scheduler) batchOperateTaskOnWorker( +// BatchOperateTaskOnWorker batch operate tasks in one worker and use query-status to make sure all tasks are in expected stage if needWait=true. +func (s *Scheduler) BatchOperateTaskOnWorker( ctx context.Context, worker *Worker, tasks []string, source string, stage pb.Stage, needWait bool) error { for _, taskName := range tasks { if err := s.UpdateExpectSubTaskStage(stage, taskName, source); err != nil { @@ -847,7 +847,7 @@ func (s *Scheduler) AcquireSubtaskLatch(name string) (ReleaseFunc, error) { // AddSubTasks adds the information of one or more subtasks for one task. // use s.mu.RLock() to protect s.bound, and s.subtaskLatch to protect subtask related members. // setting `latched` to true means caller has acquired latch. -func (s *Scheduler) AddSubTasks(latched bool, cfgs ...config.SubTaskConfig) error { +func (s *Scheduler) AddSubTasks(latched bool, expectStage pb.Stage, cfgs ...config.SubTaskConfig) error { s.mu.RLock() defer s.mu.RUnlock() @@ -915,7 +915,7 @@ func (s *Scheduler) AddSubTasks(latched bool, cfgs ...config.SubTaskConfig) erro continue } newCfgs = append(newCfgs, cfg) - newStages = append(newStages, ha.NewSubTaskStage(pb.Stage_Running, cfg.SourceID, cfg.Name)) + newStages = append(newStages, ha.NewSubTaskStage(expectStage, cfg.SourceID, cfg.Name)) if cfg.ValidatorCfg.Mode != config.ValidationNone { validatorStages = append(validatorStages, ha.NewValidatorStage(pb.Stage_Running, cfg.SourceID, cfg.Name)) } @@ -1596,37 +1596,37 @@ func (s *Scheduler) GetExpectRelayStage(source string) ha.Stage { // UpdateExpectSubTaskStage updates the current expect subtask stage. // now, only support updates: -// - from `Running` to `Paused`. -// - from `Paused` to `Running`. +// - from `Running` to `Paused/Stopped`. +// - from `Paused/Stopped` to `Running`. // NOTE: from `Running` to `Running` and `Paused` to `Paused` still update the data in etcd, // because some user may want to update `{Running, Paused, ...}` to `{Running, Running, ...}`. // so, this should be also supported in DM-worker. -func (s *Scheduler) UpdateExpectSubTaskStage(newStage pb.Stage, task string, sources ...string) error { +func (s *Scheduler) UpdateExpectSubTaskStage(newStage pb.Stage, taskName string, sources ...string) error { if !s.started.Load() { return terror.ErrSchedulerNotStarted.Generate() } - if task == "" || len(sources) == 0 { + if taskName == "" || len(sources) == 0 { return nil // no subtask need to update, this should not happen. } // 1. check the new expectant stage. switch newStage { - case pb.Stage_Running, pb.Stage_Paused: + case pb.Stage_Running, pb.Stage_Paused, pb.Stage_Stopped: default: return terror.ErrSchedulerSubTaskStageInvalidUpdate.Generate(newStage) } - release, err := s.subtaskLatch.tryAcquire(task) + release, err := s.subtaskLatch.tryAcquire(taskName) if err != nil { - return terror.ErrSchedulerLatchInUse.Generate("UpdateExpectSubTaskStage", task) + return terror.ErrSchedulerLatchInUse.Generate("UpdateExpectSubTaskStage", taskName) } defer release() // 2. check the task exists. - v, ok := s.expectSubTaskStages.Load(task) + v, ok := s.expectSubTaskStages.Load(taskName) if !ok { - return terror.ErrSchedulerSubTaskOpTaskNotExist.Generate(task) + return terror.ErrSchedulerSubTaskOpTaskNotExist.Generate(taskName) } var ( @@ -1641,7 +1641,7 @@ func (s *Scheduler) UpdateExpectSubTaskStage(newStage pb.Stage, task string, sou } else { currStagesM[currStage.Expect.String()] = struct{}{} } - stages = append(stages, ha.NewSubTaskStage(newStage, source, task)) + stages = append(stages, ha.NewSubTaskStage(newStage, source, taskName)) } notExistSources := strMapToSlice(notExistSourcesM) currStages := strMapToSlice(currStagesM) diff --git a/dm/dm/master/scheduler/scheduler_test.go b/dm/dm/master/scheduler/scheduler_test.go index 63dc17592c0..b5f03c6d543 100644 --- a/dm/dm/master/scheduler/scheduler_test.go +++ b/dm/dm/master/scheduler/scheduler_test.go @@ -139,7 +139,7 @@ func (t *testScheduler) testSchedulerProgress(c *C, restart int) { c.Assert(terror.ErrSchedulerNotStarted.Equal(s.AddSourceCfgWithWorker(sourceCfg1, workerName1)), IsTrue) c.Assert(terror.ErrSchedulerNotStarted.Equal(s.UpdateSourceCfg(sourceCfg1)), IsTrue) c.Assert(terror.ErrSchedulerNotStarted.Equal(s.RemoveSourceCfg(sourceID1)), IsTrue) - c.Assert(terror.ErrSchedulerNotStarted.Equal(s.AddSubTasks(false, subtaskCfg1)), IsTrue) + c.Assert(terror.ErrSchedulerNotStarted.Equal(s.AddSubTasks(false, pb.Stage_Running, subtaskCfg1)), IsTrue) c.Assert(terror.ErrSchedulerNotStarted.Equal(s.RemoveSubTasks(taskName1, sourceID1)), IsTrue) c.Assert(terror.ErrSchedulerNotStarted.Equal(s.AddWorker(workerName1, workerAddr1)), IsTrue) c.Assert(terror.ErrSchedulerNotStarted.Equal(s.RemoveWorker(workerName1)), IsTrue) @@ -253,13 +253,13 @@ func (t *testScheduler) testSchedulerProgress(c *C, restart int) { return len(bounds) == 1 && bounds[0] == sourceID1 }), IsTrue) // no subtask config exists before start. - c.Assert(s.AddSubTasks(false), IsNil) // can call without configs, return without error, but take no effect. + c.Assert(s.AddSubTasks(false, pb.Stage_Running), IsNil) // can call without configs, return without error, but take no effect. t.subTaskCfgNotExist(c, s, taskName1, sourceID1) t.subTaskStageMatch(c, s, taskName1, sourceID1, pb.Stage_InvalidStage) t.downstreamMetaNotExist(c, s, taskName1) // start the task. - c.Assert(s.AddSubTasks(false, subtaskCfg1), IsNil) - c.Assert(terror.ErrSchedulerSubTaskExist.Equal(s.AddSubTasks(false, subtaskCfg1)), IsTrue) // add again. + c.Assert(s.AddSubTasks(false, pb.Stage_Running, subtaskCfg1), IsNil) + c.Assert(terror.ErrSchedulerSubTaskExist.Equal(s.AddSubTasks(false, pb.Stage_Running, subtaskCfg1)), IsTrue) // add again. // subtask config and stage exist. t.subTaskCfgExist(c, s, subtaskCfg1) t.subTaskStageMatch(c, s, taskName1, sourceID1, pb.Stage_Running) @@ -270,7 +270,7 @@ func (t *testScheduler) testSchedulerProgress(c *C, restart int) { c.Assert(terror.ErrSchedulerSourceOpTaskExist.Equal(s.UpdateSourceCfg(sourceCfg1)), IsTrue) // try start a task with two sources, some sources not bound. - c.Assert(terror.ErrSchedulerSourcesUnbound.Equal(s.AddSubTasks(false, subtaskCfg21, subtaskCfg22)), IsTrue) + c.Assert(terror.ErrSchedulerSourcesUnbound.Equal(s.AddSubTasks(false, pb.Stage_Running, subtaskCfg21, subtaskCfg22)), IsTrue) t.subTaskCfgNotExist(c, s, taskName2, sourceID1) t.subTaskStageMatch(c, s, taskName2, sourceID1, pb.Stage_InvalidStage) t.subTaskCfgNotExist(c, s, taskName2, sourceID2) @@ -286,10 +286,9 @@ func (t *testScheduler) testSchedulerProgress(c *C, restart int) { c.Assert(s.UpdateExpectSubTaskStage(pb.Stage_Paused, "", sourceID1), IsNil) c.Assert(s.UpdateExpectSubTaskStage(pb.Stage_Paused, taskName1), IsNil) t.relayStageMatch(c, s, sourceID1, pb.Stage_Running) - // update to non-(Running, Paused) stage is invalid. + // update to non-(New, Finished) stage is invalid. c.Assert(terror.ErrSchedulerSubTaskStageInvalidUpdate.Equal(s.UpdateExpectSubTaskStage(pb.Stage_InvalidStage, taskName1, sourceID1)), IsTrue) c.Assert(terror.ErrSchedulerSubTaskStageInvalidUpdate.Equal(s.UpdateExpectSubTaskStage(pb.Stage_New, taskName1, sourceID1)), IsTrue) - c.Assert(terror.ErrSchedulerSubTaskStageInvalidUpdate.Equal(s.UpdateExpectSubTaskStage(pb.Stage_Stopped, taskName1, sourceID1)), IsTrue) c.Assert(terror.ErrSchedulerSubTaskStageInvalidUpdate.Equal(s.UpdateExpectSubTaskStage(pb.Stage_Finished, taskName1, sourceID1)), IsTrue) // can't update stage with not existing sources now. c.Assert(terror.ErrSchedulerSubTaskOpSourceNotExist.Equal(s.UpdateExpectSubTaskStage(pb.Stage_Paused, taskName1, sourceID1, sourceID2)), IsTrue) @@ -414,14 +413,14 @@ func (t *testScheduler) testSchedulerProgress(c *C, restart int) { // CASE 4.4.1: start a task with two sources. // can't add more than one tasks at a time now. - c.Assert(terror.ErrSchedulerMultiTask.Equal(s.AddSubTasks(false, subtaskCfg1, subtaskCfg21)), IsTrue) + c.Assert(terror.ErrSchedulerMultiTask.Equal(s.AddSubTasks(false, pb.Stage_Running, subtaskCfg1, subtaskCfg21)), IsTrue) // task2' config and stage not exists before. t.subTaskCfgNotExist(c, s, taskName2, sourceID1) t.subTaskCfgNotExist(c, s, taskName2, sourceID2) t.subTaskStageMatch(c, s, taskName2, sourceID1, pb.Stage_InvalidStage) t.subTaskStageMatch(c, s, taskName2, sourceID2, pb.Stage_InvalidStage) // start task2. - c.Assert(s.AddSubTasks(false, subtaskCfg21, subtaskCfg22), IsNil) + c.Assert(s.AddSubTasks(false, pb.Stage_Running, subtaskCfg21, subtaskCfg22), IsNil) // config added, stage become Running. t.subTaskCfgExist(c, s, subtaskCfg21) t.subTaskCfgExist(c, s, subtaskCfg22) @@ -2017,7 +2016,7 @@ func (t *testScheduler) TestOperateValidatorTask(c *C) { c.Assert(err, IsNil) c.Assert(s.Start(ctx, etcdTestCli), IsNil) // CASE 1: start subtask without starting validation - c.Assert(s.AddSubTasks(false, subtaskCfg), IsNil) // create new subtask without validation + c.Assert(s.AddSubTasks(false, pb.Stage_Running, subtaskCfg), IsNil) // create new subtask without validation t.subTaskCfgExist(c, s, subtaskCfg) subtaskCfg.ValidatorCfg.Mode = config.ValidationFull // set mode stCfgs := make(map[string]map[string]config.SubTaskConfig) diff --git a/dm/dm/master/server.go b/dm/dm/master/server.go index 278270e0601..6c5014166ee 100644 --- a/dm/dm/master/server.go +++ b/dm/dm/master/server.go @@ -137,7 +137,7 @@ func NewServer(cfg *Config) *Server { scheduler: scheduler.NewScheduler(&logger, cfg.Security), ap: NewAgentPool(&RateLimitConfig{rate: cfg.RPCRateLimit, burst: cfg.RPCRateBurst}), } - server.pessimist = shardddl.NewPessimist(&logger, server.getTaskResources) + server.pessimist = shardddl.NewPessimist(&logger, server.getTaskSourceNameList) server.optimist = shardddl.NewOptimist(&logger, server.scheduler.GetDownstreamMetaByTask) server.closed.Store(true) setUseTLS(&cfg.Security) @@ -563,7 +563,7 @@ func (s *Server) StartTask(ctx context.Context, req *pb.StartTaskRequest) (*pb.S return respWithErr(terror.Annotate(err, "while putting task command line arguments")) } } - err = s.scheduler.AddSubTasks(latched, subtaskCfgPointersToInstances(stCfgs...)...) + err = s.scheduler.AddSubTasks(latched, pb.Stage_Running, subtaskCfgPointersToInstances(stCfgs...)...) if err != nil { return respWithErr(err) } @@ -603,7 +603,7 @@ func (s *Server) OperateTask(ctx context.Context, req *pb.OperateTaskRequest) (* sources := req.Sources if len(req.Sources) == 0 { - sources = s.getTaskResources(req.Name) + sources = s.getTaskSourceNameList(req.Name) } if len(sources) == 0 { resp.Msg = fmt.Sprintf("task %s has no source or not exist, please check the task name and status", req.Name) @@ -615,14 +615,14 @@ func (s *Server) OperateTask(ctx context.Context, req *pb.OperateTaskRequest) (* expect = pb.Stage_Paused case pb.TaskOp_Resume: expect = pb.Stage_Running - case pb.TaskOp_Stop: - expect = pb.Stage_Stopped + case pb.TaskOp_Delete: + // op_delete means delete this running subtask, we not have the expected stage now default: resp.Msg = terror.ErrMasterInvalidOperateOp.Generate(req.Op.String(), "task").Error() return resp, nil } var err error - if expect == pb.Stage_Stopped { + if req.Op == pb.TaskOp_Delete { err = s.scheduler.RemoveSubTasks(req.Name, sources...) } else { err = s.scheduler.UpdateExpectSubTaskStage(expect, req.Name, sources...) @@ -636,7 +636,7 @@ func (s *Server) OperateTask(ctx context.Context, req *pb.OperateTaskRequest) (* resp.Result = true resp.Sources = s.getSourceRespsAfterOperation(ctx, req.Name, sources, []string{}, req) - if expect == pb.Stage_Stopped { + if req.Op == pb.TaskOp_Delete { // delete meta data for optimist if len(req.Sources) == 0 { err2 = s.optimist.RemoveMetaDataWithTask(req.Name) @@ -779,7 +779,7 @@ func extractSources(s *Server, req hasWokers) (sources []string, specifiedSource specifiedSource = true case len(req.GetName()) > 0: // query specified task's sources - sources = s.getTaskResources(req.GetName()) + sources = s.getTaskSourceNameList(req.GetName()) if len(sources) == 0 { return nil, false, errors.Errorf("task %s has no source or not exist", req.GetName()) } @@ -1065,11 +1065,11 @@ func (s *Server) OperateWorkerRelayTask(ctx context.Context, req *pb.OperateWork return resp, nil } -// getTaskResources gets workers relevant to specified task. -func (s *Server) getTaskResources(task string) []string { +// getTaskSourceNameList gets workers relevant to specified task. +func (s *Server) getTaskSourceNameList(taskName string) []string { s.Lock() defer s.Unlock() - cfgM := s.scheduler.GetSubTaskCfgsByTask(task) + cfgM := s.scheduler.GetSubTaskCfgsByTask(taskName) // do a copy ret := make([]string, 0, len(cfgM)) for source := range cfgM { @@ -1720,8 +1720,7 @@ func (s *Server) waitOperationOk( expect = pb.Stage_Running case pb.TaskOp_Pause: expect = pb.Stage_Paused - case pb.TaskOp_Stop: - expect = pb.Stage_Stopped + case pb.TaskOp_Delete: } case *pb.OperateWorkerRelayRequest: switch req.Op { @@ -1805,7 +1804,7 @@ func (s *Server) waitOperationOk( } } case *pb.StartTaskRequest, *pb.UpdateTaskRequest, *pb.OperateTaskRequest: - if expect == pb.Stage_Stopped && len(queryResp.SubTaskStatus) == 0 { + if opTaskReq, ok := masterReq.(*pb.OperateTaskRequest); ok && opTaskReq.Op == pb.TaskOp_Delete && len(queryResp.SubTaskStatus) == 0 { return true, "", queryResp, nil } if len(queryResp.SubTaskStatus) == 1 { @@ -1820,8 +1819,8 @@ func (s *Server) waitOperationOk( if expect == pb.Stage_Running { finished = pb.Stage_Finished } - if expect == pb.Stage_Stopped { - if st, ok2 := subtaskStatus.Status.(*pb.SubTaskStatus_Msg); ok2 && st.Msg == dmcommon.NoSubTaskMsg(taskName) { + if opTaskReq, ok2 := masterReq.(*pb.OperateTaskRequest); ok2 && opTaskReq.Op == pb.TaskOp_Delete { + if st, ok3 := subtaskStatus.Status.(*pb.SubTaskStatus_Msg); ok3 && st.Msg == dmcommon.NoSubTaskMsg(taskName) { ok = true } else { // make sure there is no subtask @@ -2411,7 +2410,7 @@ func (s *Server) HandleError(ctx context.Context, req *pb.HandleErrorRequest) (* sources := req.Sources if len(sources) == 0 { - sources = s.getTaskResources(req.Task) + sources = s.getTaskSourceNameList(req.Task) log.L().Info(fmt.Sprintf("sources: %s", sources)) if len(sources) == 0 { return &pb.HandleErrorResponse{ diff --git a/dm/dm/master/server_test.go b/dm/dm/master/server_test.go index 19d7be46ebd..b3c23e14403 100644 --- a/dm/dm/master/server_test.go +++ b/dm/dm/master/server_test.go @@ -244,8 +244,7 @@ func mockRevelantWorkerClient(mockWorkerClient *pbmock.MockWorkerClient, taskNam expect = pb.Stage_Running case pb.TaskOp_Pause: expect = pb.Stage_Paused - case pb.TaskOp_Stop: - expect = pb.Stage_Stopped + case pb.TaskOp_Delete: } case *pb.OperateWorkerRelayRequest: switch req.Op { @@ -272,7 +271,7 @@ func mockRevelantWorkerClient(mockWorkerClient *pbmock.MockWorkerClient, taskNam } case *pb.StartTaskRequest, *pb.UpdateTaskRequest, *pb.OperateTaskRequest: queryResp.SubTaskStatus = []*pb.SubTaskStatus{{}} - if expect == pb.Stage_Stopped { + if opTaskReq, ok := masterReq.(*pb.OperateTaskRequest); ok && opTaskReq.Op == pb.TaskOp_Delete { queryResp.SubTaskStatus[0].Status = &pb.SubTaskStatus_Msg{ Msg: fmt.Sprintf("no sub task with name %s has started", taskName), } @@ -607,7 +606,7 @@ func (t *testMaster) TestStopTaskWithExceptRight(c *check.C) { }}, }} req := &pb.OperateTaskRequest{ - Op: pb.TaskOp_Stop, + Op: pb.TaskOp_Delete, Name: taskName, } ctrl := gomock.NewController(c) @@ -1168,12 +1167,12 @@ func (t *testMaster) TestOperateTask(c *check.C) { Name: taskName, } stopReq1 := &pb.OperateTaskRequest{ - Op: pb.TaskOp_Stop, + Op: pb.TaskOp_Delete, Name: taskName, Sources: []string{sources[0]}, } stopReq2 := &pb.OperateTaskRequest{ - Op: pb.TaskOp_Stop, + Op: pb.TaskOp_Delete, Name: taskName, } sourceResps := []*pb.CommonWorkerResponse{{Result: true, Source: sources[0]}, {Result: true, Source: sources[1]}} @@ -1212,13 +1211,13 @@ func (t *testMaster) TestOperateTask(c *check.C) { resp, err = server.OperateTask(context.Background(), stopReq1) c.Assert(err, check.IsNil) c.Assert(resp.Result, check.IsTrue) - c.Assert(server.getTaskResources(taskName), check.DeepEquals, []string{sources[1]}) + c.Assert(server.getTaskSourceNameList(taskName), check.DeepEquals, []string{sources[1]}) c.Assert(resp.Sources, check.DeepEquals, []*pb.CommonWorkerResponse{{Result: true, Source: sources[0]}}) // 5. test stop task successfully, remove all workers resp, err = server.OperateTask(context.Background(), stopReq2) c.Assert(err, check.IsNil) c.Assert(resp.Result, check.IsTrue) - c.Assert(len(server.getTaskResources(taskName)), check.Equals, 0) + c.Assert(len(server.getTaskSourceNameList(taskName)), check.Equals, 0) c.Assert(resp.Sources, check.DeepEquals, []*pb.CommonWorkerResponse{{Result: true, Source: sources[1]}}) t.clearSchedulerEnv(c, cancel, &wg) } diff --git a/dm/dm/pb/dmworker.pb.go b/dm/dm/pb/dmworker.pb.go index f52701d0f71..567d060d204 100644 --- a/dm/dm/pb/dmworker.pb.go +++ b/dm/dm/pb/dmworker.pb.go @@ -37,6 +37,7 @@ const ( TaskOp_Start TaskOp = 4 TaskOp_Update TaskOp = 5 TaskOp_AutoResume TaskOp = 6 + TaskOp_Delete TaskOp = 7 ) var TaskOp_name = map[int32]string{ @@ -47,6 +48,7 @@ var TaskOp_name = map[int32]string{ 4: "Start", 5: "Update", 6: "AutoResume", + 7: "Delete", } var TaskOp_value = map[string]int32{ @@ -57,6 +59,7 @@ var TaskOp_value = map[string]int32{ "Start": 4, "Update": 5, "AutoResume": 6, + "Delete": 7, } func (x TaskOp) String() string { @@ -2633,142 +2636,143 @@ func init() { func init() { proto.RegisterFile("dmworker.proto", fileDescriptor_51a1b9e17fd67b10) } var fileDescriptor_51a1b9e17fd67b10 = []byte{ - // 2154 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xb4, 0x58, 0x4f, 0x6f, 0x1c, 0x4b, - 0x11, 0xdf, 0xd9, 0x7f, 0xde, 0xad, 0x5d, 0x3b, 0x93, 0x4e, 0x5e, 0x58, 0x4c, 0x58, 0xac, 0xc9, - 0x53, 0x30, 0x16, 0xb2, 0x5e, 0xcc, 0x43, 0x0f, 0x3d, 0x09, 0x78, 0xc4, 0xce, 0x73, 0x02, 0x0e, - 0x4e, 0xc6, 0x4e, 0x38, 0xa2, 0xf6, 0x4c, 0x7b, 0x3d, 0x78, 0x76, 0x66, 0x32, 0xdd, 0x63, 0xcb, - 0x07, 0xc4, 0x47, 0x80, 0x0b, 0x07, 0x10, 0x57, 0xae, 0xef, 0xc8, 0x47, 0x00, 0x8e, 0x11, 0x12, - 0x12, 0x47, 0x94, 0x7c, 0x0d, 0x0e, 0xa8, 0xaa, 0x7b, 0x66, 0x7a, 0xec, 0x75, 0x42, 0x0e, 0xdc, - 0xa6, 0x7e, 0x55, 0x5d, 0x55, 0x5d, 0x5d, 0x7f, 0xba, 0x07, 0x56, 0xc2, 0xf9, 0x79, 0x9a, 0x9f, - 0x8a, 0x7c, 0x33, 0xcb, 0x53, 0x95, 0xb2, 0x76, 0x76, 0xe4, 0xad, 0x03, 0x7b, 0x5e, 0x88, 0xfc, - 0xe2, 0x40, 0x71, 0x55, 0x48, 0x5f, 0xbc, 0x2a, 0x84, 0x54, 0x8c, 0x41, 0x37, 0xe1, 0x73, 0x31, - 0x71, 0xd6, 0x9c, 0xf5, 0xa1, 0x4f, 0xdf, 0x5e, 0x06, 0xb7, 0xb7, 0xd3, 0xf9, 0x3c, 0x4d, 0x7e, - 0x41, 0x3a, 0x7c, 0x21, 0xb3, 0x34, 0x91, 0x82, 0xdd, 0x81, 0x7e, 0x2e, 0x64, 0x11, 0x2b, 0x92, - 0x1e, 0xf8, 0x86, 0x62, 0x2e, 0x74, 0xe6, 0x72, 0x36, 0x69, 0x93, 0x0a, 0xfc, 0x44, 0x49, 0x99, - 0x16, 0x79, 0x20, 0x26, 0x1d, 0x02, 0x0d, 0x85, 0xb8, 0xf6, 0x6b, 0xd2, 0xd5, 0xb8, 0xa6, 0xbc, - 0xaf, 0x1c, 0xb8, 0xd5, 0x70, 0xee, 0x83, 0x2d, 0x7e, 0x0a, 0x63, 0x6d, 0x43, 0x6b, 0x20, 0xbb, - 0xa3, 0x2d, 0x77, 0x33, 0x3b, 0xda, 0x3c, 0xb0, 0x70, 0xbf, 0x21, 0xc5, 0x3e, 0x83, 0x65, 0x59, - 0x1c, 0x1d, 0x72, 0x79, 0x6a, 0x96, 0x75, 0xd7, 0x3a, 0xeb, 0xa3, 0xad, 0x9b, 0xb4, 0xcc, 0x66, - 0xf8, 0x4d, 0x39, 0xef, 0xcf, 0x0e, 0x8c, 0xb6, 0x4f, 0x44, 0x60, 0x68, 0x74, 0x34, 0xe3, 0x52, - 0x8a, 0xb0, 0x74, 0x54, 0x53, 0xec, 0x36, 0xf4, 0x54, 0xaa, 0x78, 0x4c, 0xae, 0xf6, 0x7c, 0x4d, - 0xb0, 0x29, 0x80, 0x2c, 0x82, 0x40, 0x48, 0x79, 0x5c, 0xc4, 0xe4, 0x6a, 0xcf, 0xb7, 0x10, 0xd4, - 0x76, 0xcc, 0xa3, 0x58, 0x84, 0x14, 0xa6, 0x9e, 0x6f, 0x28, 0x36, 0x81, 0xa5, 0x73, 0x9e, 0x27, - 0x51, 0x32, 0x9b, 0xf4, 0x88, 0x51, 0x92, 0xb8, 0x22, 0x14, 0x8a, 0x47, 0xf1, 0xa4, 0xbf, 0xe6, - 0xac, 0x8f, 0x7d, 0x43, 0x79, 0xaf, 0x1d, 0x80, 0x9d, 0x62, 0x9e, 0x19, 0x37, 0xd7, 0x60, 0x44, - 0x1e, 0x1c, 0xf2, 0xa3, 0x58, 0x48, 0xf2, 0xb5, 0xe3, 0xdb, 0x10, 0x5b, 0x87, 0x1b, 0x41, 0x3a, - 0xcf, 0x62, 0xa1, 0x44, 0x68, 0xa4, 0xd0, 0x75, 0xc7, 0xbf, 0x0c, 0xb3, 0x8f, 0x61, 0xf9, 0x38, - 0x4a, 0x22, 0x79, 0x22, 0xc2, 0x87, 0x17, 0x4a, 0xe8, 0x90, 0x3b, 0x7e, 0x13, 0x64, 0x1e, 0x8c, - 0x4b, 0xc0, 0x4f, 0xcf, 0x25, 0x6d, 0xc8, 0xf1, 0x1b, 0x18, 0xfb, 0x2e, 0xdc, 0x14, 0x52, 0x45, - 0x73, 0xae, 0xc4, 0x21, 0xba, 0x42, 0x82, 0x3d, 0x12, 0xbc, 0xca, 0xf0, 0xfe, 0xe2, 0x00, 0xec, - 0xa5, 0x3c, 0x34, 0x5b, 0xba, 0xe2, 0x86, 0xde, 0xd4, 0x25, 0x37, 0xa6, 0x00, 0xb4, 0x4b, 0x2d, - 0xd2, 0x26, 0x11, 0x0b, 0x61, 0xab, 0x30, 0xc8, 0xf2, 0x74, 0x96, 0x0b, 0x29, 0x4d, 0xca, 0x56, - 0x34, 0xae, 0x9d, 0x0b, 0xc5, 0x1f, 0x46, 0x49, 0x9c, 0xce, 0x4c, 0xe2, 0x5a, 0x08, 0xbb, 0x0f, - 0x2b, 0x35, 0xb5, 0x7b, 0xf8, 0x64, 0x87, 0x7c, 0x1f, 0xfa, 0x97, 0x50, 0xef, 0xf7, 0x0e, 0x2c, - 0x1f, 0x9c, 0xf0, 0x3c, 0x8c, 0x92, 0xd9, 0x6e, 0x9e, 0x16, 0x19, 0x9e, 0x9a, 0xe2, 0xf9, 0x4c, - 0x28, 0x53, 0x7e, 0x86, 0xc2, 0xa2, 0xdc, 0xd9, 0xd9, 0x43, 0x3f, 0x3b, 0x58, 0x94, 0xf8, 0xad, - 0xf7, 0x99, 0x4b, 0xb5, 0x97, 0x06, 0x5c, 0x45, 0x69, 0x62, 0xdc, 0x6c, 0x82, 0x54, 0x78, 0x17, - 0x49, 0x40, 0x99, 0xd3, 0xa1, 0xc2, 0x23, 0x0a, 0xf7, 0x57, 0x24, 0x86, 0xd3, 0x23, 0x4e, 0x45, - 0x7b, 0xff, 0xec, 0x00, 0x1c, 0x5c, 0x24, 0xc1, 0xa5, 0x1c, 0x79, 0x74, 0x26, 0x12, 0xd5, 0xcc, - 0x11, 0x0d, 0xa1, 0x32, 0x9d, 0x32, 0x59, 0x19, 0xca, 0x8a, 0x66, 0x77, 0x61, 0x98, 0x8b, 0x40, - 0x24, 0x0a, 0x99, 0x1d, 0x62, 0xd6, 0x00, 0x66, 0xc3, 0x9c, 0x4b, 0x25, 0xf2, 0x46, 0x30, 0x1b, - 0x18, 0xdb, 0x00, 0xd7, 0xa6, 0x77, 0x55, 0x14, 0x9a, 0x80, 0x5e, 0xc1, 0x51, 0x1f, 0x6d, 0xa2, - 0xd4, 0xd7, 0xd7, 0xfa, 0x6c, 0x0c, 0xf5, 0xd9, 0x34, 0xe9, 0x5b, 0xd2, 0xfa, 0x2e, 0xe3, 0xa8, - 0xef, 0x28, 0x4e, 0x83, 0xd3, 0x28, 0x99, 0xd1, 0x01, 0x0c, 0x28, 0x54, 0x0d, 0x8c, 0xfd, 0x10, - 0xdc, 0x22, 0xc9, 0x85, 0x4c, 0xe3, 0x33, 0x11, 0xd2, 0x39, 0xca, 0xc9, 0xd0, 0x6a, 0x1b, 0xf6, - 0x09, 0xfb, 0x57, 0x44, 0xad, 0x13, 0x02, 0xdd, 0x29, 0xcc, 0x09, 0x4d, 0x01, 0x8e, 0xc8, 0x91, - 0xc3, 0x8b, 0x4c, 0x4c, 0x46, 0x3a, 0xcb, 0x6a, 0x84, 0x7d, 0x02, 0xb7, 0xa4, 0x08, 0xd2, 0x24, - 0x94, 0x0f, 0xc5, 0x49, 0x94, 0x84, 0x4f, 0x29, 0x16, 0x93, 0x31, 0x85, 0x78, 0x11, 0xcb, 0xfb, - 0x93, 0x03, 0x63, 0xbb, 0xf7, 0x59, 0x5d, 0xd9, 0xb9, 0xa6, 0x2b, 0xb7, 0xed, 0xae, 0xcc, 0xbe, - 0x53, 0x75, 0x5f, 0xdd, 0x4d, 0x69, 0x7f, 0xcf, 0xf2, 0x14, 0xdb, 0x94, 0x4f, 0x8c, 0xaa, 0x21, - 0x3f, 0x80, 0x51, 0x2e, 0x62, 0x7e, 0x51, 0xb5, 0x51, 0x94, 0xbf, 0x81, 0xf2, 0x7e, 0x0d, 0xfb, - 0xb6, 0x8c, 0xf7, 0xb7, 0x36, 0x8c, 0x2c, 0xe6, 0x95, 0xdc, 0x70, 0xfe, 0xc7, 0xdc, 0x68, 0x5f, - 0x93, 0x1b, 0x6b, 0xa5, 0x4b, 0xc5, 0xd1, 0x4e, 0x94, 0x9b, 0x72, 0xb1, 0xa1, 0x4a, 0xa2, 0x91, - 0x8c, 0x36, 0x84, 0xdd, 0xd0, 0x22, 0xad, 0x54, 0xbc, 0x0c, 0xb3, 0x4d, 0x60, 0x04, 0x6d, 0x73, - 0x15, 0x9c, 0xbc, 0xc8, 0xcc, 0xe9, 0xf4, 0xe9, 0x88, 0x17, 0x70, 0xd8, 0xb7, 0xa0, 0x27, 0x15, - 0x9f, 0x09, 0x4a, 0xc5, 0x95, 0xad, 0x21, 0xa5, 0x0e, 0x02, 0xbe, 0xc6, 0xad, 0xe0, 0x0f, 0xde, - 0x13, 0x7c, 0xef, 0x3f, 0x6d, 0x58, 0x6e, 0x4c, 0xab, 0x45, 0x53, 0xbd, 0xb6, 0xd8, 0xbe, 0xc6, - 0xe2, 0x1a, 0x74, 0x8b, 0x24, 0xd2, 0x87, 0xbd, 0xb2, 0x35, 0x46, 0xfe, 0x8b, 0x24, 0x52, 0x98, - 0x7d, 0x3e, 0x71, 0x2c, 0x9f, 0xba, 0xef, 0x4b, 0x88, 0x4f, 0xe0, 0x56, 0x9d, 0xfa, 0x3b, 0x3b, - 0x7b, 0x7b, 0x69, 0x70, 0x5a, 0x75, 0xc6, 0x45, 0x2c, 0xc6, 0xf4, 0x4c, 0xa7, 0x12, 0x7e, 0xdc, - 0xd2, 0x53, 0xfd, 0xdb, 0xd0, 0x0b, 0x70, 0xca, 0x52, 0x94, 0x4c, 0x42, 0x59, 0x63, 0xf7, 0x71, - 0xcb, 0xd7, 0x7c, 0xf6, 0x31, 0x74, 0xc3, 0x62, 0x9e, 0x99, 0x58, 0xad, 0xa0, 0x5c, 0x3d, 0xf6, - 0x1e, 0xb7, 0x7c, 0xe2, 0xa2, 0x54, 0x9c, 0xf2, 0x70, 0x32, 0xac, 0xa5, 0xea, 0x49, 0x82, 0x52, - 0xc8, 0x45, 0x29, 0xac, 0x49, 0xaa, 0x4f, 0x23, 0x55, 0xb7, 0x47, 0x94, 0x42, 0xee, 0xc3, 0x01, - 0xf4, 0xa5, 0x4e, 0xe4, 0x1f, 0xc1, 0xcd, 0x46, 0xf4, 0xf7, 0x22, 0x49, 0xa1, 0xd2, 0xec, 0x89, - 0x73, 0xdd, 0x95, 0xa2, 0x5c, 0x3f, 0x05, 0xa0, 0x3d, 0x3d, 0xca, 0xf3, 0x34, 0x2f, 0xaf, 0x36, - 0x4e, 0x75, 0xb5, 0xf1, 0xbe, 0x09, 0x43, 0xdc, 0xcb, 0x3b, 0xd8, 0xb8, 0x89, 0xeb, 0xd8, 0x19, - 0x8c, 0xc9, 0xfb, 0xe7, 0x7b, 0xd7, 0x48, 0xb0, 0x2d, 0xb8, 0xad, 0xef, 0x17, 0x3a, 0x9d, 0x9f, - 0xa5, 0x32, 0xa2, 0x01, 0xa3, 0x0b, 0x6b, 0x21, 0x0f, 0x47, 0x80, 0x40, 0x75, 0x07, 0xcf, 0xf7, - 0xca, 0x79, 0x59, 0xd2, 0xde, 0xf7, 0x61, 0x88, 0x16, 0xb5, 0xb9, 0x75, 0xe8, 0x13, 0xa3, 0x8c, - 0x83, 0x5b, 0x85, 0xd3, 0x38, 0xe4, 0x1b, 0xbe, 0xf7, 0x5b, 0x07, 0x46, 0xba, 0x5d, 0xe9, 0x95, - 0x1f, 0xda, 0xad, 0xd6, 0x1a, 0xcb, 0xcb, 0x7a, 0xb7, 0x35, 0x6e, 0x02, 0x50, 0xc3, 0xd1, 0x02, - 0xdd, 0xfa, 0x78, 0x6b, 0xd4, 0xb7, 0x24, 0xf0, 0x60, 0x6a, 0x6a, 0x41, 0x68, 0xff, 0xd0, 0x86, - 0xb1, 0x39, 0x52, 0x2d, 0xf2, 0x7f, 0x2a, 0x3b, 0x53, 0x19, 0x5d, 0xbb, 0x32, 0xee, 0x97, 0x95, - 0xd1, 0xab, 0xb7, 0x51, 0x67, 0x51, 0x5d, 0x18, 0xf7, 0x4c, 0x61, 0xf4, 0x49, 0x6c, 0xb9, 0x2c, - 0x8c, 0x52, 0x4a, 0xd7, 0xc5, 0x3d, 0x53, 0x17, 0x4b, 0xb5, 0x50, 0x95, 0x52, 0x55, 0x59, 0xdc, - 0x33, 0x65, 0x31, 0xa8, 0x85, 0xaa, 0x63, 0xae, 0xaa, 0x62, 0x09, 0x7a, 0x74, 0x9c, 0xde, 0xe7, - 0xe0, 0xda, 0xa1, 0xa1, 0x9a, 0xb8, 0x6f, 0x98, 0x8d, 0x54, 0xb0, 0x84, 0x7c, 0xb3, 0xf6, 0x15, - 0x2c, 0x37, 0x9a, 0x0a, 0xce, 0xc6, 0x48, 0x6e, 0xf3, 0x24, 0x10, 0x71, 0x75, 0xc3, 0xb6, 0x10, - 0x2b, 0xc9, 0xda, 0xb5, 0x66, 0xa3, 0xa2, 0x91, 0x64, 0xd6, 0x3d, 0xb9, 0xd3, 0xb8, 0x27, 0xff, - 0xc3, 0x81, 0xb1, 0xbd, 0x00, 0xaf, 0xda, 0x8f, 0xf2, 0x7c, 0x3b, 0x0d, 0xf5, 0x69, 0xf6, 0xfc, - 0x92, 0xc4, 0xd4, 0xc7, 0xcf, 0x98, 0x4b, 0x69, 0x32, 0xb0, 0xa2, 0x0d, 0xef, 0x20, 0x48, 0xb3, - 0xf2, 0xe5, 0x53, 0xd1, 0x86, 0xb7, 0x27, 0xce, 0x44, 0x6c, 0x46, 0x4d, 0x45, 0xa3, 0xb5, 0xa7, - 0x42, 0x4a, 0x4c, 0x13, 0xdd, 0x21, 0x4b, 0x12, 0x57, 0xf9, 0xfc, 0x7c, 0x9b, 0x17, 0x52, 0x98, - 0xdb, 0x4d, 0x45, 0x63, 0x58, 0xf0, 0x85, 0xc6, 0xf3, 0xb4, 0x48, 0xca, 0x3b, 0x8d, 0x85, 0x78, - 0xe7, 0x70, 0xf3, 0x59, 0x91, 0xcf, 0x04, 0x25, 0x71, 0xf9, 0xe0, 0x5b, 0x85, 0x41, 0x94, 0xf0, - 0x40, 0x45, 0x67, 0xc2, 0x44, 0xb2, 0xa2, 0x31, 0x7f, 0x55, 0x34, 0x17, 0xe6, 0x52, 0x47, 0xdf, - 0x28, 0x7f, 0x1c, 0xc5, 0x82, 0xf2, 0xda, 0x6c, 0xa9, 0xa4, 0xa9, 0x44, 0xf5, 0x74, 0x35, 0xcf, - 0x39, 0x4d, 0x79, 0x7f, 0x6c, 0xc3, 0xea, 0x7e, 0x26, 0x72, 0xae, 0x84, 0x7e, 0x42, 0x1e, 0x04, - 0x27, 0x62, 0xce, 0x4b, 0x17, 0xee, 0x42, 0x3b, 0xcd, 0xc8, 0xb8, 0xc9, 0x77, 0xcd, 0xde, 0xcf, - 0xfc, 0x76, 0x9a, 0x91, 0x13, 0x5c, 0x9e, 0x9a, 0xd8, 0xd2, 0xf7, 0xb5, 0xef, 0xc9, 0x55, 0x18, - 0x84, 0x5c, 0xf1, 0x23, 0x2e, 0x45, 0x19, 0xd3, 0x92, 0xa6, 0xa7, 0x17, 0xbe, 0x54, 0x4c, 0x44, - 0x35, 0x41, 0x9a, 0xc8, 0x9a, 0x89, 0xa6, 0xa1, 0x50, 0xfa, 0x38, 0x2e, 0xe4, 0x09, 0x85, 0x71, - 0xe0, 0x6b, 0x02, 0x7d, 0xa9, 0x72, 0x7e, 0xa0, 0x53, 0x1c, 0xa3, 0x7e, 0x9c, 0xa7, 0x73, 0xdd, - 0x58, 0x68, 0x94, 0x0c, 0x7c, 0x0b, 0x29, 0xf9, 0x87, 0xfa, 0x62, 0x0f, 0x35, 0x5f, 0x23, 0x9e, - 0x82, 0xe5, 0x97, 0x0f, 0x4c, 0xda, 0x3f, 0x15, 0x8a, 0xb3, 0x55, 0x2b, 0x1c, 0x80, 0xe1, 0x40, - 0x8e, 0x09, 0xc6, 0x7b, 0xbb, 0x47, 0xd9, 0x72, 0x3a, 0x56, 0xcb, 0x29, 0x23, 0xd8, 0xa5, 0x14, - 0xa7, 0x6f, 0xef, 0x53, 0xb8, 0x6d, 0x4e, 0xe4, 0xe5, 0x03, 0xb4, 0x7a, 0xed, 0x59, 0x68, 0xb6, - 0x36, 0xef, 0xfd, 0xd5, 0x81, 0x8f, 0x2e, 0x2d, 0xfb, 0xe0, 0x97, 0xf9, 0x67, 0xd0, 0xc5, 0x87, - 0xd0, 0xa4, 0x43, 0xa5, 0x79, 0x0f, 0x6d, 0x2c, 0x54, 0xb9, 0x89, 0xc4, 0xa3, 0x44, 0xe5, 0x17, - 0x3e, 0x2d, 0x58, 0xfd, 0x29, 0x0c, 0x2b, 0x08, 0xf5, 0x9e, 0x8a, 0x8b, 0xb2, 0xfb, 0x9e, 0x8a, - 0x0b, 0xbc, 0x1b, 0x9c, 0xf1, 0xb8, 0xd0, 0xa1, 0x31, 0x03, 0xb6, 0x11, 0x58, 0x5f, 0xf3, 0x3f, - 0x6f, 0xff, 0xc0, 0xf1, 0x7e, 0x0d, 0x93, 0xc7, 0x3c, 0x09, 0x63, 0x93, 0x8f, 0xba, 0x29, 0x98, - 0x10, 0x7c, 0xc3, 0x0a, 0xc1, 0x08, 0xb5, 0x10, 0xf7, 0x1d, 0xd9, 0x78, 0x17, 0x86, 0x47, 0xe5, - 0x38, 0x34, 0x81, 0xaf, 0x01, 0xca, 0x99, 0x57, 0xb1, 0x34, 0x0f, 0x30, 0xfa, 0xf6, 0x3e, 0x82, - 0x5b, 0xbb, 0x42, 0x69, 0xdb, 0xdb, 0xc7, 0x33, 0x63, 0xd9, 0x5b, 0x87, 0xdb, 0x4d, 0xd8, 0x04, - 0xd7, 0x85, 0x4e, 0x70, 0x5c, 0x8d, 0x9a, 0xe0, 0x78, 0xb6, 0xf1, 0x4b, 0xe8, 0xeb, 0xac, 0x60, - 0xcb, 0x30, 0x7c, 0x92, 0x9c, 0xf1, 0x38, 0x0a, 0xf7, 0x33, 0xb7, 0xc5, 0x06, 0xd0, 0x3d, 0x50, - 0x69, 0xe6, 0x3a, 0x6c, 0x08, 0xbd, 0x67, 0xd8, 0x16, 0xdc, 0x36, 0x03, 0xe8, 0x63, 0xe7, 0x9c, - 0x0b, 0xb7, 0x83, 0xf0, 0x81, 0xe2, 0xb9, 0x72, 0xbb, 0x08, 0xbf, 0xc8, 0x42, 0xae, 0x84, 0xdb, - 0x63, 0x2b, 0x00, 0x3f, 0x29, 0x54, 0x6a, 0xc4, 0xfa, 0x1b, 0xbf, 0x21, 0xb1, 0x19, 0xda, 0x1e, - 0x1b, 0xfd, 0x44, 0xbb, 0x2d, 0xb6, 0x04, 0x9d, 0x9f, 0x8b, 0x73, 0xd7, 0x61, 0x23, 0x58, 0xf2, - 0x8b, 0x24, 0x89, 0x92, 0x99, 0xb6, 0x41, 0xe6, 0x42, 0xb7, 0x83, 0x0c, 0x74, 0x22, 0x13, 0xa1, - 0xdb, 0x65, 0x63, 0x18, 0x7c, 0x69, 0xde, 0xde, 0x6e, 0x0f, 0x59, 0x28, 0x86, 0x6b, 0xfa, 0xc8, - 0x22, 0x83, 0x48, 0x2d, 0x21, 0x45, 0xab, 0x90, 0x1a, 0x6c, 0xec, 0xc3, 0xa0, 0x1c, 0x7b, 0xec, - 0x06, 0x8c, 0x8c, 0x0f, 0x08, 0xb9, 0x2d, 0xdc, 0x04, 0x0d, 0x37, 0xd7, 0xc1, 0x0d, 0xe3, 0x00, - 0x73, 0xdb, 0xf8, 0x85, 0x53, 0xca, 0xed, 0x50, 0x10, 0x2e, 0x92, 0xc0, 0xed, 0xa2, 0x20, 0x75, - 0x3b, 0x37, 0xdc, 0x78, 0x0a, 0x4b, 0xf4, 0xb9, 0x8f, 0x87, 0xb8, 0x62, 0xf4, 0x19, 0xc4, 0x6d, - 0x61, 0x1c, 0xd1, 0xba, 0x96, 0x76, 0x30, 0x1e, 0xb4, 0x1d, 0x4d, 0xb7, 0xd1, 0x05, 0x1d, 0x1b, - 0x0d, 0x74, 0x36, 0x12, 0x18, 0x94, 0x6d, 0x8a, 0xdd, 0x82, 0x1b, 0x65, 0x8c, 0x0c, 0xa4, 0x15, - 0xee, 0x0a, 0xa5, 0x01, 0xd7, 0x21, 0xfd, 0x15, 0xd9, 0xc6, 0xb0, 0xfa, 0x62, 0x9e, 0x9e, 0x09, - 0x83, 0x74, 0xd0, 0x22, 0x4e, 0x45, 0x43, 0x77, 0x71, 0x01, 0xd2, 0xf4, 0x77, 0xc5, 0xed, 0x6d, - 0x7c, 0x01, 0x83, 0xb2, 0x14, 0x2d, 0x7b, 0x25, 0x54, 0xd9, 0xd3, 0x80, 0xeb, 0xd4, 0x06, 0x0c, - 0xd2, 0xde, 0x78, 0x49, 0x23, 0x0c, 0x33, 0xd9, 0x0a, 0x80, 0x41, 0x4c, 0xe6, 0x9c, 0x46, 0x99, - 0x39, 0x57, 0x91, 0xc5, 0x3c, 0xa8, 0x72, 0xe7, 0x4c, 0xe4, 0xca, 0xed, 0xe0, 0xf7, 0x93, 0xe4, - 0x57, 0x22, 0xc0, 0xe4, 0xc1, 0x68, 0x47, 0x52, 0xb9, 0xbd, 0x8d, 0x3d, 0x18, 0xbd, 0x44, 0x55, - 0x5c, 0x91, 0xee, 0x3b, 0xc0, 0x4a, 0xe7, 0x6a, 0xd4, 0x6d, 0xa1, 0x4d, 0x4a, 0xbc, 0x0a, 0x75, - 0x1d, 0x76, 0x13, 0x96, 0x31, 0xe8, 0x35, 0xd4, 0xde, 0xfa, 0xaa, 0x03, 0x7d, 0x5d, 0x01, 0xec, - 0x0b, 0x18, 0x59, 0x3f, 0x01, 0xd9, 0x1d, 0xac, 0xc5, 0xab, 0xbf, 0x2c, 0x57, 0xbf, 0x76, 0x05, - 0xd7, 0x65, 0xe3, 0xb5, 0xd8, 0x8f, 0x01, 0xea, 0x89, 0xc7, 0x3e, 0xa2, 0x6b, 0xc0, 0xe5, 0x09, - 0xb8, 0x3a, 0xa1, 0xbb, 0xd2, 0x82, 0x1f, 0x9c, 0x5e, 0x8b, 0xfd, 0x0c, 0x96, 0x4d, 0x73, 0xd2, - 0xe7, 0xc2, 0xa6, 0x56, 0xbf, 0x5a, 0x30, 0xcb, 0xde, 0xa9, 0xec, 0xcb, 0x4a, 0x99, 0x3e, 0x13, - 0x36, 0x59, 0xd0, 0xfc, 0xb4, 0x9a, 0xaf, 0x5f, 0xdb, 0x16, 0xbd, 0x16, 0xdb, 0x85, 0x91, 0x6e, - 0x5e, 0xfa, 0x6a, 0x72, 0x17, 0x65, 0xaf, 0xeb, 0x66, 0xef, 0x74, 0x68, 0x1b, 0xc6, 0x76, 0xbf, - 0x61, 0x14, 0xc9, 0x05, 0x8d, 0x49, 0x2b, 0x59, 0xd4, 0x9a, 0xbc, 0xd6, 0xc3, 0xc9, 0xdf, 0xdf, - 0x4c, 0x9d, 0xd7, 0x6f, 0xa6, 0xce, 0xbf, 0xdf, 0x4c, 0x9d, 0xdf, 0xbd, 0x9d, 0xb6, 0x5e, 0xbf, - 0x9d, 0xb6, 0xfe, 0xf5, 0x76, 0xda, 0x3a, 0xea, 0xd3, 0xcf, 0xe6, 0xef, 0xfd, 0x37, 0x00, 0x00, - 0xff, 0xff, 0xf3, 0x6f, 0xde, 0xe1, 0x7e, 0x16, 0x00, 0x00, + // 2162 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xb4, 0x58, 0xcf, 0x73, 0xdc, 0x4a, + 0xf1, 0x5f, 0xed, 0x2f, 0xef, 0xf6, 0xae, 0x1d, 0x65, 0xf2, 0xe3, 0xbb, 0x5f, 0x13, 0x16, 0x97, + 0xf2, 0x2a, 0x18, 0x17, 0xe5, 0x7a, 0x31, 0x8f, 0x7a, 0xd4, 0xab, 0x02, 0x1e, 0xb1, 0xf3, 0x9c, + 0x80, 0x83, 0x13, 0xd9, 0x09, 0xe7, 0xb1, 0x34, 0x5e, 0x0b, 0x6b, 0x25, 0x45, 0x33, 0xb2, 0xcb, + 0x07, 0x8a, 0x3f, 0x01, 0x2e, 0x1c, 0xa0, 0xb8, 0x72, 0x7d, 0x47, 0xfe, 0x04, 0xe0, 0x98, 0xa2, + 0x8a, 0x2a, 0x8e, 0x54, 0xf2, 0x6f, 0x70, 0xa0, 0xba, 0x67, 0x24, 0x8d, 0xec, 0x75, 0x42, 0x0e, + 0xdc, 0xd4, 0x9f, 0xee, 0xe9, 0xee, 0xe9, 0xe9, 0x1f, 0x33, 0x82, 0x95, 0x70, 0x7e, 0x9e, 0xe6, + 0xa7, 0x22, 0xdf, 0xcc, 0xf2, 0x54, 0xa5, 0xac, 0x9d, 0x1d, 0x79, 0xeb, 0xc0, 0x5e, 0x14, 0x22, + 0xbf, 0x38, 0x50, 0x5c, 0x15, 0xd2, 0x17, 0xaf, 0x0b, 0x21, 0x15, 0x63, 0xd0, 0x4d, 0xf8, 0x5c, + 0x4c, 0x9c, 0x35, 0x67, 0x7d, 0xe8, 0xd3, 0xb7, 0x97, 0xc1, 0xed, 0xed, 0x74, 0x3e, 0x4f, 0x93, + 0x5f, 0x90, 0x0e, 0x5f, 0xc8, 0x2c, 0x4d, 0xa4, 0x60, 0x77, 0xa1, 0x9f, 0x0b, 0x59, 0xc4, 0x8a, + 0xa4, 0x07, 0xbe, 0xa1, 0x98, 0x0b, 0x9d, 0xb9, 0x9c, 0x4d, 0xda, 0xa4, 0x02, 0x3f, 0x51, 0x52, + 0xa6, 0x45, 0x1e, 0x88, 0x49, 0x87, 0x40, 0x43, 0x21, 0xae, 0xfd, 0x9a, 0x74, 0x35, 0xae, 0x29, + 0xef, 0x6b, 0x07, 0x6e, 0x35, 0x9c, 0xfb, 0x68, 0x8b, 0x9f, 0xc1, 0x58, 0xdb, 0xd0, 0x1a, 0xc8, + 0xee, 0x68, 0xcb, 0xdd, 0xcc, 0x8e, 0x36, 0x0f, 0x2c, 0xdc, 0x6f, 0x48, 0xb1, 0xcf, 0x61, 0x59, + 0x16, 0x47, 0x87, 0x5c, 0x9e, 0x9a, 0x65, 0xdd, 0xb5, 0xce, 0xfa, 0x68, 0xeb, 0x26, 0x2d, 0xb3, + 0x19, 0x7e, 0x53, 0xce, 0xfb, 0x93, 0x03, 0xa3, 0xed, 0x13, 0x11, 0x18, 0x1a, 0x1d, 0xcd, 0xb8, + 0x94, 0x22, 0x2c, 0x1d, 0xd5, 0x14, 0xbb, 0x0d, 0x3d, 0x95, 0x2a, 0x1e, 0x93, 0xab, 0x3d, 0x5f, + 0x13, 0x6c, 0x0a, 0x20, 0x8b, 0x20, 0x10, 0x52, 0x1e, 0x17, 0x31, 0xb9, 0xda, 0xf3, 0x2d, 0x04, + 0xb5, 0x1d, 0xf3, 0x28, 0x16, 0x21, 0x85, 0xa9, 0xe7, 0x1b, 0x8a, 0x4d, 0x60, 0xe9, 0x9c, 0xe7, + 0x49, 0x94, 0xcc, 0x26, 0x3d, 0x62, 0x94, 0x24, 0xae, 0x08, 0x85, 0xe2, 0x51, 0x3c, 0xe9, 0xaf, + 0x39, 0xeb, 0x63, 0xdf, 0x50, 0xde, 0x1b, 0x07, 0x60, 0xa7, 0x98, 0x67, 0xc6, 0xcd, 0x35, 0x18, + 0x91, 0x07, 0x87, 0xfc, 0x28, 0x16, 0x92, 0x7c, 0xed, 0xf8, 0x36, 0xc4, 0xd6, 0xe1, 0x46, 0x90, + 0xce, 0xb3, 0x58, 0x28, 0x11, 0x1a, 0x29, 0x74, 0xdd, 0xf1, 0x2f, 0xc3, 0xec, 0x13, 0x58, 0x3e, + 0x8e, 0x92, 0x48, 0x9e, 0x88, 0xf0, 0xd1, 0x85, 0x12, 0x3a, 0xe4, 0x8e, 0xdf, 0x04, 0x99, 0x07, + 0xe3, 0x12, 0xf0, 0xd3, 0x73, 0x49, 0x1b, 0x72, 0xfc, 0x06, 0xc6, 0xbe, 0x0b, 0x37, 0x85, 0x54, + 0xd1, 0x9c, 0x2b, 0x71, 0x88, 0xae, 0x90, 0x60, 0x8f, 0x04, 0xaf, 0x32, 0xbc, 0x3f, 0x3b, 0x00, + 0x7b, 0x29, 0x0f, 0xcd, 0x96, 0xae, 0xb8, 0xa1, 0x37, 0x75, 0xc9, 0x8d, 0x29, 0x00, 0xed, 0x52, + 0x8b, 0xb4, 0x49, 0xc4, 0x42, 0xd8, 0x2a, 0x0c, 0xb2, 0x3c, 0x9d, 0xe5, 0x42, 0x4a, 0x93, 0xb2, + 0x15, 0x8d, 0x6b, 0xe7, 0x42, 0xf1, 0x47, 0x51, 0x12, 0xa7, 0x33, 0x93, 0xb8, 0x16, 0xc2, 0x1e, + 0xc0, 0x4a, 0x4d, 0xed, 0x1e, 0x3e, 0xdd, 0x21, 0xdf, 0x87, 0xfe, 0x25, 0xd4, 0xfb, 0x9d, 0x03, + 0xcb, 0x07, 0x27, 0x3c, 0x0f, 0xa3, 0x64, 0xb6, 0x9b, 0xa7, 0x45, 0x86, 0xa7, 0xa6, 0x78, 0x3e, + 0x13, 0xca, 0x94, 0x9f, 0xa1, 0xb0, 0x28, 0x77, 0x76, 0xf6, 0xd0, 0xcf, 0x0e, 0x16, 0x25, 0x7e, + 0xeb, 0x7d, 0xe6, 0x52, 0xed, 0xa5, 0x01, 0x57, 0x51, 0x9a, 0x18, 0x37, 0x9b, 0x20, 0x15, 0xde, + 0x45, 0x12, 0x50, 0xe6, 0x74, 0xa8, 0xf0, 0x88, 0xc2, 0xfd, 0x15, 0x89, 0xe1, 0xf4, 0x88, 0x53, + 0xd1, 0xde, 0x3f, 0x3a, 0x00, 0x07, 0x17, 0x49, 0x70, 0x29, 0x47, 0x1e, 0x9f, 0x89, 0x44, 0x35, + 0x73, 0x44, 0x43, 0xa8, 0x4c, 0xa7, 0x4c, 0x56, 0x86, 0xb2, 0xa2, 0xd9, 0x3d, 0x18, 0xe6, 0x22, + 0x10, 0x89, 0x42, 0x66, 0x87, 0x98, 0x35, 0x80, 0xd9, 0x30, 0xe7, 0x52, 0x89, 0xbc, 0x11, 0xcc, + 0x06, 0xc6, 0x36, 0xc0, 0xb5, 0xe9, 0x5d, 0x15, 0x85, 0x26, 0xa0, 0x57, 0x70, 0xd4, 0x47, 0x9b, + 0x28, 0xf5, 0xf5, 0xb5, 0x3e, 0x1b, 0x43, 0x7d, 0x36, 0x4d, 0xfa, 0x96, 0xb4, 0xbe, 0xcb, 0x38, + 0xea, 0x3b, 0x8a, 0xd3, 0xe0, 0x34, 0x4a, 0x66, 0x74, 0x00, 0x03, 0x0a, 0x55, 0x03, 0x63, 0x3f, + 0x04, 0xb7, 0x48, 0x72, 0x21, 0xd3, 0xf8, 0x4c, 0x84, 0x74, 0x8e, 0x72, 0x32, 0xb4, 0xda, 0x86, + 0x7d, 0xc2, 0xfe, 0x15, 0x51, 0xeb, 0x84, 0x40, 0x77, 0x0a, 0x73, 0x42, 0x53, 0x80, 0x23, 0x72, + 0xe4, 0xf0, 0x22, 0x13, 0x93, 0x91, 0xce, 0xb2, 0x1a, 0x61, 0x9f, 0xc2, 0x2d, 0x29, 0x82, 0x34, + 0x09, 0xe5, 0x23, 0x71, 0x12, 0x25, 0xe1, 0x33, 0x8a, 0xc5, 0x64, 0x4c, 0x21, 0x5e, 0xc4, 0xf2, + 0xfe, 0xe8, 0xc0, 0xd8, 0xee, 0x7d, 0x56, 0x57, 0x76, 0xae, 0xe9, 0xca, 0x6d, 0xbb, 0x2b, 0xb3, + 0xef, 0x54, 0xdd, 0x57, 0x77, 0x53, 0xda, 0xdf, 0xf3, 0x3c, 0xc5, 0x36, 0xe5, 0x13, 0xa3, 0x6a, + 0xc8, 0x0f, 0x61, 0x94, 0x8b, 0x98, 0x5f, 0x54, 0x6d, 0x14, 0xe5, 0x6f, 0xa0, 0xbc, 0x5f, 0xc3, + 0xbe, 0x2d, 0xe3, 0xfd, 0xb5, 0x0d, 0x23, 0x8b, 0x79, 0x25, 0x37, 0x9c, 0xff, 0x32, 0x37, 0xda, + 0xd7, 0xe4, 0xc6, 0x5a, 0xe9, 0x52, 0x71, 0xb4, 0x13, 0xe5, 0xa6, 0x5c, 0x6c, 0xa8, 0x92, 0x68, + 0x24, 0xa3, 0x0d, 0x61, 0x37, 0xb4, 0x48, 0x2b, 0x15, 0x2f, 0xc3, 0x6c, 0x13, 0x18, 0x41, 0xdb, + 0x5c, 0x05, 0x27, 0x2f, 0x33, 0x73, 0x3a, 0x7d, 0x3a, 0xe2, 0x05, 0x1c, 0xf6, 0x2d, 0xe8, 0x49, + 0xc5, 0x67, 0x82, 0x52, 0x71, 0x65, 0x6b, 0x48, 0xa9, 0x83, 0x80, 0xaf, 0x71, 0x2b, 0xf8, 0x83, + 0x0f, 0x04, 0xdf, 0xfb, 0x77, 0x1b, 0x96, 0x1b, 0xd3, 0x6a, 0xd1, 0x54, 0xaf, 0x2d, 0xb6, 0xaf, + 0xb1, 0xb8, 0x06, 0xdd, 0x22, 0x89, 0xf4, 0x61, 0xaf, 0x6c, 0x8d, 0x91, 0xff, 0x32, 0x89, 0x14, + 0x66, 0x9f, 0x4f, 0x1c, 0xcb, 0xa7, 0xee, 0x87, 0x12, 0xe2, 0x53, 0xb8, 0x55, 0xa7, 0xfe, 0xce, + 0xce, 0xde, 0x5e, 0x1a, 0x9c, 0x56, 0x9d, 0x71, 0x11, 0x8b, 0x31, 0x3d, 0xd3, 0xa9, 0x84, 0x9f, + 0xb4, 0xf4, 0x54, 0xff, 0x36, 0xf4, 0x02, 0x9c, 0xb2, 0x14, 0x25, 0x93, 0x50, 0xd6, 0xd8, 0x7d, + 0xd2, 0xf2, 0x35, 0x9f, 0x7d, 0x02, 0xdd, 0xb0, 0x98, 0x67, 0x26, 0x56, 0x2b, 0x28, 0x57, 0x8f, + 0xbd, 0x27, 0x2d, 0x9f, 0xb8, 0x28, 0x15, 0xa7, 0x3c, 0x9c, 0x0c, 0x6b, 0xa9, 0x7a, 0x92, 0xa0, + 0x14, 0x72, 0x51, 0x0a, 0x6b, 0x92, 0xea, 0xd3, 0x48, 0xd5, 0xed, 0x11, 0xa5, 0x90, 0xfb, 0x68, + 0x00, 0x7d, 0xa9, 0x13, 0xf9, 0x47, 0x70, 0xb3, 0x11, 0xfd, 0xbd, 0x48, 0x52, 0xa8, 0x34, 0x7b, + 0xe2, 0x5c, 0x77, 0xa5, 0x28, 0xd7, 0x4f, 0x01, 0x68, 0x4f, 0x8f, 0xf3, 0x3c, 0xcd, 0xcb, 0xab, + 0x8d, 0x53, 0x5d, 0x6d, 0xbc, 0x6f, 0xc2, 0x10, 0xf7, 0xf2, 0x1e, 0x36, 0x6e, 0xe2, 0x3a, 0x76, + 0x06, 0x63, 0xf2, 0xfe, 0xc5, 0xde, 0x35, 0x12, 0x6c, 0x0b, 0x6e, 0xeb, 0xfb, 0x85, 0x4e, 0xe7, + 0xe7, 0xa9, 0x8c, 0x68, 0xc0, 0xe8, 0xc2, 0x5a, 0xc8, 0xc3, 0x11, 0x20, 0x50, 0xdd, 0xc1, 0x8b, + 0xbd, 0x72, 0x5e, 0x96, 0xb4, 0xf7, 0x7d, 0x18, 0xa2, 0x45, 0x6d, 0x6e, 0x1d, 0xfa, 0xc4, 0x28, + 0xe3, 0xe0, 0x56, 0xe1, 0x34, 0x0e, 0xf9, 0x86, 0xef, 0xfd, 0xc6, 0x81, 0x91, 0x6e, 0x57, 0x7a, + 0xe5, 0xc7, 0x76, 0xab, 0xb5, 0xc6, 0xf2, 0xb2, 0xde, 0x6d, 0x8d, 0x9b, 0x00, 0xd4, 0x70, 0xb4, + 0x40, 0xb7, 0x3e, 0xde, 0x1a, 0xf5, 0x2d, 0x09, 0x3c, 0x98, 0x9a, 0x5a, 0x10, 0xda, 0xdf, 0xb7, + 0x61, 0x6c, 0x8e, 0x54, 0x8b, 0xfc, 0x8f, 0xca, 0xce, 0x54, 0x46, 0xd7, 0xae, 0x8c, 0x07, 0x65, + 0x65, 0xf4, 0xea, 0x6d, 0xd4, 0x59, 0x54, 0x17, 0xc6, 0x7d, 0x53, 0x18, 0x7d, 0x12, 0x5b, 0x2e, + 0x0b, 0xa3, 0x94, 0xd2, 0x75, 0x71, 0xdf, 0xd4, 0xc5, 0x52, 0x2d, 0x54, 0xa5, 0x54, 0x55, 0x16, + 0xf7, 0x4d, 0x59, 0x0c, 0x6a, 0xa1, 0xea, 0x98, 0xab, 0xaa, 0x58, 0x82, 0x1e, 0x1d, 0xa7, 0xf7, + 0x05, 0xb8, 0x76, 0x68, 0xa8, 0x26, 0x1e, 0x18, 0x66, 0x23, 0x15, 0x2c, 0x21, 0xdf, 0xac, 0x7d, + 0x0d, 0xcb, 0x8d, 0xa6, 0x82, 0xb3, 0x31, 0x92, 0xdb, 0x3c, 0x09, 0x44, 0x5c, 0xdd, 0xb0, 0x2d, + 0xc4, 0x4a, 0xb2, 0x76, 0xad, 0xd9, 0xa8, 0x68, 0x24, 0x99, 0x75, 0x4f, 0xee, 0x34, 0xee, 0xc9, + 0x7f, 0x77, 0x60, 0x6c, 0x2f, 0xc0, 0xab, 0xf6, 0xe3, 0x3c, 0xdf, 0x4e, 0x43, 0x7d, 0x9a, 0x3d, + 0xbf, 0x24, 0x31, 0xf5, 0xf1, 0x33, 0xe6, 0x52, 0x9a, 0x0c, 0xac, 0x68, 0xc3, 0x3b, 0x08, 0xd2, + 0xac, 0x7c, 0xf9, 0x54, 0xb4, 0xe1, 0xed, 0x89, 0x33, 0x11, 0x9b, 0x51, 0x53, 0xd1, 0x68, 0xed, + 0x99, 0x90, 0x12, 0xd3, 0x44, 0x77, 0xc8, 0x92, 0xc4, 0x55, 0x3e, 0x3f, 0xdf, 0xe6, 0x85, 0x14, + 0xe6, 0x76, 0x53, 0xd1, 0x18, 0x16, 0x7c, 0xa1, 0xf1, 0x3c, 0x2d, 0x92, 0xf2, 0x4e, 0x63, 0x21, + 0xde, 0x39, 0xdc, 0x7c, 0x5e, 0xe4, 0x33, 0x41, 0x49, 0x5c, 0x3e, 0xf8, 0x56, 0x61, 0x10, 0x25, + 0x3c, 0x50, 0xd1, 0x99, 0x30, 0x91, 0xac, 0x68, 0xcc, 0x5f, 0x15, 0xcd, 0x85, 0xb9, 0xd4, 0xd1, + 0x37, 0xca, 0x1f, 0x47, 0xb1, 0xa0, 0xbc, 0x36, 0x5b, 0x2a, 0x69, 0x2a, 0x51, 0x3d, 0x5d, 0xcd, + 0x73, 0x4e, 0x53, 0xde, 0x1f, 0xda, 0xb0, 0xba, 0x9f, 0x89, 0x9c, 0x2b, 0xa1, 0x9f, 0x90, 0x07, + 0xc1, 0x89, 0x98, 0xf3, 0xd2, 0x85, 0x7b, 0xd0, 0x4e, 0x33, 0x32, 0x6e, 0xf2, 0x5d, 0xb3, 0xf7, + 0x33, 0xbf, 0x9d, 0x66, 0xe4, 0x04, 0x97, 0xa7, 0x26, 0xb6, 0xf4, 0x7d, 0xed, 0x7b, 0x72, 0x15, + 0x06, 0x21, 0x57, 0xfc, 0x88, 0x4b, 0x51, 0xc6, 0xb4, 0xa4, 0xe9, 0xe9, 0x85, 0x2f, 0x15, 0x13, + 0x51, 0x4d, 0x90, 0x26, 0xb2, 0x66, 0xa2, 0x69, 0x28, 0x94, 0x3e, 0x8e, 0x0b, 0x79, 0x42, 0x61, + 0x1c, 0xf8, 0x9a, 0x40, 0x5f, 0xaa, 0x9c, 0x1f, 0xe8, 0x14, 0xc7, 0xa8, 0x1f, 0xe7, 0xe9, 0x5c, + 0x37, 0x16, 0x1a, 0x25, 0x03, 0xdf, 0x42, 0x4a, 0xfe, 0xa1, 0xbe, 0xd8, 0x43, 0xcd, 0xd7, 0x88, + 0xa7, 0x60, 0xf9, 0xd5, 0x43, 0x93, 0xf6, 0xcf, 0x84, 0xe2, 0x6c, 0xd5, 0x0a, 0x07, 0x60, 0x38, + 0x90, 0x63, 0x82, 0xf1, 0xc1, 0xee, 0x51, 0xb6, 0x9c, 0x8e, 0xd5, 0x72, 0xca, 0x08, 0x76, 0x29, + 0xc5, 0xe9, 0xdb, 0xfb, 0x0c, 0x6e, 0x9b, 0x13, 0x79, 0xf5, 0x10, 0xad, 0x5e, 0x7b, 0x16, 0x9a, + 0xad, 0xcd, 0x7b, 0x7f, 0x71, 0xe0, 0xce, 0xa5, 0x65, 0x1f, 0xfd, 0x32, 0xff, 0x1c, 0xba, 0xf8, + 0x10, 0x9a, 0x74, 0xa8, 0x34, 0xef, 0xa3, 0x8d, 0x85, 0x2a, 0x37, 0x91, 0x78, 0x9c, 0xa8, 0xfc, + 0xc2, 0xa7, 0x05, 0xab, 0x3f, 0x85, 0x61, 0x05, 0xa1, 0xde, 0x53, 0x71, 0x51, 0x76, 0xdf, 0x53, + 0x71, 0x81, 0x77, 0x83, 0x33, 0x1e, 0x17, 0x3a, 0x34, 0x66, 0xc0, 0x36, 0x02, 0xeb, 0x6b, 0xfe, + 0x17, 0xed, 0x1f, 0x38, 0xde, 0xaf, 0x60, 0xf2, 0x84, 0x27, 0x61, 0x6c, 0xf2, 0x51, 0x37, 0x05, + 0x13, 0x82, 0x6f, 0x58, 0x21, 0x18, 0xa1, 0x16, 0xe2, 0xbe, 0x27, 0x1b, 0xef, 0xc1, 0xf0, 0xa8, + 0x1c, 0x87, 0x26, 0xf0, 0x35, 0x40, 0x39, 0xf3, 0x3a, 0x96, 0xe6, 0x01, 0x46, 0xdf, 0xde, 0x1d, + 0xb8, 0xb5, 0x2b, 0x94, 0xb6, 0xbd, 0x7d, 0x3c, 0x33, 0x96, 0xbd, 0x75, 0xb8, 0xdd, 0x84, 0x4d, + 0x70, 0x5d, 0xe8, 0x04, 0xc7, 0xd5, 0xa8, 0x09, 0x8e, 0x67, 0x1b, 0xa7, 0xd0, 0xd7, 0x59, 0xc1, + 0x96, 0x61, 0xf8, 0x34, 0x39, 0xe3, 0x71, 0x14, 0xee, 0x67, 0x6e, 0x8b, 0x0d, 0xa0, 0x7b, 0xa0, + 0xd2, 0xcc, 0x75, 0xd8, 0x10, 0x7a, 0xcf, 0xb1, 0x2d, 0xb8, 0x6d, 0x06, 0xd0, 0xc7, 0xce, 0x39, + 0x17, 0x6e, 0x07, 0xe1, 0x03, 0xc5, 0x73, 0xe5, 0x76, 0x11, 0x7e, 0x99, 0x85, 0x5c, 0x09, 0xb7, + 0xc7, 0x56, 0x00, 0x7e, 0x52, 0xa8, 0xd4, 0x88, 0xf5, 0x91, 0xb7, 0x23, 0xf0, 0x79, 0xef, 0x2e, + 0x6d, 0xfc, 0x9a, 0x96, 0xcc, 0xd0, 0x8f, 0xb1, 0xb1, 0x45, 0xb4, 0xdb, 0x62, 0x4b, 0xd0, 0xf9, + 0xb9, 0x38, 0x77, 0x1d, 0x36, 0x82, 0x25, 0xbf, 0x48, 0x92, 0x28, 0x99, 0x69, 0x7b, 0x64, 0x3a, + 0x74, 0x3b, 0xc8, 0x40, 0x87, 0x32, 0x11, 0xba, 0x5d, 0x36, 0x86, 0xc1, 0x57, 0xe6, 0x1d, 0xee, + 0xf6, 0x90, 0x85, 0x62, 0xb8, 0xa6, 0x8f, 0x2c, 0x32, 0x8e, 0xd4, 0x12, 0x52, 0xb4, 0x0a, 0xa9, + 0xc1, 0xc6, 0x3e, 0x0c, 0xca, 0x11, 0xc8, 0x6e, 0xc0, 0xc8, 0xf8, 0x80, 0x90, 0xdb, 0xc2, 0x0d, + 0xd1, 0xa0, 0x73, 0x1d, 0xdc, 0x3c, 0x0e, 0x33, 0xb7, 0x8d, 0x5f, 0x38, 0xb1, 0xdc, 0x0e, 0x05, + 0xe4, 0x22, 0x09, 0xdc, 0x2e, 0x0a, 0x52, 0xe7, 0x73, 0xc3, 0x8d, 0x67, 0xb0, 0x44, 0x9f, 0xfb, + 0x78, 0xa0, 0x2b, 0x46, 0x9f, 0x41, 0xdc, 0x16, 0xc6, 0x14, 0xad, 0x6b, 0x69, 0x07, 0x63, 0x43, + 0xdb, 0xd1, 0x74, 0x1b, 0x5d, 0xd0, 0x71, 0xd2, 0x40, 0x67, 0x23, 0x81, 0x41, 0xd9, 0xb2, 0xd8, + 0x2d, 0xb8, 0x51, 0xc6, 0xc8, 0x40, 0x5a, 0xe1, 0xae, 0x50, 0x1a, 0x70, 0x1d, 0xd2, 0x5f, 0x91, + 0x6d, 0x0c, 0xab, 0x2f, 0xe6, 0xe9, 0x99, 0x30, 0x48, 0x07, 0x2d, 0xe2, 0x84, 0x34, 0x74, 0x17, + 0x17, 0x20, 0x4d, 0x7f, 0x5a, 0xdc, 0xde, 0xc6, 0x97, 0x30, 0x28, 0xcb, 0xd2, 0xb2, 0x57, 0x42, + 0x95, 0x3d, 0x0d, 0xb8, 0x4e, 0x6d, 0xc0, 0x20, 0xed, 0x8d, 0x57, 0x34, 0xce, 0x30, 0xab, 0xad, + 0x00, 0x18, 0xc4, 0x64, 0xd1, 0x69, 0x94, 0x99, 0x73, 0x15, 0x59, 0xcc, 0x83, 0x2a, 0x8f, 0xce, + 0x44, 0xae, 0xdc, 0x0e, 0x7e, 0x3f, 0x4d, 0x7e, 0x29, 0x02, 0x4c, 0x24, 0x8c, 0x76, 0x24, 0x95, + 0xdb, 0xdb, 0xd8, 0x83, 0xd1, 0x2b, 0x54, 0xc5, 0x15, 0xe9, 0xbe, 0x0b, 0xac, 0x74, 0xae, 0x46, + 0xdd, 0x16, 0xda, 0xa4, 0x24, 0xac, 0x50, 0xd7, 0x61, 0x37, 0x61, 0x19, 0x83, 0x5e, 0x43, 0xed, + 0xad, 0xaf, 0x3b, 0xd0, 0xd7, 0xd5, 0xc0, 0xbe, 0x84, 0x91, 0xf5, 0x43, 0x90, 0xdd, 0xc5, 0xba, + 0xbc, 0xfa, 0xfb, 0x72, 0xf5, 0xff, 0xae, 0xe0, 0xba, 0x84, 0xbc, 0x16, 0xfb, 0x31, 0x40, 0x3d, + 0xfd, 0xd8, 0x1d, 0xba, 0x12, 0x5c, 0x9e, 0x86, 0xab, 0x13, 0xba, 0x37, 0x2d, 0xf8, 0xd9, 0xe9, + 0xb5, 0xd8, 0xcf, 0x60, 0xd9, 0x34, 0x2a, 0x7d, 0x2e, 0x6c, 0x6a, 0xf5, 0xae, 0x05, 0x73, 0xed, + 0xbd, 0xca, 0xbe, 0xaa, 0x94, 0xe9, 0x33, 0x61, 0x93, 0x05, 0x8d, 0x50, 0xab, 0xf9, 0xff, 0x6b, + 0x5b, 0xa4, 0xd7, 0x62, 0xbb, 0x30, 0xd2, 0x8d, 0x4c, 0x5f, 0x53, 0xee, 0xa1, 0xec, 0x75, 0x9d, + 0xed, 0xbd, 0x0e, 0x6d, 0xc3, 0xd8, 0xee, 0x3d, 0x8c, 0x22, 0xb9, 0xa0, 0x49, 0x69, 0x25, 0x8b, + 0xda, 0x94, 0xd7, 0x7a, 0x34, 0xf9, 0xdb, 0xdb, 0xa9, 0xf3, 0xe6, 0xed, 0xd4, 0xf9, 0xd7, 0xdb, + 0xa9, 0xf3, 0xdb, 0x77, 0xd3, 0xd6, 0x9b, 0x77, 0xd3, 0xd6, 0x3f, 0xdf, 0x4d, 0x5b, 0x47, 0x7d, + 0xfa, 0xf1, 0xfc, 0xbd, 0xff, 0x04, 0x00, 0x00, 0xff, 0xff, 0xfc, 0x63, 0x12, 0x81, 0x8a, 0x16, + 0x00, 0x00, } // Reference imports to suppress errors if they are not otherwise used. diff --git a/dm/dm/proto/dmworker.proto b/dm/dm/proto/dmworker.proto index 4f54b1d76cf..3c9f43cf439 100644 --- a/dm/dm/proto/dmworker.proto +++ b/dm/dm/proto/dmworker.proto @@ -29,6 +29,7 @@ enum TaskOp { Start = 4; Update = 5; AutoResume = 6; + Delete = 7; } message QueryStatusRequest { diff --git a/dm/dm/worker/source_worker.go b/dm/dm/worker/source_worker.go index 487d5361bcc..c6ec91c9a30 100644 --- a/dm/dm/worker/source_worker.go +++ b/dm/dm/worker/source_worker.go @@ -476,7 +476,7 @@ func (w *SourceWorker) EnableHandleSubtasks() error { if s, ok := validatorStages[subTaskCfg.Name]; ok { validatorStage = s.Expect } - w.l.Info("start to create subtask", zap.String("sourceID", subTaskCfg.SourceID), zap.String("task", subTaskCfg.Name)) + w.l.Info("start to create subtask in EnableHandleSubtasks", zap.String("sourceID", subTaskCfg.SourceID), zap.String("task", subTaskCfg.Name)) // "for range" of a map will use same value address, so we'd better not pass value address to other function clone := subTaskCfg if err2 := w.StartSubTask(&clone, expectStage.Expect, validatorStage, false); err2 != nil { @@ -542,6 +542,7 @@ func (w *SourceWorker) fetchSubTasksAndAdjust() (map[string]ha.Stage, map[string } // StartSubTask creates a subtask and run it. +// TODO(ehco) rename this func. func (w *SourceWorker) StartSubTask(cfg *config.SubTaskConfig, expectStage, validatorStage pb.Stage, needLock bool) error { if needLock { w.Lock() @@ -623,25 +624,27 @@ func (w *SourceWorker) OperateSubTask(name string, op pb.TaskOp) error { return terror.ErrWorkerSubTaskNotFound.Generate(name) } + w.l.Info("OperateSubTask start", zap.Stringer("op", op), zap.String("task", name)) var err error switch op { - case pb.TaskOp_Stop: - w.l.Info("stop sub task", zap.String("task", name)) + case pb.TaskOp_Delete: + w.l.Info("delete subtask", zap.String("task", name)) st.Close() w.subTaskHolder.removeSubTask(name) - case pb.TaskOp_Pause: - w.l.Info("pause sub task", zap.String("task", name)) + case pb.TaskOp_Pause, pb.TaskOp_Stop: + w.l.Info("pause subtask", zap.String("task", name)) err = st.Pause() case pb.TaskOp_Resume: - w.l.Info("resume sub task", zap.String("task", name)) + w.l.Info("resume subtask", zap.String("task", name)) err = st.Resume(w.getRelayWithoutLock()) case pb.TaskOp_AutoResume: - w.l.Info("auto_resume sub task", zap.String("task", name)) + // TODO(ehco) change to auto_restart + w.l.Info("auto_resume subtask", zap.String("task", name)) err = st.Resume(w.getRelayWithoutLock()) default: err = terror.ErrWorkerUpdateTaskStage.Generatef("invalid operate %s on subtask %v", op, name) } - + w.l.Info("OperateSubTask finished", zap.Stringer("op", op), zap.String("task", name)) return err } @@ -701,9 +704,9 @@ func (w *SourceWorker) resetSubtaskStage() (int64, error) { } // remove subtasks without subtask config or subtask stage for name := range sts { - err = w.OperateSubTask(name, pb.TaskOp_Stop) + err = w.OperateSubTask(name, pb.TaskOp_Delete) if err != nil { - opErrCounter.WithLabelValues(w.name, pb.TaskOp_Stop.String()).Inc() + opErrCounter.WithLabelValues(w.name, pb.TaskOp_Delete.String()).Inc() log.L().Error("fail to stop subtask", zap.String("task", name), zap.Error(err)) } } @@ -798,27 +801,36 @@ func (w *SourceWorker) handleSubTaskStage(ctx context.Context, stageCh chan ha.S // operateSubTaskStage returns TaskOp.String() additionally to record metrics. func (w *SourceWorker) operateSubTaskStage(stage ha.Stage, subTaskCfg config.SubTaskConfig) (string, error) { var op pb.TaskOp - switch { - case stage.Expect == pb.Stage_Running, stage.Expect == pb.Stage_Paused: - if st := w.subTaskHolder.findSubTask(stage.Task); st == nil { - // create the subtask for expected running and paused stage. - log.L().Info("start to create subtask", zap.String("sourceID", subTaskCfg.SourceID), zap.String("task", subTaskCfg.Name)) - + log.L().Info("operateSubTaskStage", + zap.String("sourceID", subTaskCfg.SourceID), + zap.String("task", subTaskCfg.Name), + zap.Stringer("stage", stage)) + + // for new added subtask + if st := w.subTaskHolder.findSubTask(stage.Task); st == nil { + switch stage.Expect { + case pb.Stage_Running, pb.Stage_Paused, pb.Stage_Stopped: + // todo refactor here deciding if the expected stage is valid should be put inside StartSubTask and OperateSubTask + log.L().Info("start to create subtask in operateSubTaskStage", zap.String("sourceID", subTaskCfg.SourceID), zap.String("task", subTaskCfg.Name)) expectValidatorStage, err := getExpectValidatorStage(subTaskCfg.ValidatorCfg, w.etcdClient, stage.Source, stage.Task, stage.Revision) if err != nil { return opErrTypeBeforeOp, terror.Annotate(err, "fail to get validator stage from etcd") } - - err = w.StartSubTask(&subTaskCfg, stage.Expect, expectValidatorStage, true) - return opErrTypeBeforeOp, err - } - if stage.Expect == pb.Stage_Running { - op = pb.TaskOp_Resume - } else if stage.Expect == pb.Stage_Paused { - op = pb.TaskOp_Pause + return opErrTypeBeforeOp, w.StartSubTask(&subTaskCfg, stage.Expect, expectValidatorStage, true) + default: + // not valid stage + return op.String(), w.OperateSubTask(stage.Task, op) } - case stage.IsDeleted: - op = pb.TaskOp_Stop + } + // todo(ehco) remove pause and resume after using openapi to impl dmctl + switch stage.Expect { + case pb.Stage_Stopped, pb.Stage_Paused: + op = pb.TaskOp_Pause + case pb.Stage_Running: + op = pb.TaskOp_Resume + } + if stage.IsDeleted { + op = pb.TaskOp_Delete } return op.String(), w.OperateSubTask(stage.Task, op) } @@ -826,7 +838,7 @@ func (w *SourceWorker) operateSubTaskStage(stage ha.Stage, subTaskCfg config.Sub // operateSubTaskStageWithoutConfig returns TaskOp additionally to record metrics. func (w *SourceWorker) operateSubTaskStageWithoutConfig(stage ha.Stage) (string, error) { var subTaskCfg config.SubTaskConfig - if stage.Expect == pb.Stage_Running { + if stage.Expect == pb.Stage_Running || stage.Expect == pb.Stage_Stopped { if st := w.subTaskHolder.findSubTask(stage.Task); st == nil { tsm, _, err := ha.GetSubTaskCfg(w.etcdClient, stage.Source, stage.Task, stage.Revision) if err != nil { diff --git a/dm/dm/worker/source_worker_test.go b/dm/dm/worker/source_worker_test.go index a9ec7be9691..3c435149593 100644 --- a/dm/dm/worker/source_worker_test.go +++ b/dm/dm/worker/source_worker_test.go @@ -105,12 +105,19 @@ func (t *testServer) testWorker(c *C) { c.Assert(task, NotNil) c.Assert(task.Result().String(), Matches, ".*worker already closed.*") + c.Assert(w.StartSubTask(&config.SubTaskConfig{ + Name: "testStartTask-in-stopped", + }, pb.Stage_Stopped, pb.Stage_Stopped, true), IsNil) + task = w.subTaskHolder.findSubTask("testStartTask-in-stopped") + c.Assert(task, NotNil) + c.Assert(task.Result().String(), Matches, ".*worker already closed.*") + err = w.UpdateSubTask(context.Background(), &config.SubTaskConfig{ Name: "testStartTask", }) c.Assert(err, ErrorMatches, ".*worker already closed.*") - err = w.OperateSubTask("testSubTask", pb.TaskOp_Stop) + err = w.OperateSubTask("testSubTask", pb.TaskOp_Delete) c.Assert(err, ErrorMatches, ".*worker already closed.*") } diff --git a/dm/dm/worker/subtask.go b/dm/dm/worker/subtask.go index 772723ff3f9..ad4e26260b0 100644 --- a/dm/dm/worker/subtask.go +++ b/dm/dm/worker/subtask.go @@ -503,6 +503,20 @@ func (st *SubTask) setStageIfNotIn(oldStages []pb.Stage, newStage pb.Stage) bool return true } +// setStageIfNotIn sets stage to newStage if its current value is in oldStages. +func (st *SubTask) setStageIfIn(oldStages []pb.Stage, newStage pb.Stage) bool { + st.Lock() + defer st.Unlock() + for _, s := range oldStages { + if st.stage == s { + st.stage = newStage + updateTaskMetric(st.cfg.Name, st.cfg.SourceID, st.stage, st.workerName) + return true + } + } + return false +} + // Stage returns the stage of the sub task. func (st *SubTask) Stage() pb.Stage { st.RLock() @@ -588,7 +602,7 @@ func (st *SubTask) Resume(relay relay.Process) error { return nil } - if !st.stageCAS(pb.Stage_Paused, pb.Stage_Resuming) { + if !st.setStageIfIn([]pb.Stage{pb.Stage_Paused, pb.Stage_Stopped}, pb.Stage_Resuming) { return terror.ErrWorkerNotPausedStage.Generate(st.Stage().String()) } diff --git a/dm/tests/openapi/run.sh b/dm/tests/openapi/run.sh index 0286527b3bc..8432e635fb8 100644 --- a/dm/tests/openapi/run.sh +++ b/dm/tests/openapi/run.sh @@ -424,7 +424,6 @@ function test_noshard_task_dump_status() { check_port_offline $WORKER2_PORT 20 openapi_task_check "get_task_status_success_but_worker_meet_error" "$task_name" 2 - clean_cluster_sources_and_tasks export GO_FAILPOINTS="" kill_dm_worker @@ -437,6 +436,8 @@ function test_noshard_task_dump_status() { # run dm-worker2 run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT + openapi_source_check "list_source_success" 2 + clean_cluster_sources_and_tasks echo ">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>TEST OPENAPI: NO SHARD TASK DUMP STATUS SUCCESS" }