Skip to content

Commit

Permalink
expose JobCancelError, JobSnoozeError (riverqueue#665)
Browse files Browse the repository at this point in the history
These were initially kept as unexported types, but it turns out they're
useful for testing. Still, they should not be initialized directly (i.e.
from within workers) and should be used through the `JobCancel` and
`JobSnooze` top level functions.

Fixes riverqueue#625.
  • Loading branch information
bgentry authored and tigrato committed Dec 18, 2024
1 parent 2e4ba00 commit d68770f
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 18 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Added

- Expose `JobCancelError` and `JobSnoozeError` types to more easily facilitate testing. [PR #665](https://github.com/riverqueue/river/pull/665).

### Changed

- Tune the client to be more aggressive about fetching when it just fetched a full batch of jobs, or when it skipped its previous triggered fetch because it was already full. This should bring more consistent throughput to poll-only mode and in cases where there is a backlog of existing jobs but new ones aren't being actively inserted. This will result in increased fetch load on many installations, with the benefit of increased throughput. As before, `FetchCooldown` still limits how frequently these fetches can occur on each client and can be increased to reduce the amount of fetch querying. Thanks Chris Gaffney ([@gaffneyc](https://github.com/gaffneyc)) for the idea, initial implementation, and benchmarks. [PR #663](https://github.com/riverqueue/river/pull/663).
Expand Down
38 changes: 22 additions & 16 deletions job_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,27 +49,30 @@ func (e *UnknownJobKindError) Is(target error) bool {
// the job at the end of execution. Regardless of whether or not the job has any
// remaining attempts, this will ensure the job does not execute again.
func JobCancel(err error) error {
return &jobCancelError{err: err}
return &JobCancelError{err: err}
}

type jobCancelError struct {
// JobCancelError is the error type returned by JobCancel. It should not be
// initialized directly, but is returned from the [JobCancel] function and can
// be used for test assertions.
type JobCancelError struct {
err error
}

func (e *jobCancelError) Error() string {
func (e *JobCancelError) Error() string {
if e.err == nil {
return "jobCancelError: <nil>"
return "JobCancelError: <nil>"
}
// should not ever be called, but add a prefix just in case:
return "jobCancelError: " + e.err.Error()
return "JobCancelError: " + e.err.Error()
}

func (e *jobCancelError) Is(target error) bool {
_, ok := target.(*jobCancelError)
func (e *JobCancelError) Is(target error) bool {
_, ok := target.(*JobCancelError)
return ok
}

func (e *jobCancelError) Unwrap() error { return e.err }
func (e *JobCancelError) Unwrap() error { return e.err }

// JobSnooze can be returned from a Worker's Work method to cause the job to be
// tried again after the specified duration. This also has the effect of
Expand All @@ -81,20 +84,23 @@ func JobSnooze(duration time.Duration) error {
if duration < 0 {
panic("JobSnooze: duration must be >= 0")
}
return &jobSnoozeError{duration: duration}
return &JobSnoozeError{duration: duration}
}

type jobSnoozeError struct {
// JobSnoozeError is the error type returned by JobSnooze. It should not be
// initialized directly, but is returned from the [JobSnooze] function and can
// be used for test assertions.
type JobSnoozeError struct {
duration time.Duration
}

func (e *jobSnoozeError) Error() string {
func (e *JobSnoozeError) Error() string {
// should not ever be called, but add a prefix just in case:
return fmt.Sprintf("jobSnoozeError: %s", e.duration)
return fmt.Sprintf("JobSnoozeError: %s", e.duration)
}

func (e *jobSnoozeError) Is(target error) bool {
_, ok := target.(*jobSnoozeError)
func (e *JobSnoozeError) Is(target error) bool {
_, ok := target.(*JobSnoozeError)
return ok
}

Expand Down Expand Up @@ -270,7 +276,7 @@ func (e *jobExecutor) invokeErrorHandler(ctx context.Context, res *jobExecutorRe
}

func (e *jobExecutor) reportResult(ctx context.Context, res *jobExecutorResult) {
var snoozeErr *jobSnoozeError
var snoozeErr *JobSnoozeError

if res.Err != nil && errors.As(res.Err, &snoozeErr) {
e.Logger.DebugContext(ctx, e.Name+": Job snoozed",
Expand Down Expand Up @@ -316,7 +322,7 @@ func (e *jobExecutor) reportResult(ctx context.Context, res *jobExecutorResult)
func (e *jobExecutor) reportError(ctx context.Context, res *jobExecutorResult) {
var (
cancelJob bool
cancelErr *jobCancelError
cancelErr *JobCancelError
)

logAttrs := []any{
Expand Down
4 changes: 2 additions & 2 deletions job_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,7 @@ func TestJobExecutor_Execute(t *testing.T) {
require.Len(t, job.Errors, 1)
require.WithinDuration(t, time.Now(), job.Errors[0].At, 2*time.Second)
require.Equal(t, 1, job.Errors[0].Attempt)
require.Equal(t, "jobCancelError: throw away this job", job.Errors[0].Error)
require.Equal(t, "JobCancelError: throw away this job", job.Errors[0].Error)
require.Equal(t, "", job.Errors[0].Trace)
})

Expand Down Expand Up @@ -700,7 +700,7 @@ func TestJobExecutor_Execute(t *testing.T) {
require.Len(t, job.Errors, 1)
require.WithinDuration(t, time.Now(), job.Errors[0].At, 2*time.Second)
require.Equal(t, 1, job.Errors[0].Attempt)
require.Equal(t, "jobCancelError: job cancelled remotely", job.Errors[0].Error)
require.Equal(t, "JobCancelError: job cancelled remotely", job.Errors[0].Error)
require.Equal(t, ErrJobCancelledRemotely.Error(), job.Errors[0].Error)
require.Equal(t, "", job.Errors[0].Trace)
})
Expand Down

0 comments on commit d68770f

Please sign in to comment.