Skip to content

Commit

Permalink
dm(engine): declarative command should consider time (#7640)
Browse files Browse the repository at this point in the history
ref #4287
  • Loading branch information
lance6716 authored Nov 21, 2022
1 parent e15ac93 commit 3d22bf4
Show file tree
Hide file tree
Showing 10 changed files with 203 additions and 57 deletions.
12 changes: 9 additions & 3 deletions engine/executor/dm/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,9 +273,15 @@ func (w *dmWorker) tryUpdateStatus(ctx context.Context) error {
// workerStatus gets worker status.
func (w *dmWorker) workerStatus(ctx context.Context) frameModel.WorkerStatus {
var (
stage = w.getStage()
code frameModel.WorkerState
taskStatus = &runtime.TaskStatus{Unit: w.workerType, Task: w.taskID, Stage: stage, CfgModRevision: w.cfgModRevision}
stage = w.getStage()
code frameModel.WorkerState
taskStatus = &runtime.TaskStatus{
Unit: w.workerType,
Task: w.taskID,
Stage: stage,
StageUpdatedTime: time.Now(),
CfgModRevision: w.cfgModRevision,
}
finalStatus any
)
if stage == metadata.StageFinished {
Expand Down
40 changes: 24 additions & 16 deletions engine/jobmaster/dm/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,20 +131,22 @@ func TestQueryStatusAPI(t *testing.T) {
"task2": {
&metadata.FinishedTaskStatus{
TaskStatus: metadata.TaskStatus{
Unit: frameModel.WorkerDMDump,
Task: "task2",
Stage: metadata.StageFinished,
CfgModRevision: 3,
Unit: frameModel.WorkerDMDump,
Task: "task2",
Stage: metadata.StageFinished,
CfgModRevision: 3,
StageUpdatedTime: loadTime,
},
Status: dumpStatusBytes,
Duration: dumpDuration,
},
&metadata.FinishedTaskStatus{
TaskStatus: metadata.TaskStatus{
Unit: frameModel.WorkerDMLoad,
Task: "task2",
Stage: metadata.StageFinished,
CfgModRevision: 3,
Unit: frameModel.WorkerDMLoad,
Task: "task2",
Stage: metadata.StageFinished,
CfgModRevision: 3,
StageUpdatedTime: syncTime,
},
Status: loadStatusBytes,
Duration: loadDuration,
Expand All @@ -153,20 +155,22 @@ func TestQueryStatusAPI(t *testing.T) {
"task7": {
&metadata.FinishedTaskStatus{
TaskStatus: metadata.TaskStatus{
Unit: frameModel.WorkerDMDump,
Task: "task7",
Stage: metadata.StageFinished,
CfgModRevision: 4,
Unit: frameModel.WorkerDMDump,
Task: "task7",
Stage: metadata.StageFinished,
CfgModRevision: 4,
StageUpdatedTime: loadTime,
},
Status: dumpStatusBytes,
Duration: dumpDuration,
},
&metadata.FinishedTaskStatus{
TaskStatus: metadata.TaskStatus{
Unit: frameModel.WorkerDMLoad,
Task: "task7",
Stage: metadata.StageFinished,
CfgModRevision: 4,
Unit: frameModel.WorkerDMLoad,
Task: "task7",
Stage: metadata.StageFinished,
CfgModRevision: 4,
StageUpdatedTime: syncTime,
},
Status: loadStatusBytes,
Duration: loadDuration,
Expand Down Expand Up @@ -369,6 +373,7 @@ func TestQueryStatusAPI(t *testing.T) {
"Task": "task2",
"Stage": "Finished",
"CfgModRevision": 3,
"StageUpdatedTime": "2022-11-04T19:47:57.43382274+08:00",
"Result": null,
"Status": {
"totalTables": 10,
Expand All @@ -386,6 +391,7 @@ func TestQueryStatusAPI(t *testing.T) {
"Task": "task2",
"Stage": "Finished",
"CfgModRevision": 3,
"StageUpdatedTime": "2022-11-04T20:47:57.43382274+08:00",
"Result": null,
"Status": {
"finishedBytes": 4,
Expand All @@ -404,6 +410,7 @@ func TestQueryStatusAPI(t *testing.T) {
"Task": "task7",
"Stage": "Finished",
"CfgModRevision": 4,
"StageUpdatedTime": "2022-11-04T19:47:57.43382274+08:00",
"Result": null,
"Status": {
"totalTables": 10,
Expand All @@ -421,6 +428,7 @@ func TestQueryStatusAPI(t *testing.T) {
"Task": "task7",
"Stage": "Finished",
"CfgModRevision": 4,
"StageUpdatedTime": "2022-11-04T20:47:57.43382274+08:00",
"Result": null,
"Status": {
"finishedBytes": 4,
Expand Down
11 changes: 9 additions & 2 deletions engine/jobmaster/dm/dm_jobmaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,12 @@ func (j dmJobMasterFactory) DeserializeConfig(configBytes []byte) (registry.Work
}

// NewWorkerImpl implements WorkerFactory.NewWorkerImpl
func (j dmJobMasterFactory) NewWorkerImpl(dCtx *dcontext.Context, workerID frameModel.WorkerID, masterID frameModel.MasterID, conf framework.WorkerConfig) (framework.WorkerImpl, error) {
func (j dmJobMasterFactory) NewWorkerImpl(
dCtx *dcontext.Context,
workerID frameModel.WorkerID,
masterID frameModel.MasterID,
conf framework.WorkerConfig,
) (framework.WorkerImpl, error) {
log.L().Info("new dm jobmaster", zap.String(logutil.ConstFieldJobKey, workerID))
jm := &JobMaster{
initJobCfg: conf.(*config.JobCfg),
Expand Down Expand Up @@ -183,6 +188,7 @@ func (jm *JobMaster) OnWorkerOnline(worker framework.WorkerHandle) error {
return jm.handleOnlineStatus(worker)
}

// handleOnlineStatus is used by OnWorkerOnline and OnWorkerStatusUpdated.
func (jm *JobMaster) handleOnlineStatus(worker framework.WorkerHandle) error {
var taskStatus runtime.TaskStatus
if err := json.Unmarshal(worker.Status().ExtBytes, &taskStatus); err != nil {
Expand Down Expand Up @@ -225,7 +231,8 @@ func (jm *JobMaster) onWorkerFinished(finishedTaskStatus runtime.FinishedTaskSta

unitStateStore := jm.metadata.UnitStateStore()
err := unitStateStore.ReadModifyWrite(context.TODO(), func(state *metadata.UnitState) error {
finishedTaskStatus.Duration = time.Since(state.CurrentUnitStatus[taskStatus.Task].CreatedTime)
finishedTaskStatus.StageUpdatedTime = time.Now()
finishedTaskStatus.Duration = finishedTaskStatus.StageUpdatedTime.Sub(state.CurrentUnitStatus[taskStatus.Task].CreatedTime)
for i, status := range state.FinishedUnitStatus[taskStatus.Task] {
// when the unit is restarted by update-cfg or something, overwrite the old status and truncate
if status.Unit == taskStatus.Unit {
Expand Down
17 changes: 11 additions & 6 deletions engine/jobmaster/dm/metadata/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"encoding/json"
"fmt"
"sync"
"time"

"github.com/pingcap/tiflow/engine/jobmaster/dm/bootstrap"
"github.com/pingcap/tiflow/engine/jobmaster/dm/config"
Expand Down Expand Up @@ -118,15 +119,17 @@ func NewJob(jobCfg *config.JobCfg) *Job {
// Task is the minimum working unit of a job.
// A job may contain multiple upstreams, and it will be converted into multiple tasks.
type Task struct {
Cfg *config.TaskCfg
Stage TaskStage
Cfg *config.TaskCfg
Stage TaskStage
StageUpdatedTime time.Time
}

// NewTask creates a new Task instance
func NewTask(taskCfg *config.TaskCfg) *Task {
return &Task{
Cfg: taskCfg,
Stage: StageRunning, // TODO: support set stage when create task.
Cfg: taskCfg,
Stage: StageRunning, // TODO: support set stage when create task.
StageUpdatedTime: time.Now(),
}
}

Expand Down Expand Up @@ -181,11 +184,12 @@ func (jobStore *JobStore) UpdateStages(ctx context.Context, taskIDs []string, st
}
}
for _, taskID := range taskIDs {
if _, ok := job.Tasks[taskID]; !ok {
t, ok := job.Tasks[taskID]
if !ok {
return errors.Errorf("task %s not found", taskID)
}
t := job.Tasks[taskID]
t.Stage = stage
t.StageUpdatedTime = time.Now()
}

return jobStore.Put(ctx, job)
Expand Down Expand Up @@ -218,6 +222,7 @@ func (jobStore *JobStore) UpdateConfig(ctx context.Context, jobCfg *config.JobCf
// task stage will not be updated.
if oldTask, ok := oldJob.Tasks[taskID]; ok {
newTask.Stage = oldTask.Stage
newTask.StageUpdatedTime = oldTask.StageUpdatedTime
}
}

Expand Down
9 changes: 5 additions & 4 deletions engine/jobmaster/dm/metadata/unit_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,11 @@ func NewUnitStateStore(kvClient metaModel.KVClient) *UnitStateStore {

// TaskStatus defines the running task status.
type TaskStatus struct {
Unit frameModel.WorkerType
Task string
Stage TaskStage
CfgModRevision uint64
Unit frameModel.WorkerType
Task string
Stage TaskStage
CfgModRevision uint64
StageUpdatedTime time.Time
}

// FinishedTaskStatus wraps the TaskStatus with FinishedStatus.
Expand Down
22 changes: 17 additions & 5 deletions engine/jobmaster/dm/task_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ func (tm *TaskManager) checkAndOperateTasks(ctx context.Context, job *metadata.J
continue
}

op := genOp(runningTask.Stage, persistentTask.Stage)
op := genOp(runningTask.Stage, runningTask.StageUpdatedTime, persistentTask.Stage, persistentTask.StageUpdatedTime)
if op == dmpkg.None {
tm.logger.Debug(
"task status will not be changed",
Expand Down Expand Up @@ -224,12 +224,24 @@ func (tm *TaskManager) GetTaskStatus(taskID string) (runtime.TaskStatus, bool) {
return value.(runtime.TaskStatus), true
}

func genOp(runtimeStage, expectedStage metadata.TaskStage) dmpkg.OperateType {
func genOp(
runningStage metadata.TaskStage,
runningStageUpdatedTime time.Time,
expectedStage metadata.TaskStage,
expectedStageUpdatedTime time.Time,
) dmpkg.OperateType {
switch {
case expectedStage == metadata.StagePaused && (runtimeStage == metadata.StageRunning || runtimeStage == metadata.StageError):
case expectedStage == metadata.StagePaused && (runningStage == metadata.StageRunning || runningStage == metadata.StageError):
return dmpkg.Pause
case expectedStage == metadata.StageRunning && runtimeStage == metadata.StagePaused:
return dmpkg.Resume
case expectedStage == metadata.StageRunning:
if runningStage == metadata.StagePaused {
return dmpkg.Resume
}
// only resume an error task after a manual Resume action, i.e. when expectedStageUpdatedTime is later than runningStageUpdatedTime
if runningStage == metadata.StageError && expectedStageUpdatedTime.After(runningStageUpdatedTime) {
return dmpkg.Resume
}
return dmpkg.None
// TODO: support update
default:
return dmpkg.None
Expand Down
Loading

0 comments on commit 3d22bf4

Please sign in to comment.