From 0c9804b2c65f392e0123bbb8a2c9c7c6e00db72b Mon Sep 17 00:00:00 2001 From: amyangfei Date: Wed, 14 Sep 2022 17:21:44 +0800 Subject: [PATCH] record error when fast fail --- engine/servermaster/jobmanager.go | 10 +++++++--- engine/servermaster/jobmanager_test.go | 4 +++- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/engine/servermaster/jobmanager.go b/engine/servermaster/jobmanager.go index 73d20e166b5..97467ab8be6 100644 --- a/engine/servermaster/jobmanager.go +++ b/engine/servermaster/jobmanager.go @@ -62,7 +62,7 @@ type JobManager interface { GetJobMasterForwardAddress(ctx context.Context, jobID string) (string, error) GetJobStatuses(ctx context.Context) (map[frameModel.MasterID]frameModel.MasterState, error) - UpdateJobStatus(ctx context.Context, jobID frameModel.MasterID, code frameModel.MasterState) error + UpdateJobStatus(ctx context.Context, jobID frameModel.MasterID, errMsg string, code frameModel.MasterState) error WatchJobStatuses( ctx context.Context, ) (resManager.JobStatusesSnapshot, *notifier.Receiver[resManager.JobStatusChangeEvent], error) @@ -501,13 +501,14 @@ func (jm *JobManagerImpl) GetJobStatuses( // UpdateJobStatus implements JobManager.UpdateJobStatus func (jm *JobManagerImpl) UpdateJobStatus( - ctx context.Context, jobID frameModel.MasterID, code frameModel.MasterState, + ctx context.Context, jobID frameModel.MasterID, errMsg string, code frameModel.MasterState, ) error { // Note since the job is not online, it is safe to get from metastore and then update meta, err := jm.frameMetaClient.GetJobByID(ctx, jobID) if err != nil { return err } + meta.ErrorMsg = errMsg meta.State = code return jm.frameMetaClient.UpsertJob(ctx, meta) } @@ -682,7 +683,10 @@ func (jm *JobManagerImpl) OnWorkerDispatched(worker framework.WorkerHandle, resu log.Info("job master terminated", zap.String("job-id", worker.ID())) ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) defer cancel() - if err := jm.UpdateJobStatus(ctx, worker.ID(), frameModel.MasterStateFailed); err != nil { + errIn, _ := pkgClient.ErrCreateWorkerTerminate.Convert(result) + if err := jm.UpdateJobStatus( + ctx, worker.ID(), errIn.Details, frameModel.MasterStateFailed, + ); err != nil { return err } jm.JobFsm.JobOffline(worker, false /* needFailover */) diff --git a/engine/servermaster/jobmanager_test.go b/engine/servermaster/jobmanager_test.go index a6691bab061..fd059b30af2 100644 --- a/engine/servermaster/jobmanager_test.go +++ b/engine/servermaster/jobmanager_test.go @@ -650,10 +650,11 @@ func TestOnWorkerDispatchedFastFail(t *testing.T) { // simulate a job is created. mgr.JobFsm.JobDispatched(mockMaster.MasterMeta(), false) + errorMsg := "unit test fast fail error" mockHandle := &framework.MockHandle{WorkerID: masterID} nerr := pkgClient.ErrCreateWorkerTerminate.Gen( &pkgClient.CreateWorkerTerminateError{ - Details: "unit test fast fail error", + Details: errorMsg, }) // OnWorkerDispatched callback on job manager, a terminated error will make // job fast fail. @@ -663,4 +664,5 @@ func TestOnWorkerDispatchedFastFail(t *testing.T) { mockMaster.MasterMeta().ProjectID, int(frameModel.MasterStateFailed)) require.NoError(t, err) require.Len(t, meta, 1) + require.Equal(t, errorMsg, meta[0].ErrorMsg) }