From 793f370ec52ea6df3953a34b89892912fa8c5012 Mon Sep 17 00:00:00 2001
From: Brandur Leach
Date: Mon, 13 May 2024 08:16:57 +0200
Subject: [PATCH] Consider per-worker timeout overrides when rescuing jobs (#350)

This one came up when I was thinking about the job-specific rescue
threshold floated in [1]. I was going to suggest the workaround of
setting an aggressive rescue threshold combined with a low global job
timeout, then overriding the timeout on any specific job workers that
need to run longer than the new low global timeout. But then I realized
this wouldn't work because the job rescuer doesn't account for
job-specific timeouts -- it just rescues or discards everything it
finds beyond the run's rescue threshold.

Here, add new logic to address that problem. Luckily we were already
pulling worker information to look up a possible custom retry schedule,
so we just piggyback onto that to also examine a possible custom work
timeout.

[1] https://github.com/riverqueue/river/issues/347
---
 CHANGELOG.md                             |  4 ++
 internal/maintenance/job_rescuer.go      | 84 ++++++++++++++++--------
 internal/maintenance/job_rescuer_test.go | 78 ++++++++++++++--------
 3 files changed, 112 insertions(+), 54 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 814ca82c..daeeadab 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

 ## [Unreleased]

+### Fixed
+
+- River now considers per-worker timeout overrides when rescuing jobs so that jobs with a long custom timeout won't be rescued prematurely. [PR #350](https://github.com/riverqueue/river/pull/350).
+
 ## [0.6.0] - 2024-05-08

 ### Added
diff --git a/internal/maintenance/job_rescuer.go b/internal/maintenance/job_rescuer.go
index 146a1135..41edd91f 100644
--- a/internal/maintenance/job_rescuer.go
+++ b/internal/maintenance/job_rescuer.go
@@ -11,6 +11,7 @@ import (
     "github.com/riverqueue/river/internal/baseservice"
     "github.com/riverqueue/river/internal/maintenance/startstop"
     "github.com/riverqueue/river/internal/rivercommon"
+    "github.com/riverqueue/river/internal/util/ptrutil"
     "github.com/riverqueue/river/internal/util/timeutil"
     "github.com/riverqueue/river/internal/util/valutil"
     "github.com/riverqueue/river/internal/workunit"
@@ -164,22 +165,20 @@ func (s *JobRescuer) runOnce(ctx context.Context) (*rescuerRunOnceResult, error)
     now := time.Now().UTC()

     rescueManyParams := riverdriver.JobRescueManyParams{
-        ID:          make([]int64, len(stuckJobs)),
-        Error:       make([][]byte, len(stuckJobs)),
-        FinalizedAt: make([]time.Time, len(stuckJobs)),
-        ScheduledAt: make([]time.Time, len(stuckJobs)),
-        State:       make([]string, len(stuckJobs)),
+        ID:          make([]int64, 0, len(stuckJobs)),
+        Error:       make([][]byte, 0, len(stuckJobs)),
+        FinalizedAt: make([]time.Time, 0, len(stuckJobs)),
+        ScheduledAt: make([]time.Time, 0, len(stuckJobs)),
+        State:       make([]string, 0, len(stuckJobs)),
     }

-    for i, job := range stuckJobs {
-        rescueManyParams.ID[i] = job.ID
-
+    for _, job := range stuckJobs {
         var metadata metadataWithCancelAttemptedAt
         if err := json.Unmarshal(job.Metadata, &metadata); err != nil {
             return nil, fmt.Errorf("error unmarshaling job metadata: %w", err)
         }

-        rescueManyParams.Error[i], err = json.Marshal(rivertype.AttemptError{
+        errorData, err := json.Marshal(rivertype.AttemptError{
             At:      now,
             Attempt: max(job.Attempt, 0),
             Error:   "Stuck job rescued by Rescuer",
@@ -189,29 +188,41 @@ func (s *JobRescuer) runOnce(ctx context.Context) (*rescuerRunOnceResult, error)
             return nil, fmt.Errorf("error marshaling error JSON: %w", err)
         }

+        addRescueParam := func(state rivertype.JobState, finalizedAt *time.Time, scheduledAt time.Time) {
+            rescueManyParams.ID = append(rescueManyParams.ID, job.ID)
+            rescueManyParams.Error = append(rescueManyParams.Error, errorData)
+            rescueManyParams.FinalizedAt = append(rescueManyParams.FinalizedAt, ptrutil.ValOrDefault(finalizedAt, time.Time{}))
+            rescueManyParams.ScheduledAt = append(rescueManyParams.ScheduledAt, scheduledAt)
+            rescueManyParams.State = append(rescueManyParams.State, string(state))
+        }
+
         if !metadata.CancelAttemptedAt.IsZero() {
             res.NumJobsCancelled++
-            rescueManyParams.FinalizedAt[i] = now
-            rescueManyParams.ScheduledAt[i] = job.ScheduledAt // reuse previous value
-            rescueManyParams.State[i] = string(rivertype.JobStateCancelled)
+            addRescueParam(rivertype.JobStateCancelled, &now, job.ScheduledAt) // reused previous scheduled value
             continue
         }
-        shouldRetry, retryAt := s.makeRetryDecision(ctx, job)
-        if shouldRetry {
-            res.NumJobsRetried++
-            rescueManyParams.ScheduledAt[i] = retryAt
-            rescueManyParams.State[i] = string(rivertype.JobStateRetryable)
-        } else {
+
+        retryDecision, retryAt := s.makeRetryDecision(ctx, job, now)
+
+        switch retryDecision {
+        case jobRetryDecisionDiscard:
             res.NumJobsDiscarded++
-            rescueManyParams.FinalizedAt[i] = now
-            rescueManyParams.ScheduledAt[i] = job.ScheduledAt // reuse previous value
-            rescueManyParams.State[i] = string(rivertype.JobStateDiscarded)
+            addRescueParam(rivertype.JobStateDiscarded, &now, job.ScheduledAt) // reused previous scheduled value
+
+        case jobRetryDecisionIgnore:
+            // job not timed out yet due to kind-specific timeout value; ignore
+
+        case jobRetryDecisionRetry:
+            res.NumJobsRetried++
+            addRescueParam(rivertype.JobStateRetryable, nil, retryAt)
         }
     }

-    _, err = s.exec.JobRescueMany(ctx, &rescueManyParams)
-    if err != nil {
-        return nil, fmt.Errorf("error rescuing stuck jobs: %w", err)
+    if len(rescueManyParams.ID) > 0 {
+        _, err = s.exec.JobRescueMany(ctx, &rescueManyParams)
+        if err != nil {
+            return nil, fmt.Errorf("error rescuing stuck jobs: %w", err)
+        }
     }

     s.TestSignals.UpdatedBatch.Signal(struct{}{})
@@ -245,14 +256,24 @@ func (s *JobRescuer) getStuckJobs(ctx context.Context) ([]*rivertype.JobRow, err
     })
 }

+// jobRetryDecision is a signal from makeRetryDecision as to what to do with a
+// particular job that appears to be eligible for rescue.
+type jobRetryDecision int
+
+const (
+    jobRetryDecisionDiscard jobRetryDecision = iota // discard the job
+    jobRetryDecisionIgnore                          // don't retry or discard the job
+    jobRetryDecisionRetry                           // retry the job
+)
+
 // makeRetryDecision decides whether or not a rescued job should be retried, and if so,
 // when.
-func (s *JobRescuer) makeRetryDecision(ctx context.Context, job *rivertype.JobRow) (bool, time.Time) {
+func (s *JobRescuer) makeRetryDecision(ctx context.Context, job *rivertype.JobRow, now time.Time) (jobRetryDecision, time.Time) {
     workUnitFactory := s.Config.WorkUnitFactoryFunc(job.Kind)
     if workUnitFactory == nil {
         s.Logger.ErrorContext(ctx, s.Name+": Attempted to rescue unhandled job kind, discarding",
             slog.String("job_kind", job.Kind), slog.Int64("job_id", job.ID))
-        return false, time.Time{}
+        return jobRetryDecisionDiscard, time.Time{}
     }

     workUnit := workUnitFactory.MakeUnit(job)
@@ -261,9 +282,18 @@ func (s *JobRescuer) makeRetryDecision(ctx context.Context, job *rivertype.JobRo
             slog.String("job_kind", job.Kind), slog.Int64("job_id", job.ID))
     }

+    if workUnit.Timeout() != 0 && now.Sub(*job.AttemptedAt) < workUnit.Timeout() {
+        return jobRetryDecisionIgnore, time.Time{}
+    }
+
     nextRetry := workUnit.NextRetry()
     if nextRetry.IsZero() {
         nextRetry = s.Config.ClientRetryPolicy.NextRetry(job)
     }
-    return job.Attempt < max(job.MaxAttempts, 0), nextRetry
+
+    if job.Attempt < max(job.MaxAttempts, 0) {
+        return jobRetryDecisionRetry, nextRetry
+    }
+
+    return jobRetryDecisionDiscard, time.Time{}
 }
diff --git a/internal/maintenance/job_rescuer_test.go b/internal/maintenance/job_rescuer_test.go
index 66617842..92137750 100644
--- a/internal/maintenance/job_rescuer_test.go
+++ b/internal/maintenance/job_rescuer_test.go
@@ -23,20 +23,22 @@ import (
 // callbackWorkUnitFactory wraps a Worker to implement workUnitFactory.
 type callbackWorkUnitFactory struct {
     Callback func(ctx context.Context, jobRow *rivertype.JobRow) error
+    timeout  time.Duration // defaults to 0, which signals default timeout
 }

 func (w *callbackWorkUnitFactory) MakeUnit(jobRow *rivertype.JobRow) workunit.WorkUnit {
-    return &callbackWorkUnit{callback: w.Callback, jobRow: jobRow}
+    return &callbackWorkUnit{callback: w.Callback, jobRow: jobRow, timeout: w.timeout}
 }

 // callbackWorkUnit implements workUnit for a job and Worker.
 type callbackWorkUnit struct {
     callback func(ctx context.Context, jobRow *rivertype.JobRow) error
     jobRow   *rivertype.JobRow
+    timeout  time.Duration // defaults to 0, which signals default timeout
 }

 func (w *callbackWorkUnit) NextRetry() time.Time           { return time.Now().Add(30 * time.Second) }
-func (w *callbackWorkUnit) Timeout() time.Duration         { return 0 }
+func (w *callbackWorkUnit) Timeout() time.Duration         { return w.timeout }
 func (w *callbackWorkUnit) Work(ctx context.Context) error { return w.callback(ctx, w.jobRow) }
 func (w *callbackWorkUnit) UnmarshalJob() error            { return nil }

@@ -51,10 +53,13 @@ func (p *SimpleClientRetryPolicy) NextRetry(job *rivertype.JobRow) time.Time {
 func TestJobRescuer(t *testing.T) {
     t.Parallel()

-    const rescuerJobKind = "rescuer"
-
     ctx := context.Background()

+    const (
+        rescuerJobKind            = "rescuer"
+        rescuerJobKindLongTimeout = "rescuer_long_timeout"
+    )
+
     type testBundle struct {
         exec          riverdriver.Executor
         rescueHorizon time.Time
@@ -76,8 +81,13 @@ func TestJobRescuer(t *testing.T) {
             Interval:    JobRescuerIntervalDefault,
             RescueAfter: JobRescuerRescueAfterDefault,
             WorkUnitFactoryFunc: func(kind string) workunit.WorkUnitFactory {
-                if kind == rescuerJobKind {
-                    return &callbackWorkUnitFactory{Callback: func(ctx context.Context, jobRow *rivertype.JobRow) error { return nil }}
+                emptyCallback := func(ctx context.Context, jobRow *rivertype.JobRow) error { return nil }
+
+                switch kind {
+                case rescuerJobKind:
+                    return &callbackWorkUnitFactory{Callback: emptyCallback}
+                case rescuerJobKindLongTimeout:
+                    return &callbackWorkUnitFactory{Callback: emptyCallback, timeout: JobRescuerRescueAfterDefault + 5*time.Minute}
                 }
                 panic("unhandled kind: " + kind)
             },
@@ -135,11 +145,18 @@ func TestJobRescuer(t *testing.T) {
         stuckToCancelJob1 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{Kind: ptrutil.Ptr(rescuerJobKind), State: ptrutil.Ptr(rivertype.JobStateRunning), AttemptedAt: ptrutil.Ptr(bundle.rescueHorizon.Add(-1 * time.Hour)), Metadata: []byte(fmt.Sprintf(`{"cancel_attempted_at": %q}`, cancelTime)), MaxAttempts: ptrutil.Ptr(5)})
         stuckToCancelJob2 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{Kind: ptrutil.Ptr(rescuerJobKind), State: ptrutil.Ptr(rivertype.JobStateRunning), AttemptedAt: ptrutil.Ptr(bundle.rescueHorizon.Add(1 * time.Minute)), Metadata: []byte(fmt.Sprintf(`{"cancel_attempted_at": %q}`, cancelTime)), MaxAttempts: ptrutil.Ptr(5)}) // won't be rescued

-        // these aren't touched:
+        // these aren't touched because they're in ineligible states
         notRunningJob1 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{Kind: ptrutil.Ptr(rescuerJobKind), FinalizedAt: ptrutil.Ptr(bundle.rescueHorizon.Add(-1 * time.Hour)), State: ptrutil.Ptr(rivertype.JobStateCompleted), AttemptedAt: ptrutil.Ptr(bundle.rescueHorizon.Add(-1 * time.Hour)), MaxAttempts: ptrutil.Ptr(5)})
         notRunningJob2 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{Kind: ptrutil.Ptr(rescuerJobKind), FinalizedAt: ptrutil.Ptr(bundle.rescueHorizon.Add(-1 * time.Hour)), State: ptrutil.Ptr(rivertype.JobStateDiscarded), AttemptedAt: ptrutil.Ptr(bundle.rescueHorizon.Add(-1 * time.Hour)), MaxAttempts: ptrutil.Ptr(5)})
         notRunningJob3 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{Kind: ptrutil.Ptr(rescuerJobKind), FinalizedAt: ptrutil.Ptr(bundle.rescueHorizon.Add(-1 * time.Hour)), State: ptrutil.Ptr(rivertype.JobStateCancelled), AttemptedAt: ptrutil.Ptr(bundle.rescueHorizon.Add(-1 * time.Hour)), MaxAttempts: ptrutil.Ptr(5)})

+        // Jobs with worker-specific long timeouts. The first isn't rescued
+        // because the difference between its `attempted_at` and now is still
+        // within the timeout threshold. The second _is_ rescued because it
+        // started earlier and even with the longer timeout, has still timed out.
+        longTimeOutJob1 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{Kind: ptrutil.Ptr(rescuerJobKindLongTimeout), State: ptrutil.Ptr(rivertype.JobStateRunning), AttemptedAt: ptrutil.Ptr(bundle.rescueHorizon.Add(-1 * time.Minute)), MaxAttempts: ptrutil.Ptr(5)})
+        longTimeOutJob2 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{Kind: ptrutil.Ptr(rescuerJobKindLongTimeout), State: ptrutil.Ptr(rivertype.JobStateRunning), AttemptedAt: ptrutil.Ptr(bundle.rescueHorizon.Add(-6 * time.Minute)), MaxAttempts: ptrutil.Ptr(5)})
+
         require.NoError(cleaner.Start(ctx))

         cleaner.TestSignals.FetchedBatch.WaitOrTimeout()
@@ -158,37 +175,44 @@ func TestJobRescuer(t *testing.T) {
         require.NoError(err)
         require.Equal(stuckToRetryJob3.State, job3After.State) // not rescued

-        discard1After, err := bundle.exec.JobGetByID(ctx, stuckToDiscardJob1.ID)
+        discardJob1After, err := bundle.exec.JobGetByID(ctx, stuckToDiscardJob1.ID)
         require.NoError(err)
-        require.Equal(rivertype.JobStateDiscarded, discard1After.State)
-        require.WithinDuration(time.Now(), *discard1After.FinalizedAt, 5*time.Second)
-        require.Len(discard1After.Errors, 1)
+        require.Equal(rivertype.JobStateDiscarded, discardJob1After.State)
+        require.WithinDuration(time.Now(), *discardJob1After.FinalizedAt, 5*time.Second)
+        require.Len(discardJob1After.Errors, 1)

-        discard2After, err := bundle.exec.JobGetByID(ctx, stuckToDiscardJob2.ID)
+        discardJob2After, err := bundle.exec.JobGetByID(ctx, stuckToDiscardJob2.ID)
         require.NoError(err)
-        require.Equal(rivertype.JobStateRunning, discard2After.State)
-        require.Nil(discard2After.FinalizedAt)
+        require.Equal(rivertype.JobStateRunning, discardJob2After.State)
+        require.Nil(discardJob2After.FinalizedAt)

-        cancel1After, err := bundle.exec.JobGetByID(ctx, stuckToCancelJob1.ID)
+        cancelJob1After, err := bundle.exec.JobGetByID(ctx, stuckToCancelJob1.ID)
         require.NoError(err)
-        require.Equal(rivertype.JobStateCancelled, cancel1After.State)
-        require.WithinDuration(time.Now(), *cancel1After.FinalizedAt, 5*time.Second)
-        require.Len(cancel1After.Errors, 1)
+        require.Equal(rivertype.JobStateCancelled, cancelJob1After.State)
+        require.WithinDuration(time.Now(), *cancelJob1After.FinalizedAt, 5*time.Second)
+        require.Len(cancelJob1After.Errors, 1)

-        cancel2After, err := bundle.exec.JobGetByID(ctx, stuckToCancelJob2.ID)
+        cancelJob2After, err := bundle.exec.JobGetByID(ctx, stuckToCancelJob2.ID)
         require.NoError(err)
-        require.Equal(rivertype.JobStateRunning, cancel2After.State)
-        require.Nil(cancel2After.FinalizedAt)
+        require.Equal(rivertype.JobStateRunning, cancelJob2After.State)
+        require.Nil(cancelJob2After.FinalizedAt)

-        notRunning1After, err := bundle.exec.JobGetByID(ctx, notRunningJob1.ID)
+        notRunningJob1After, err := bundle.exec.JobGetByID(ctx, notRunningJob1.ID)
         require.NoError(err)
-        require.Equal(notRunning1After.State, notRunningJob1.State)
-        notRunning2After, err := bundle.exec.JobGetByID(ctx, notRunningJob2.ID)
+        require.Equal(notRunningJob1.State, notRunningJob1After.State)
+        notRunningJob2After, err := bundle.exec.JobGetByID(ctx, notRunningJob2.ID)
+        require.NoError(err)
+        require.Equal(notRunningJob2.State, notRunningJob2After.State)
+        notRunningJob3After, err := bundle.exec.JobGetByID(ctx, notRunningJob3.ID)
+        require.NoError(err)
+        require.Equal(notRunningJob3.State, notRunningJob3After.State)
+
+        notTimedOutJob1After, err := bundle.exec.JobGetByID(ctx, longTimeOutJob1.ID)
         require.NoError(err)
-        require.Equal(notRunning2After.State, notRunningJob2.State)
-        notRunning3After, err := bundle.exec.JobGetByID(ctx, notRunningJob3.ID)
+        require.Equal(rivertype.JobStateRunning, notTimedOutJob1After.State)
+        notTimedOutJob2After, err := bundle.exec.JobGetByID(ctx, longTimeOutJob2.ID)
         require.NoError(err)
-        require.Equal(notRunning3After.State, notRunningJob3.State)
+        require.Equal(rivertype.JobStateRetryable, notTimedOutJob2After.State)
     })

     t.Run("RescuesInBatches", func(t *testing.T) {
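
As an illustration of the kind of override this change now respects (this sketch isn't part of the patch: the SlowReportArgs / SlowReportWorker names are hypothetical, and it assumes River's public worker API where a zero Timeout means "use the client default"), a worker that needs to outlive a deliberately low global job timeout can declare its own timeout like so:

    package example

    import (
        "context"
        "time"

        "github.com/riverqueue/river"
    )

    // SlowReportArgs is a hypothetical job args type used only for illustration.
    type SlowReportArgs struct{}

    func (SlowReportArgs) Kind() string { return "slow_report" }

    // SlowReportWorker runs jobs that need far longer than the client-level
    // job timeout.
    type SlowReportWorker struct {
        river.WorkerDefaults[SlowReportArgs]
    }

    // Timeout is the per-worker override. With this patch, the rescuer checks a
    // running job's attempted_at against this value and leaves the job alone
    // until the override has elapsed, instead of rescuing or discarding it as
    // soon as the global rescue threshold passes.
    func (w *SlowReportWorker) Timeout(job *river.Job[SlowReportArgs]) time.Duration {
        return 4 * time.Hour
    }

    func (w *SlowReportWorker) Work(ctx context.Context, job *river.Job[SlowReportArgs]) error {
        // Long-running work would go here.
        return nil
    }

Paired with an aggressive rescue threshold and a low global job timeout on the client (the workaround floated above), only jobs of this kind get the longer grace period before the rescuer considers them stuck.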