Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

master(dm): support create a task in stopped state #4510

Merged
merged 47 commits into from
Mar 3, 2022
Merged
Show file tree
Hide file tree
Changes from 41 commits
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
d631fdd
savework enable test
Ehco1996 Feb 7, 2022
3e1ccc4
add UT
Ehco1996 Feb 7, 2022
873b5e1
refine ut
Ehco1996 Feb 7, 2022
b8b7712
Merge branch 'master' into feature-#4484-new-task-state
Ehco1996 Feb 13, 2022
b4d1e66
add source manager and repalce api
Ehco1996 Feb 13, 2022
f37b4f9
fix disableSource
Ehco1996 Feb 13, 2022
30dbbbe
save work
Ehco1996 Feb 14, 2022
95ee828
merge to one file
Ehco1996 Feb 14, 2022
6e186b0
rename more
Ehco1996 Feb 14, 2022
169bfda
adjust worker
Ehco1996 Feb 14, 2022
3e6a933
Merge branch 'master' into feature-#4484-new-task-state
Ehco1996 Feb 14, 2022
34de49e
mute worker not started when crate source
Ehco1996 Feb 15, 2022
5907ad4
fix logging
Ehco1996 Feb 15, 2022
6c70372
fix an test
Ehco1996 Feb 15, 2022
721c785
fix lint
Ehco1996 Feb 15, 2022
b9e1784
refine comments
Ehco1996 Feb 15, 2022
81e77ee
change test file name
Ehco1996 Feb 15, 2022
a4a1581
Merge branch 'master' into feature-#4484-new-task-state
Ehco1996 Feb 15, 2022
88d66a6
add ut
Ehco1996 Feb 15, 2022
6449448
Merge branch 'master' into feature-#4484-new-task-state
Ehco1996 Feb 23, 2022
dd2e551
fix lint
Ehco1996 Feb 23, 2022
b1e781b
fix comment typo
Ehco1996 Feb 23, 2022
a9fd699
fix a test
Ehco1996 Feb 23, 2022
87a99e4
Merge branch 'master' into feature-#4484-new-task-state
Ehco1996 Feb 23, 2022
c602ee9
merge AddSourceCfg
Ehco1996 Feb 28, 2022
f19fc70
add comment for TransferSource
Ehco1996 Feb 28, 2022
14efe7d
fix get Status From worker
Ehco1996 Feb 28, 2022
029f623
add todo
Ehco1996 Feb 28, 2022
d43b44b
use require.True
Ehco1996 Feb 28, 2022
9629cef
fix wrong mock in UT
Ehco1996 Feb 28, 2022
38d4dda
Revert "merge AddSourceCfg"
Ehco1996 Feb 28, 2022
b8d5735
address comment
Ehco1996 Feb 28, 2022
a5e7631
Merge branch 'master' into feature-#4484-new-task-state
Ehco1996 Feb 28, 2022
d7b4e7d
address comment
Ehco1996 Mar 1, 2022
b138816
address comment
Ehco1996 Mar 1, 2022
bf6bf7a
fix bootstrap comment
Ehco1996 Mar 1, 2022
a6080c9
add status len check
Ehco1996 Mar 1, 2022
256ef41
use stage null when op_delete for better reading
Ehco1996 Mar 2, 2022
f0e2406
Merge branch 'master' into feature-#4484-new-task-state
Ehco1996 Mar 2, 2022
fee001c
use op to check result
Ehco1996 Mar 2, 2022
c608c0b
address comment
Ehco1996 Mar 2, 2022
74ea013
Update dm/dm/worker/source_worker.go
Ehco1996 Mar 3, 2022
8955c53
address comment
Ehco1996 Mar 3, 2022
78f098c
left todo
Ehco1996 Mar 3, 2022
5594251
Merge branch 'master' into feature-#4484-new-task-state
Ehco1996 Mar 3, 2022
7b21bf7
fix conficts after merge master
Ehco1996 Mar 3, 2022
5719c90
Merge branch 'master' into feature-#4484-new-task-state
ti-chi-bot Mar 3, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion dm/chaos/cases/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ func (t *task) run() error {
func (t *task) stopPreviousTask() error {
t.logger.Info("stopping previous task")
resp, err := t.cli.OperateTask(t.ctx, &pb.OperateTaskRequest{
Op: pb.TaskOp_Stop,
Op: pb.TaskOp_Delete,
Name: t.taskCfg.Name,
})
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion dm/dm/ctl/master/stop_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,5 +32,5 @@ func NewStopTaskCmd() *cobra.Command {

// stopTaskFunc does stop task request.
func stopTaskFunc(cmd *cobra.Command, _ []string) (err error) {
return operateTaskFunc(pb.TaskOp_Stop, cmd)
return operateTaskFunc(pb.TaskOp_Delete, cmd)
}
15 changes: 1 addition & 14 deletions dm/dm/master/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,12 +350,6 @@ func (s *Server) upgradeDBSchemaV1Import(tctx *tcontext.Context, cfgs map[string
}

// createSubtaskV1Import tries to create subtasks with the specified stage.
// NOTE: now we only have two different APIs to:
// - create a new (running) subtask.
// - update the subtask to the specified stage.
// in other words, if we want to create a `Paused` task,
// we need to create a `Running` one first and then `Pause` it.
// this is not a big problem now, but if needed we can refine it later.
// NOTE: we do not stopping previous subtasks if any later one failed (because some side effects may have taken),
// and let the user to check & fix the problem.
// TODO(csuzhangxc): merge subtask configs to support `get-task-config`.
Expand All @@ -377,8 +371,7 @@ outerLoop:
tctx.Logger.Warn("skip to create subtask because only support to create subtasks with Running/Paused stage now", zap.Stringer("stage", stage))
continue
}
// create and update subtasks one by one (this should be quick enough because only updating etcd).
err = s.scheduler.AddSubTasks(false, *cfg2)
err = s.scheduler.AddSubTasks(false, stage, *cfg2)
if err != nil {
if terror.ErrSchedulerSubTaskExist.Equal(err) {
err = nil // reset error
Expand All @@ -387,12 +380,6 @@ outerLoop:
break outerLoop
}
}
if stage == pb.Stage_Paused { // no more operation needed for `Running`.
Ehco1996 marked this conversation as resolved.
Show resolved Hide resolved
err = s.scheduler.UpdateExpectSubTaskStage(stage, taskName, sourceID)
if err != nil {
break outerLoop
}
}
}
}
return err
Expand Down
284 changes: 284 additions & 0 deletions dm/dm/master/openapi_controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,284 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
//
// MVC for dm-master's openapi server
// Model(data in etcd): source of truth
// View(openapi_view): do some inner work such as validate, filter, prepare parameters/response and call controller to update model.
// Controller(openapi_controller): call model func to update data.

package master

import (
"context"

"github.com/pingcap/tiflow/dm/checker"
"github.com/pingcap/tiflow/dm/dm/config"
"github.com/pingcap/tiflow/dm/dm/pb"
"github.com/pingcap/tiflow/dm/openapi"
"github.com/pingcap/tiflow/dm/pkg/terror"
)

func (s *Server) getSourceStatusListFromWorker(ctx context.Context, sourceName string, specifiedSource bool) ([]openapi.SourceStatus, error) {
workerStatusList := s.getStatusFromWorkers(ctx, []string{sourceName}, "", specifiedSource)
sourceStatusList := make([]openapi.SourceStatus, len(workerStatusList))
for i, workerStatus := range workerStatusList {
if workerStatus == nil {
// this should not happen unless the rpc in the worker server has been modified
return nil, terror.ErrOpenAPICommonError.New("worker's query-status response is nil")
}
sourceStatus := openapi.SourceStatus{SourceName: sourceName, WorkerName: workerStatus.SourceStatus.Worker}
if !workerStatus.Result {
sourceStatus.ErrorMsg = &workerStatus.Msg
} else if relayStatus := workerStatus.SourceStatus.GetRelayStatus(); relayStatus != nil {
sourceStatus.RelayStatus = &openapi.RelayStatus{
MasterBinlog: relayStatus.MasterBinlog,
MasterBinlogGtid: relayStatus.MasterBinlogGtid,
RelayBinlogGtid: relayStatus.RelayBinlogGtid,
RelayCatchUpMaster: relayStatus.RelayCatchUpMaster,
RelayDir: relayStatus.RelaySubDir,
Stage: relayStatus.Stage.String(),
}
}
sourceStatusList[i] = sourceStatus
}
return sourceStatusList, nil
}

// nolint:unparam
func (s *Server) createSource(ctx context.Context, cfg *config.SourceConfig) error {
Copy link
Contributor Author

@Ehco1996 Ehco1996 Feb 15, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

seprate test for this file would be added in next pr, current all func is tested Indirectly over openapi_view_test.go

return s.scheduler.AddSourceCfg(cfg)
}

// nolint:unparam,unused
func (s *Server) updateSource(ctx context.Context, cfg *config.SourceConfig) error {
// TODO(ehco) no caller now , will implement later
return nil
}

// nolint:unparam
func (s *Server) deleteSource(ctx context.Context, sourceName string, force bool) error {
if force {
for _, taskName := range s.scheduler.GetTaskNameListBySourceName(sourceName) {
if err := s.scheduler.RemoveSubTasks(taskName, sourceName); err != nil {
return err
}
}
}
return s.scheduler.RemoveSourceCfg(sourceName)
}

// nolint:unparam,unused
func (s *Server) getSource(ctx context.Context, sourceName string) (openapiSource openapi.Source, err error) {
sourceCfg := s.scheduler.GetSourceCfgByID(sourceName)
if sourceCfg == nil {
return openapiSource, terror.ErrSchedulerSourceCfgNotExist.Generate(sourceName)
}
openapiSource = config.SourceCfgToOpenAPISource(sourceCfg)
return openapiSource, nil
}

func (s *Server) getSourceStatus(ctx context.Context, sourceName string) ([]openapi.SourceStatus, error) {
return s.getSourceStatusListFromWorker(ctx, sourceName, true)
}

// nolint:unparam
func (s *Server) listSource(ctx context.Context, req interface{}) []openapi.Source {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

all usage of funcs in this file is not steable, parameters and return response will be enriched in the next pr

// TODO(ehco) implement filter later
sourceM := s.scheduler.GetSourceCfgs()
openapiSourceList := make([]openapi.Source, 0, len(sourceM))
for _, source := range sourceM {
openapiSourceList = append(openapiSourceList, config.SourceCfgToOpenAPISource(source))
}
return openapiSourceList
}

// nolint:unparam,unused
func (s *Server) enableSource(ctx context.Context, sourceName, workerName string) error {
worker := s.scheduler.GetWorkerBySource(sourceName)
if worker == nil {
return terror.ErrWorkerNoStart
}
taskNameList := s.scheduler.GetTaskNameListBySourceName(sourceName)
return s.scheduler.BatchOperateTaskOnWorker(ctx, worker, taskNameList, sourceName, pb.Stage_Running, true)
}

// nolint:unused
func (s *Server) disableSource(ctx context.Context, sourceName string) error {
worker := s.scheduler.GetWorkerBySource(sourceName)
if worker == nil {
// no need to stop task if the source is not running
return nil
}
taskNameList := s.scheduler.GetTaskNameListBySourceName(sourceName)
return s.scheduler.BatchOperateTaskOnWorker(ctx, worker, taskNameList, sourceName, pb.Stage_Stopped, true)
}

func (s *Server) transferSource(ctx context.Context, sourceName, workerName string) error {
return s.scheduler.TransferSource(ctx, sourceName, workerName)
}

func (s *Server) checkTask(ctx context.Context, subtaskCfgList []*config.SubTaskConfig, errCnt, warnCnt int64) (string, error) {
return checker.CheckSyncConfigFunc(ctx, subtaskCfgList, errCnt, warnCnt)
}

// nolint:unparam,unused
func (s *Server) createTask(ctx context.Context, subtaskCfgList []*config.SubTaskConfig) error {
return s.scheduler.AddSubTasks(false, pb.Stage_Stopped, subtaskCfgPointersToInstances(subtaskCfgList...)...)
}

// nolint:unused
func (s *Server) updateTask(ctx context.Context, taskCfg *config.TaskConfig) error {
// TODO(ehco) no caller now , will implement later
return nil
}

// nolint:unparam,unused
func (s *Server) deleteTask(ctx context.Context, taskName string) error {
sourceNameList := s.getTaskSourceNameList(taskName)
return s.scheduler.RemoveSubTasks(taskName, sourceNameList...)
}

// nolint:unused
func (s *Server) getTask(ctx context.Context, taskName string) error {
// TODO(ehco) no caller now , will implement later
return nil
}

func (s *Server) getTaskStatus(ctx context.Context, taskName string, sourceNameList []string) ([]openapi.SubTaskStatus, error) {
workerStatusList := s.getStatusFromWorkers(ctx, sourceNameList, taskName, true)
subTaskStatusList := make([]openapi.SubTaskStatus, 0, len(workerStatusList))
for _, workerStatus := range workerStatusList {
if workerStatus == nil || workerStatus.SourceStatus == nil {
// this should not happen unless the rpc in the worker server has been modified
return nil, terror.ErrOpenAPICommonError.New("worker's query-status response is nil")
}
sourceStatus := workerStatus.SourceStatus
openapiSubTaskStatus := openapi.SubTaskStatus{
Name: taskName,
SourceName: sourceStatus.GetSource(),
WorkerName: sourceStatus.GetWorker(),
}
if !workerStatus.Result {
openapiSubTaskStatus.ErrorMsg = &workerStatus.Msg
subTaskStatusList = append(subTaskStatusList, openapiSubTaskStatus)
continue
}
if len(workerStatus.SubTaskStatus) == 0 {
// this should not happen unless the rpc in the worker server has been modified
return nil, terror.ErrOpenAPICommonError.New("worker's query-status response is nil")
}
subTaskStatus := workerStatus.SubTaskStatus[0]
D3Hunter marked this conversation as resolved.
Show resolved Hide resolved
if subTaskStatus == nil {
// this should not happen unless the rpc in the worker server has been modified
return nil, terror.ErrOpenAPICommonError.New("worker's query-status response is nil")
}
openapiSubTaskStatus.Stage = subTaskStatus.GetStage().String()
openapiSubTaskStatus.Unit = subTaskStatus.GetUnit().String()
openapiSubTaskStatus.UnresolvedDdlLockId = &subTaskStatus.UnresolvedDDLLockID
// add load status
if loadS := subTaskStatus.GetLoad(); loadS != nil {
openapiSubTaskStatus.LoadStatus = &openapi.LoadStatus{
FinishedBytes: loadS.FinishedBytes,
MetaBinlog: loadS.MetaBinlog,
MetaBinlogGtid: loadS.MetaBinlogGTID,
Progress: loadS.Progress,
TotalBytes: loadS.TotalBytes,
}
}
// add sync status
if syncerS := subTaskStatus.GetSync(); syncerS != nil {
openapiSubTaskStatus.SyncStatus = &openapi.SyncStatus{
BinlogType: syncerS.GetBinlogType(),
BlockingDdls: syncerS.GetBlockingDDLs(),
MasterBinlog: syncerS.GetMasterBinlog(),
MasterBinlogGtid: syncerS.GetMasterBinlogGtid(),
RecentTps: syncerS.RecentTps,
SecondsBehindMaster: syncerS.SecondsBehindMaster,
Synced: syncerS.Synced,
SyncerBinlog: syncerS.SyncerBinlog,
SyncerBinlogGtid: syncerS.SyncerBinlogGtid,
TotalEvents: syncerS.TotalEvents,
TotalTps: syncerS.TotalTps,
}
if unResolvedGroups := syncerS.GetUnresolvedGroups(); len(unResolvedGroups) > 0 {
openapiSubTaskStatus.SyncStatus.UnresolvedGroups = make([]openapi.ShardingGroup, len(unResolvedGroups))
for i, unResolvedGroup := range unResolvedGroups {
openapiSubTaskStatus.SyncStatus.UnresolvedGroups[i] = openapi.ShardingGroup{
DdlList: unResolvedGroup.DDLs,
FirstLocation: unResolvedGroup.FirstLocation,
Synced: unResolvedGroup.Synced,
Target: unResolvedGroup.Target,
Unsynced: unResolvedGroup.Unsynced,
}
}
}
}
// add dump status
if dumpS := subTaskStatus.GetDump(); dumpS != nil {
openapiSubTaskStatus.DumpStatus = &openapi.DumpStatus{
CompletedTables: dumpS.CompletedTables,
EstimateTotalRows: dumpS.EstimateTotalRows,
FinishedBytes: dumpS.FinishedBytes,
FinishedRows: dumpS.FinishedRows,
TotalTables: dumpS.TotalTables,
}
}
subTaskStatusList = append(subTaskStatusList, openapiSubTaskStatus)
}
return subTaskStatusList, nil
}

// nolint:unparam,unused
func (s *Server) listTask(ctx context.Context, req interface{}) []openapi.Task {
// TODO(ehco) implement filter later
subTaskConfigMap := s.scheduler.GetSubTaskCfgs()
return config.SubTaskConfigsToOpenAPITask(subTaskConfigMap)
}

// nolint:unparam,unused
func (s *Server) startTask(ctx context.Context, taskName string, sourceNameList []string, removeMeta bool, req interface{}) error {
// TODO(ehco) merge start-task req
subTaskConfigM := s.scheduler.GetSubTaskCfgsByTask(taskName)
needStartSubTaskList := make([]*config.SubTaskConfig, 0, len(subTaskConfigM))
for _, sourceName := range sourceNameList {
subTaskCfg, ok := subTaskConfigM[sourceName]
if !ok {
return terror.ErrSchedulerSourceCfgNotExist.Generate(sourceName)
}
needStartSubTaskList = append(needStartSubTaskList, subTaskCfg)
}
if len(needStartSubTaskList) == 0 {
return nil
}
if removeMeta {
// use same latch for remove-meta and start-task
release, err := s.scheduler.AcquireSubtaskLatch(taskName)
if err != nil {
return terror.ErrSchedulerLatchInUse.Generate("RemoveMeta", taskName)
}
defer release()
metaSchema := needStartSubTaskList[0].MetaSchema
targetDB := needStartSubTaskList[0].To
err = s.removeMetaData(ctx, taskName, metaSchema, &targetDB)
if err != nil {
return terror.Annotate(err, "while removing metadata")
}
release()
}
return s.scheduler.UpdateExpectSubTaskStage(pb.Stage_Running, taskName, sourceNameList...)
}

// nolint:unparam,unused
func (s *Server) stopTask(ctx context.Context, taskName string, sourceNameList []string) error {
return s.scheduler.UpdateExpectSubTaskStage(pb.Stage_Stopped, taskName, sourceNameList...)
}
Loading