Skip to content

Commit

Permalink
openapi(engine): support job detail in GetJob and ListJob
Browse files Browse the repository at this point in the history
  • Loading branch information
maxshuang committed Sep 5, 2022
1 parent 88a03f0 commit 9398176
Show file tree
Hide file tree
Showing 33 changed files with 818 additions and 331 deletions.
2 changes: 1 addition & 1 deletion engine/enginepb/error.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

384 changes: 197 additions & 187 deletions engine/enginepb/master.pb.go

Large diffs are not rendered by default.

6 changes: 6 additions & 0 deletions engine/enginepb/master_grpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions engine/framework/base_jobmaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ type BaseJobMaster interface {

// Exit should be called when jobmaster (in user logic) wants to exit.
// exitReason: ExitReasonFinished/ExitReasonCanceled/ExitReasonFailed
Exit(ctx context.Context, exitReason ExitReason, err error, extMsg string) error
Exit(ctx context.Context, exitReason ExitReason, err error, detail []byte) error

// IsMasterReady returns whether the master has received heartbeats for all
// workers after a fail-over. If this is the first time the JobMaster started up,
Expand Down Expand Up @@ -349,16 +349,16 @@ func (d *DefaultBaseJobMaster) IsMasterReady() bool {
}

// Exit implements BaseJobMaster.Exit
func (d *DefaultBaseJobMaster) Exit(ctx context.Context, exitReason ExitReason, err error, extMsg string) error {
func (d *DefaultBaseJobMaster) Exit(ctx context.Context, exitReason ExitReason, err error, detail []byte) error {
ctx, cancel := d.errCenter.WithCancelOnFirstError(ctx)
defer cancel()

// Don't set error center for master to make worker.Exit work well
if errTmp := d.master.exitWithoutSetErrCenter(ctx, exitReason, err, extMsg); errTmp != nil {
if errTmp := d.master.exitWithoutSetErrCenter(ctx, exitReason, err, detail); errTmp != nil {
return errTmp
}

return d.worker.Exit(ctx, exitReason, err, []byte(extMsg))
return d.worker.Exit(ctx, exitReason, err, detail)
}

// TriggerOpenAPIInitialize implements BaseJobMasterExt.TriggerOpenAPIInitialize.
Expand Down
35 changes: 17 additions & 18 deletions engine/framework/base_jobmaster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ func TestBaseJobMasterBasics(t *testing.T) {
jobMaster.mu.Unlock()

status := jobMaster.Status()
err = jobMaster.base.Exit(ctx, ExitReasonFinished, nil, string(status.ExtBytes))
err = jobMaster.base.Exit(ctx, ExitReasonFinished, nil, status.ExtBytes)
require.NoError(t, err)

err = jobMaster.base.Close(ctx)
Expand Down Expand Up @@ -283,58 +283,58 @@ func TestJobMasterExit(t *testing.T) {
cases := []struct {
exitReason ExitReason
err error
extMsg string
detail string
expectedState frameModel.MasterState
expectedErrorMsg string
expectedExtMsg string
expectedDetail string
}{
{
exitReason: ExitReasonFinished,
err: nil,
extMsg: "test finished",
detail: "test finished",
expectedState: frameModel.MasterStateFinished,
expectedErrorMsg: "",
expectedExtMsg: "test finished",
expectedDetail: "test finished",
},
{
exitReason: ExitReasonFinished,
err: errors.New("test finished with error"),
extMsg: "test finished",
detail: "test finished",
expectedState: frameModel.MasterStateFinished,
expectedErrorMsg: "test finished with error",
expectedExtMsg: "test finished",
expectedDetail: "test finished",
},
{
exitReason: ExitReasonCanceled,
err: nil,
extMsg: "test canceled",
detail: "test canceled",
expectedState: frameModel.MasterStateStopped,
expectedErrorMsg: "",
expectedExtMsg: "test canceled",
expectedDetail: "test canceled",
},
{
exitReason: ExitReasonCanceled,
err: errors.New("test canceled with error"),
extMsg: "test canceled",
detail: "test canceled",
expectedState: frameModel.MasterStateStopped,
expectedErrorMsg: "test canceled with error",
expectedExtMsg: "test canceled",
expectedDetail: "test canceled",
},
{
exitReason: ExitReasonFailed,
err: nil,
extMsg: "test failed",
detail: "test failed",
expectedState: frameModel.MasterStateFailed,
expectedErrorMsg: "",
expectedExtMsg: "test failed",
expectedDetail: "test failed",
},
{
exitReason: ExitReasonFailed,
err: errors.New("test failed with error"),
extMsg: "test failed",
detail: "test failed",
expectedState: frameModel.MasterStateFailed,
expectedErrorMsg: "test failed with error",
expectedExtMsg: "test failed",
expectedDetail: "test failed",
},
}

Expand Down Expand Up @@ -385,13 +385,12 @@ func TestJobMasterExit(t *testing.T) {
jobMaster.mu.Unlock()

// test exit status
err = jobMaster.base.Exit(ctx, cs.exitReason, cs.err, cs.extMsg)
err = jobMaster.base.Exit(ctx, cs.exitReason, cs.err, []byte(cs.detail))
require.NoError(t, err)
meta, err := jobMaster.base.master.frameMetaClient.GetJobByID(ctx, jobMaster.base.ID())
require.NoError(t, err)
require.Equal(t, cs.expectedState, meta.State)
require.Equal(t, cs.expectedExtMsg, meta.ExtMsg)

require.Equal(t, []byte(cs.expectedDetail), meta.Detail)
err = jobMaster.base.Close(ctx)
require.NoError(t, err)

Expand Down
4 changes: 2 additions & 2 deletions engine/framework/fake/fake_master.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,13 +364,13 @@ func (m *Master) tickedCheckStatus(ctx context.Context) error {
if m.getState() == frameModel.WorkerStateStopped {
log.Info("FakeMaster: received pause command, stop now")
m.setState(frameModel.WorkerStateStopped)
return m.Exit(ctx, framework.ExitReasonCanceled, nil, "FakeMaster: received pause command")
return m.Exit(ctx, framework.ExitReasonCanceled, nil, []byte("FakeMaster: received pause command"))
}

if len(m.finishedSet) == m.config.WorkerCount {
log.Info("FakeMaster: all worker finished, job master exits now")
m.setState(frameModel.WorkerStateFinished)
return m.Exit(ctx, framework.ExitReasonFinished, nil, "all workers have been finished")
return m.Exit(ctx, framework.ExitReasonFinished, nil, []byte("all workers have been finished"))
}

return nil
Expand Down
10 changes: 5 additions & 5 deletions engine/framework/master.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ type BaseMaster interface {
// Exit should be called when master (in user logic) wants to exit.
// exitReason: ExitReasonFinished/ExitReasonCanceled/ExitReasonFailed
// NOTE: Currently, no implementation uses this method, but we still keep it so that the interface stays complete.
Exit(ctx context.Context, exitReason ExitReason, err error, extMsg string) error
Exit(ctx context.Context, exitReason ExitReason, err error, detail []byte) error

// CreateWorker requires the framework to dispatch a new worker.
// If the worker needs to access certain file system resources,
Expand Down Expand Up @@ -700,7 +700,7 @@ func (m *DefaultBaseMaster) IsMasterReady() bool {

// Exit implements BaseMaster.Exit
// NOTE: Currently, no implementation uses this method, but we still keep it so that the interface stays complete.
func (m *DefaultBaseMaster) Exit(ctx context.Context, exitReason ExitReason, err error, extMsg string) error {
func (m *DefaultBaseMaster) Exit(ctx context.Context, exitReason ExitReason, err error, detail []byte) error {
// Set the errCenter to prevent user from forgetting to return directly after calling 'Exit'
// keep the original error in errCenter if possible
defer func() {
Expand All @@ -710,10 +710,10 @@ func (m *DefaultBaseMaster) Exit(ctx context.Context, exitReason ExitReason, err
m.errCenter.OnError(err)
}()

return m.exitWithoutSetErrCenter(ctx, exitReason, err, extMsg)
return m.exitWithoutSetErrCenter(ctx, exitReason, err, detail)
}

func (m *DefaultBaseMaster) exitWithoutSetErrCenter(ctx context.Context, exitReason ExitReason, err error, extMsg string) (errRet error) {
func (m *DefaultBaseMaster) exitWithoutSetErrCenter(ctx context.Context, exitReason ExitReason, err error, detail []byte) (errRet error) {
switch exitReason {
case ExitReasonFinished:
m.masterMeta.State = frameModel.MasterStateFinished
Expand All @@ -729,7 +729,7 @@ func (m *DefaultBaseMaster) exitWithoutSetErrCenter(ctx context.Context, exitRea
if err != nil {
m.masterMeta.ErrorMsg = err.Error()
}
m.masterMeta.ExtMsg = extMsg
m.masterMeta.Detail = detail

metaClient := metadata.NewMasterMetadataClient(m.id, m.frameMetaClient)
return metaClient.Update(ctx, m.masterMeta)
Expand Down
28 changes: 14 additions & 14 deletions engine/framework/model/master.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,8 @@ type MasterMeta struct {
// error message for the job
ErrorMsg string `json:"error-message" gorm:"column:error_message;type:text"`

// if job is finished or canceled, business logic can set self-defined job info to `ExtMsg`
ExtMsg string `json:"extend-message" gorm:"column:extend_message;type:text"`
// If the job is finished or canceled, business logic can store self-defined job info in `Detail`.
Detail []byte `json:"detail" gorm:"column:detail;type:blob"`

Ext MasterMetaExt `json:"ext" gorm:"column:ext;type:JSON"`

Expand All @@ -136,17 +136,17 @@ func (m *MasterMeta) Unmarshal(data []byte) error {
// Map is used for update the orm model
func (m *MasterMeta) Map() map[string]interface{} {
return map[string]interface{}{
"project_id": m.ProjectID,
"id": m.ID,
"type": m.Type,
"state": m.State,
"node_id": m.NodeID,
"address": m.Addr,
"epoch": m.Epoch,
"config": m.Config,
"error_message": m.ErrorMsg,
"extend_message": m.ExtMsg,
"ext": m.Ext,
"project_id": m.ProjectID,
"id": m.ID,
"type": m.Type,
"state": m.State,
"node_id": m.NodeID,
"address": m.Addr,
"epoch": m.Epoch,
"config": m.Config,
"error_message": m.ErrorMsg,
"detail": m.Detail,
"ext": m.Ext,
}
}

Expand All @@ -164,6 +164,6 @@ var MasterUpdateColumns = []string{
"epoch",
"config",
"error_message",
"extend_message",
"detail",
"ext",
}
4 changes: 2 additions & 2 deletions engine/jobmaster/cvsjob/cvs_job_master.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ func (jm *JobMaster) Tick(ctx context.Context) error {
jm.setState(frameModel.WorkerStateFinished)
log.Info("cvs job master finished")
status := jm.Status()
return jm.BaseJobMaster.Exit(ctx, framework.ExitReasonFinished, nil, string(status.ExtBytes))
return jm.BaseJobMaster.Exit(ctx, framework.ExitReasonFinished, nil, status.ExtBytes)
}
for idx, workerInfo := range jm.syncFilesInfo {
// check if need to recreate worker
Expand Down Expand Up @@ -214,7 +214,7 @@ func (jm *JobMaster) Tick(ctx context.Context) error {
if jm.getState() == frameModel.WorkerStateStopped {
log.Info("cvs job master stopped")
status := jm.Status()
return jm.BaseJobMaster.Exit(ctx, framework.ExitReasonCanceled, nil, string(status.ExtBytes))
return jm.BaseJobMaster.Exit(ctx, framework.ExitReasonCanceled, nil, status.ExtBytes)
}
return nil
}
Expand Down
12 changes: 6 additions & 6 deletions engine/jobmaster/dm/dm_jobmaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func (jm *JobMaster) InitImpl(ctx context.Context) error {
return err
}
if err := jm.preCheck(ctx, jm.initJobCfg); err != nil {
return jm.Exit(ctx, framework.ExitReasonFailed, err, "")
return jm.Exit(ctx, framework.ExitReasonFailed, err, nil)
}
if err := jm.bootstrap(ctx); err != nil {
return err
Expand Down Expand Up @@ -368,27 +368,27 @@ func (jm *JobMaster) status(ctx context.Context, code frameModel.WorkerState) (f

// cancel removes jobCfg from metadata and waits for all workers to go offline.
func (jm *JobMaster) cancel(ctx context.Context, code frameModel.WorkerState) error {
var extMsg string
var detail []byte
status, err := jm.status(ctx, code)
if err != nil {
jm.Logger().Error("failed to get status", zap.Error(err))
} else {
extMsg = string(status.ExtBytes)
detail = status.ExtBytes
}

if err := jm.taskManager.OperateTask(ctx, dmpkg.Deleting, nil, nil); err != nil {
// would not recover again
return jm.Exit(ctx, framework.ExitReasonCanceled, err, extMsg)
return jm.Exit(ctx, framework.ExitReasonCanceled, err, detail)
}
// wait all worker exit
jm.workerManager.SetNextCheckTime(time.Now())
for {
select {
case <-ctx.Done():
return jm.Exit(ctx, framework.ExitReasonCanceled, ctx.Err(), extMsg)
return jm.Exit(ctx, framework.ExitReasonCanceled, ctx.Err(), detail)
case <-time.After(time.Second):
if jm.workerManager.allTombStone() {
return jm.Exit(ctx, framework.WorkerStateToExitReason(status.State), err, extMsg)
return jm.Exit(ctx, framework.WorkerStateToExitReason(status.State), err, detail)
}
jm.workerManager.SetNextCheckTime(time.Now())
}
Expand Down
2 changes: 1 addition & 1 deletion engine/jobmaster/dm/dm_jobmaster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -434,7 +434,7 @@ func (m *MockBaseJobmaster) Logger() *zap.Logger {
return log.L()
}

func (m *MockBaseJobmaster) Exit(ctx context.Context, exitReason framework.ExitReason, err error, extMsg string) error {
func (m *MockBaseJobmaster) Exit(ctx context.Context, exitReason framework.ExitReason, err error, detail []byte) error {
m.mu.Lock()
defer m.mu.Unlock()
args := m.Called()
Expand Down
Loading

0 comments on commit 9398176

Please sign in to comment.