Skip to content

Commit

Permalink
record error when fast fail
Browse files Browse the repository at this point in the history
  • Loading branch information
amyangfei committed Sep 14, 2022
1 parent 4e9fbe9 commit 0c9804b
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 4 deletions.
10 changes: 7 additions & 3 deletions engine/servermaster/jobmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ type JobManager interface {

GetJobMasterForwardAddress(ctx context.Context, jobID string) (string, error)
GetJobStatuses(ctx context.Context) (map[frameModel.MasterID]frameModel.MasterState, error)
UpdateJobStatus(ctx context.Context, jobID frameModel.MasterID, code frameModel.MasterState) error
UpdateJobStatus(ctx context.Context, jobID frameModel.MasterID, errMsg string, code frameModel.MasterState) error
WatchJobStatuses(
ctx context.Context,
) (resManager.JobStatusesSnapshot, *notifier.Receiver[resManager.JobStatusChangeEvent], error)
Expand Down Expand Up @@ -501,13 +501,14 @@ func (jm *JobManagerImpl) GetJobStatuses(

// UpdateJobStatus implements JobManager.UpdateJobStatus
func (jm *JobManagerImpl) UpdateJobStatus(
ctx context.Context, jobID frameModel.MasterID, code frameModel.MasterState,
ctx context.Context, jobID frameModel.MasterID, errMsg string, code frameModel.MasterState,
) error {
// Note since the job is not online, it is safe to get from metastore and then update
meta, err := jm.frameMetaClient.GetJobByID(ctx, jobID)
if err != nil {
return err
}
meta.ErrorMsg = errMsg
meta.State = code
return jm.frameMetaClient.UpsertJob(ctx, meta)
}
Expand Down Expand Up @@ -682,7 +683,10 @@ func (jm *JobManagerImpl) OnWorkerDispatched(worker framework.WorkerHandle, resu
log.Info("job master terminated", zap.String("job-id", worker.ID()))
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
if err := jm.UpdateJobStatus(ctx, worker.ID(), frameModel.MasterStateFailed); err != nil {
errIn, _ := pkgClient.ErrCreateWorkerTerminate.Convert(result)
if err := jm.UpdateJobStatus(
ctx, worker.ID(), errIn.Details, frameModel.MasterStateFailed,
); err != nil {
return err
}
jm.JobFsm.JobOffline(worker, false /* needFailover */)
Expand Down
4 changes: 3 additions & 1 deletion engine/servermaster/jobmanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -650,10 +650,11 @@ func TestOnWorkerDispatchedFastFail(t *testing.T) {

// simulate a job is created.
mgr.JobFsm.JobDispatched(mockMaster.MasterMeta(), false)
errorMsg := "unit test fast fail error"
mockHandle := &framework.MockHandle{WorkerID: masterID}
nerr := pkgClient.ErrCreateWorkerTerminate.Gen(
&pkgClient.CreateWorkerTerminateError{
Details: "unit test fast fail error",
Details: errorMsg,
})
// OnWorkerDispatched callback on job manager, a terminated error will make
// job fast fail.
Expand All @@ -663,4 +664,5 @@ func TestOnWorkerDispatchedFastFail(t *testing.T) {
mockMaster.MasterMeta().ProjectID, int(frameModel.MasterStateFailed))
require.NoError(t, err)
require.Len(t, meta, 1)
require.Equal(t, errorMsg, meta[0].ErrorMsg)
}

0 comments on commit 0c9804b

Please sign in to comment.