Re-perform a job if a StandardError bubbles up; better document job reliability
bensheldon committed Jul 26, 2020
1 parent 354ed13 commit ec2951b
Showing 5 changed files with 125 additions and 62 deletions.
70 changes: 65 additions & 5 deletions README.md
@@ -79,25 +79,85 @@ $ bundle install
# [--poll-interval=N] # Interval between polls for available jobs in seconds (default: 1)
```

### Taking advantage of ActiveJob
### Error handling, retries, and reliability

ActiveJob has a rich set of built-in functionality for timeouts, error handling, and retrying. For example:
GoodJob guarantees _at-least-once_ performance of jobs. GoodJob fully supports ActiveJob's built-in functionality for error handling, retries and timeouts.
#### Error handling
By default, if a job raises an error while it is being performed, _and it bubbles up to the GoodJob backend_, GoodJob will immediately re-perform the job until it finishes successfully.
- `Exception`-type errors, such as a SIGINT, will always cause a job to be re-performed.
- `StandardError`-type errors, by default, will cause a job to be re-performed, though this is configurable:
```ruby
# config/initializers/good_job.rb
GoodJob.reperform_jobs_on_standard_error = true # => default
```
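To make the re-perform rule concrete, the sketch below mirrors the record-handling branch that this commit adds at the end of `GoodJob::Job#perform`. It is an illustration only: `job_record_outcome` and its symbolic return values are hypothetical names and are not part of GoodJob's API.

```ruby
# Hypothetical helper: models what happens to the job record after a perform.
# rescued_error stands for a StandardError that bubbled up to the backend.
def job_record_outcome(rescued_error:, reperform_on_standard_error: true, destroy_after: true)
  if rescued_error && reperform_on_standard_error
    # Record is saved without finished_at, so a worker can pick it up again.
    :saved_unfinished
  elsif destroy_after
    # Record is marked finished and then deleted.
    :destroyed
  else
    # Record is marked finished and kept (preserve_job_records).
    :saved_finished
  end
end

job_record_outcome(rescued_error: RuntimeError.new("boom"))
job_record_outcome(rescued_error: RuntimeError.new("boom"), reperform_on_standard_error: false)
```

With the defaults, a bubbled-up `StandardError` leaves the record unfinished even when `destroy_after` is true, which matches the "does not destroy the job record" spec in this commit.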
#### Retrying jobs
ActiveJob can be configured to retry an infinite number of times, with an exponential backoff. Using ActiveJob's `retry_on` will ensure that errors do not bubble up to the GoodJob backend:

```ruby
class ApplicationJob < ActiveJob::Base
  # Retry errors an infinite number of times with exponential back-off
  retry_on StandardError, wait: :exponentially_longer, attempts: Float::INFINITY
  # ...
end
```

When specifying a limited number of retries, care must be taken to ensure that an error does not bubble up to the GoodJob backend because that will result in the job being re-performed:

```ruby
class ApplicationJob < ActiveJob::Base
  retry_on StandardError, attempts: 5 do |_job, _exception|
    # Log error, etc.
    # You must implement this block, otherwise,
    # Active Job will re-raise the error.
    # Do not re-raise the error, otherwise
    # GoodJob will immediately re-perform the job.
  end
  # ...
end
```
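Why the block matters can be shown without ActiveJob. The plain-Ruby sketch below imitates the counting rule of `retry_on` with `attempts: 5` and a block: the work runs at most five times, and once attempts are exhausted the error is handed to the block instead of being raised. It is a simplification that retries inline, whereas ActiveJob re-enqueues the job; `perform_with_limited_retries` and `on_exhausted` are hypothetical names.

```ruby
# Hypothetical stand-in for `retry_on StandardError, attempts: N do ... end`.
# Runs the given block up to `attempts` times; after the final failure the
# error is passed to `on_exhausted` and is NOT re-raised.
def perform_with_limited_retries(attempts:, on_exhausted:)
  executions = 0
  begin
    executions += 1
    yield executions
  rescue StandardError => e
    retry if executions < attempts
    on_exhausted.call(e) # swallow the error so it never reaches the backend
  end
end

logged = []
perform_with_limited_retries(attempts: 5, on_exhausted: ->(e) { logged << e.message }) do |n|
  raise "attempt #{n} failed"
end
```

Because `on_exhausted` does not re-raise, nothing bubbles up after the last attempt, so a backend that re-performs on `StandardError` would not run the job again.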

GoodJob can be configured to allow omitting `retry_on`'s block argument and implicitly discard un-handled errors:
# Timeout jobs after 10 minutes
```ruby
# config/initializers/good_job.rb
# Do NOT re-perform a job if a StandardError bubbles up to the GoodJob backend
GoodJob.reperform_jobs_on_standard_error = false
```
ActiveJob's `discard_on` functionality is supported too.

#### ActionMailer retries

Using a Mailer's `#deliver_later` will enqueue an instance of `ActionMailer::DeliveryJob`, which inherits from `ActiveJob::Base` rather than your application's `ApplicationJob`. You can use an initializer to configure retries on `ActionMailer::DeliveryJob`:
```ruby
# config/initializers/good_job.rb
ActionMailer::DeliveryJob.retry_on StandardError, wait: :exponentially_longer, attempts: Float::INFINITY
```
#### Timeouts
Job timeouts can be configured with an `around_perform`:
```ruby
class ApplicationJob < ActiveJob::Base
  JobTimeoutError = Class.new(StandardError)

  around_perform do |_job, block|
    # Timeout jobs after 10 minutes
    Timeout.timeout(10.minutes, JobTimeoutError) do
      block.call
    end
  end
end
```
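The same timeout mechanism can be tried outside Rails. The sketch below uses only Ruby's standard `timeout` library; `with_job_timeout` is a hypothetical helper name, and the durations are shortened for demonstration.

```ruby
require "timeout"

# Custom error class, as in the README example above.
JobTimeoutError = Class.new(StandardError)

# Hypothetical helper: runs the block and raises JobTimeoutError
# if it takes longer than `seconds`.
def with_job_timeout(seconds)
  Timeout.timeout(seconds, JobTimeoutError) { yield }
end

with_job_timeout(1) { :finished_in_time }

begin
  with_job_timeout(0.05) { sleep 1 }
rescue JobTimeoutError => e
  puts "job timed out: #{e.class}"
end
```

Since `JobTimeoutError` inherits from `StandardError`, a timed-out job falls under the `StandardError` rules described above: unless `retry_on` or `discard_on` handles it, the default configuration re-performs the job.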
### Configuring Job Execution Threads
GoodJob executes enqueued jobs using threads. There is a lot that can be said about [multithreaded behavior in Ruby on Rails](https://guides.rubyonrails.org/threading_and_code_execution.html), but briefly:
1 change: 1 addition & 0 deletions lib/good_job.rb
@@ -13,6 +13,7 @@

module GoodJob
mattr_accessor :preserve_job_records, default: false
mattr_accessor :reperform_jobs_on_standard_error, default: true
include Logging

ActiveSupport.run_load_hooks(:good_job, self)
8 changes: 1 addition & 7 deletions lib/good_job/cli.rb
@@ -39,13 +39,7 @@ def start
job_query = GoodJob::Job.all.priority_ordered
queue_names_without_all = queue_names.reject { |q| q == '*' }
job_query = job_query.where(queue_name: queue_names_without_all) unless queue_names_without_all.size.zero?

performer_method = if GoodJob.preserve_job_records
:perform_with_advisory_lock_and_preserve_job_records
else
:perform_with_advisory_lock_and_destroy_job_records
end
job_performer = GoodJob::Performer.new(job_query, performer_method)
job_performer = GoodJob::Performer.new(job_query, :perform_with_advisory_lock)

$stdout.puts "GoodJob worker starting with max_threads=#{max_threads} on queues=#{queue_names.join(',')}"

47 changes: 24 additions & 23 deletions lib/good_job/job.rb
@@ -21,7 +21,7 @@ class Job < ActiveRecord::Base
scope :priority_ordered, -> { order(priority: :desc) }
scope :finished, ->(timestamp = nil) { timestamp ? where(arel_table['finished_at'].lteq(timestamp)) : where.not(finished_at: nil) }

def self.perform_with_advisory_lock(destroy_after: !GoodJob.preserve_job_records)
def self.perform_with_advisory_lock
good_job = nil
result = nil
error = nil
@@ -30,20 +30,12 @@ def self.perform_with_advisory_lock(destroy_after: !GoodJob.preserve_job_records
good_job = good_jobs.first
break unless good_job

result, error = good_job.perform(destroy_after: destroy_after)
result, error = good_job.perform
end

[good_job, result, error] if good_job
end

def self.perform_with_advisory_lock_and_preserve_job_records
perform_with_advisory_lock(destroy_after: false)
end

def self.perform_with_advisory_lock_and_destroy_job_records
perform_with_advisory_lock(destroy_after: true)
end

def self.enqueue(active_job, scheduled_at: nil, create_with_advisory_lock: false)
good_job = nil
ActiveSupport::Notifications.instrument("enqueue_job.good_job", { active_job: active_job, scheduled_at: scheduled_at, create_with_advisory_lock: create_with_advisory_lock }) do |instrument_payload|
@@ -64,40 +56,49 @@ def self.enqueue(active_job, scheduled_at: nil, create_with_advisory_lock: false
good_job
end

def perform(destroy_after: true)
def perform(destroy_after: !GoodJob.preserve_job_records, reperform_on_standard_error: GoodJob.reperform_jobs_on_standard_error)
raise PreviouslyPerformedError, 'Cannot perform a job that has already been performed' if finished_at

result = nil
rescued_error = nil
error = nil

ActiveSupport::Notifications.instrument("before_perform_job.good_job", { good_job: self })
self.performed_at = Time.current
save! unless destroy_after

ActiveSupport::Notifications.instrument("perform_job.good_job", { good_job: self }) do
params = serialized_params.merge(
"provider_job_id" => id
)
begin
params = serialized_params.merge(
"provider_job_id" => id
)

begin
ActiveSupport::Notifications.instrument("perform_job.good_job", { good_job: self }) do
result = ActiveJob::Base.execute(params)
rescue StandardError => e
error = e
end
rescue StandardError => e
rescued_error = e
end

if error.nil? && result.is_a?(Exception)
if rescued_error
error = rescued_error
elsif result.is_a?(Exception)
error = result
result = nil
end

error_message = "#{error.class}: #{error.message}" if error
self.error = error_message
self.finished_at = Time.current

if destroy_after
destroy!
else
if rescued_error && reperform_on_standard_error
save!
else
self.finished_at = Time.current

if destroy_after
destroy!
else
save!
end
end

[result, error]
61 changes: 34 additions & 27 deletions spec/good_job/job_spec.rb
@@ -81,19 +81,6 @@ def perform(result_value = nil, raise_error: false)
expect(errored_good_job).to eq e_good_job
expect(errored_result).to eq nil
expect(errored_error).to be_an ExpectedError

# Nothing is on the queue
output = described_class.all.perform_with_advisory_lock
expect(output).to eq nil
end

it 'destroy_after: false does not delete the job but marks it as finished' do
described_class.all.perform_with_advisory_lock(destroy_after: false)

expect(good_job.reload).to have_attributes(
performed_at: within(1.second).of(Time.current),
finished_at: within(1.second).of(Time.current)
)
end
end

@@ -167,20 +154,40 @@ def perform(result_value = nil, raise_error: false)
expect(error).to be_a(ExpectedError)
end

it 'destroys the job' do
good_job.perform

expect { good_job.reload }.to raise_error ActiveRecord::RecordNotFound
end

it 'can preserve the job' do
good_job.perform(destroy_after: false)

expect(good_job.reload).to have_attributes(
error: "ExpectedError: Raised expected error",
performed_at: within(1.second).of(Time.current),
finished_at: within(1.second).of(Time.current)
)
describe 'GoodJob.reperform_jobs_on_standard_error behavior' do
context 'when true' do
it 'leaves the job record unfinished' do
good_job.perform(destroy_after: false)

expect(good_job.reload).to have_attributes(
error: "ExpectedError: Raised expected error",
performed_at: within(1.second).of(Time.current),
finished_at: nil
)
end

it 'does not destroy the job record' do
good_job.perform(destroy_after: true)
expect { good_job.reload }.not_to raise_error
end
end

context 'when false' do
it 'destroys the job' do
good_job.perform(destroy_after: true, reperform_on_standard_error: false)
expect { good_job.reload }.to raise_error ActiveRecord::RecordNotFound
end

it 'can preserve the job' do
good_job.perform(destroy_after: false, reperform_on_standard_error: false)

expect(good_job.reload).to have_attributes(
error: "ExpectedError: Raised expected error",
performed_at: within(1.second).of(Time.current),
finished_at: within(1.second).of(Time.current)
)
end
end
end
end
end