Skip to content

Commit

Permalink
add Rescheduled metric for executor.
Browse files Browse the repository at this point in the history
  • Loading branch information
Higan committed Jul 23, 2024
1 parent 3b2dcd8 commit 748fe63
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 8 deletions.
12 changes: 8 additions & 4 deletions executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ func (e *executor) start() {
default:
// runner is busy, reschedule the work for later
// which means we just skip it here and do nothing
// TODO when metrics are added, this should increment a rescheduled metric
e.incrementJobCounter(*j, Rescheduled)
e.sendOutForRescheduling(&jIn)
}
} else {
Expand Down Expand Up @@ -397,9 +397,7 @@ func (e *executor) runJob(j internalJob, jIn jobIn) {

startTime := time.Now()
err := e.callJobWithRecover(j)
if e.monitor != nil {
e.monitor.RecordJobTiming(startTime, time.Now(), j.id, j.name, j.tags)
}
e.recordJobTiming(startTime, time.Now(), j)
if err != nil {
_ = callJobFuncWithParams(j.afterJobRunsWithError, j.id, j.name, err)
e.incrementJobCounter(j, Fail)
Expand All @@ -422,6 +420,12 @@ func (e *executor) callJobWithRecover(j internalJob) (err error) {
return callJobFuncWithParams(j.function, j.parameters...)
}

func (e *executor) recordJobTiming(start time.Time, end time.Time, j internalJob) {
if e.monitor != nil {
e.monitor.RecordJobTiming(start, end, j.id, j.name, j.tags)
}
}

func (e *executor) incrementJobCounter(j internalJob, status JobStatus) {
if e.monitor != nil {
e.monitor.IncrementJob(j.id, j.name, j.tags, status)
Expand Down
7 changes: 4 additions & 3 deletions monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,10 @@ type JobStatus string

// The different statuses of job that can be used.
const (
Fail JobStatus = "fail"
Success JobStatus = "success"
Skip JobStatus = "skip"
Fail JobStatus = "fail"
Success JobStatus = "success"
Skip JobStatus = "skip"
Rescheduled JobStatus = "rescheduled"
)

// Monitor represents the interface to collect jobs metrics.
Expand Down
2 changes: 1 addition & 1 deletion scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ func (s *scheduler) selectRemoveJob(id uuid.UUID) {
}

// Jobs coming back from the executor to the scheduler that
// need to evaluated for rescheduling.
// need to be evaluated for rescheduling.
func (s *scheduler) selectExecJobsOutForRescheduling(id uuid.UUID) {
select {
case <-s.shutdownCtx.Done():
Expand Down

0 comments on commit 748fe63

Please sign in to comment.