Skip to content

Commit

Permalink
Add "Retry job" button to Dashboard for Discarded jobs
Browse files Browse the repository at this point in the history
  • Loading branch information
bensheldon committed Oct 18, 2021
1 parent 377cde2 commit 16ca0db
Show file tree
Hide file tree
Showing 12 changed files with 227 additions and 37 deletions.
24 changes: 24 additions & 0 deletions engine/app/controllers/good_job/jobs_controller.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
# frozen_string_literal: true
module GoodJob
class JobsController < GoodJob::BaseController
rescue_from GoodJob::ActiveJobJob::AdapterNotGoodJobError,
GoodJob::ActiveJobJob::ActionForStateMismatchError,
with: :redirect_on_error

def index
@filter = JobsFilter.new(params)
end
Expand All @@ -10,5 +14,25 @@ def show
.order(Arel.sql("COALESCE(scheduled_at, created_at) DESC"))
redirect_to root_path, alert: "Executions for Active Job #{params[:id]} not found" if @executions.empty?
end

def retry
@job = ActiveJobJob.find(params[:id])
@job.retry_job
redirect_back(fallback_location: jobs_path, notice: "Job has been retried")
end

private

def redirect_on_error(exception)
alert = case exception
when GoodJob::ActiveJobJob::AdapterNotGoodJobError
"ActiveJob Queue Adapter must be GoodJob."
when GoodJob::ActiveJobJob::ActionForStateMismatchError
"Job is not in an appropriate state for this action."
else
exception.to_s
end
redirect_back(fallback_location: jobs_path, alert: alert)
end
end
end
4 changes: 3 additions & 1 deletion engine/app/filters/good_job/jobs_filter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ def base_query
end

def filtered_query
query = base_query
query = base_query.includes(:executions)
.joins_advisory_locks.select('good_jobs.*', 'pg_locks.locktype AS locktype')

query = query.job_class(params[:job_class]) if params[:job_class]
query = query.where(queue_name: params[:queue_name]) if params[:queue_name]

Expand Down
49 changes: 42 additions & 7 deletions engine/app/models/good_job/active_job_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ module GoodJob
class ActiveJobJob < Object.const_get(GoodJob.active_record_parent_class)
include GoodJob::Lockable

ActionForStateMismatchError = Class.new(StandardError)
AdapterNotGoodJobError = Class.new(StandardError)

self.table_name = 'good_jobs'
self.primary_key = 'active_job_id'
self.advisory_lockable_column = 'active_job_id'
Expand Down Expand Up @@ -69,14 +72,15 @@ def job_class
end

def status
if finished_at.present?
if error.present?
execution = head_execution
if execution.finished_at.present?
if execution.error.present?
:discarded
else
:finished
end
elsif (scheduled_at || created_at) > DateTime.current
if serialized_params.fetch('executions', 0) > 1
elsif (execution.scheduled_at || execution.created_at) > DateTime.current
if execution.serialized_params.fetch('executions', 0) > 1
:retried
else
:scheduled
Expand All @@ -88,7 +92,13 @@ def status
end
end

def head_execution
def head?
_execution_id == head_execution(reload: true).id
end

def head_execution(reload: false)
executions.reload if reload
executions.load # memoize the results
executions.last
end

Expand All @@ -97,7 +107,7 @@ def tail_execution
end

def executions_count
aj_count = serialized_params.fetch('executions', 0)
aj_count = head_execution.serialized_params.fetch('executions', 0)
# The execution count within serialized_params is not updated
# once the underlying execution has been executed.
if status.in? [:discarded, :finished, :running]
Expand All @@ -112,7 +122,7 @@ def preserved_executions_count
end

def recent_error
error.presence || executions[-2]&.error
head_execution.error || executions[-2]&.error
end

def running?
Expand All @@ -123,5 +133,30 @@ def running?
advisory_locked?
end
end

def retry_job
with_advisory_lock do
execution = head_execution(reload: true)
active_job = execution.active_job

raise AdapterNotGoodJobError unless active_job.class.queue_adapter.is_a? GoodJob::Adapter
raise ActionForStateMismatchError unless status == :discarded

# Update the executions count because the previous execution will not have been preserved
# Do not update `exception_executions` because that comes from rescue_from's arguments
active_job.executions = (active_job.executions || 0) + 1

new_active_job = nil
GoodJob::CurrentThread.within do |current_thread|
current_thread.execution = execution

execution.class.transaction(joinable: false, requires_new: true) do
new_active_job = active_job.retry_job(wait: 0, error: error)
execution.save
end
end
new_active_job
end
end
end
end
10 changes: 5 additions & 5 deletions engine/app/views/good_job/shared/_jobs_table.erb
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,11 @@
%>
<%= tag.pre JSON.pretty_generate(job.serialized_params), id: dom_id(job, "params"), class: "collapse job-params" %>
</td>
<!-- <td>-->
<%#= button_to execution_path(execution.id), method: :delete, class: "btn btn-sm btn-outline-danger", title: "Delete execution" do %>
<%#= render "good_job/shared/icons/trash" %>
<%# end %>
<!-- </td>-->
<td>
<%= button_to retry_job_path(job.id), method: :put, class: "btn btn-sm #{job.status == :discarded ? 'btn-outline-primary' : 'btn-outline-secondary'}", disabled: job.status != :discarded, aria: { label: "Retry job"}, title: "Retry job" do %>
<%= render "good_job/shared/icons/arrow_clockwise" %>
<% end %>
</td>
</tr>
<% end %>
</tbody>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
<!-- https://icons.getbootstrap.com/icons/arrow-clockwise/ -->
<svg xmlns="http://www.w3.org/2000/svg" width="16" height="16" fill="currentColor" class="bi bi-arrow-clockwise" viewBox="0 0 16 16">
<path fill-rule="evenodd" d="M8 3a5 5 0 1 0 4.546 2.914.5.5 0 0 1 .908-.417A6 6 0 1 1 8 2v1z" />
<path d="M8 4.466V.534a.25.25 0 0 1 .41-.192l2.36 1.966c.12.1.12.284 0 .384L8.41 4.658A.25.25 0 0 1 8 4.466z" />
</svg>
6 changes: 5 additions & 1 deletion engine/config/routes.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,11 @@
GoodJob::Engine.routes.draw do
root to: 'executions#index'
resources :cron_schedules, only: %i[index]
resources :jobs, only: %i[index show]
resources :jobs, only: %i[index show] do
member do
put :retry
end
end
resources :executions, only: %i[destroy]

scope controller: :assets do
Expand Down
8 changes: 8 additions & 0 deletions lib/good_job/current_thread.rb
Original file line number Diff line number Diff line change
Expand Up @@ -52,5 +52,13 @@ def self.process_id
def self.thread_name
(Thread.current.name || Thread.current.object_id).to_s
end

# @return [void]
def self.within
reset
yield(self)
ensure
reset
end
end
end
25 changes: 18 additions & 7 deletions lib/good_job/execution.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ class Execution < Object.const_get(GoodJob.active_record_parent_class)
# Raised if something attempts to execute a previously completed Execution again.
PreviouslyPerformedError = Class.new(StandardError)

# String separating Error Class from Error Message
ERROR_MESSAGE_SEPARATOR = ": "

# ActiveJob jobs without a +queue_name+ attribute are placed on this queue.
DEFAULT_QUEUE_NAME = 'default'
# ActiveJob jobs without a +priority+ attribute are given this priority.
Expand Down Expand Up @@ -228,7 +231,7 @@ def self.enqueue(active_job, scheduled_at: nil, create_with_advisory_lock: false

if CurrentThread.cron_key
execution_args[:cron_key] = CurrentThread.cron_key
elsif CurrentThread.active_job_id == active_job.job_id
elsif CurrentThread.active_job_id && CurrentThread.active_job_id == active_job.job_id
execution_args[:cron_key] = CurrentThread.execution.cron_key
end

Expand All @@ -239,7 +242,7 @@ def self.enqueue(active_job, scheduled_at: nil, create_with_advisory_lock: false
execution.save!
active_job.provider_job_id = execution.id

CurrentThread.execution.retried_good_job_id = execution.id if CurrentThread.execution && CurrentThread.execution.active_job_id == active_job.job_id
CurrentThread.execution.retried_good_job_id = execution.id if CurrentThread.active_job_id && CurrentThread.active_job_id == active_job.job_id

execution
end
Expand All @@ -259,7 +262,7 @@ def perform
result = execute

job_error = result.handled_error || result.unhandled_error
self.error = "#{job_error.class}: #{job_error.message}" if job_error
self.error = [job_error.class, ERROR_MESSAGE_SEPARATOR, job_error.message].join if job_error

if result.unhandled_error && GoodJob.retry_on_unhandled_error
save!
Expand All @@ -279,19 +282,27 @@ def executable?
self.class.unscoped.unfinished.owns_advisory_locked.exists?(id: id)
end

def active_job
ActiveJob::Base.deserialize(active_job_data)
end

private

def active_job_data
serialized_params.deep_dup
.tap do |job_data|
job_data["provider_job_id"] = id
end
end

# @return [ExecutionResult]
def execute
GoodJob::CurrentThread.reset
GoodJob::CurrentThread.execution = self

job_data = serialized_params.deep_dup
job_data["provider_job_id"] = id

# DEPRECATION: Remove deprecated `good_job:` parameter in GoodJob v3
ActiveSupport::Notifications.instrument("perform_job.good_job", { good_job: self, execution: self, process_id: GoodJob::CurrentThread.process_id, thread_name: GoodJob::CurrentThread.thread_name }) do
value = ActiveJob::Base.execute(job_data)
value = ActiveJob::Base.execute(active_job_data)

if value.is_a?(Exception)
handled_error = value
Expand Down
68 changes: 68 additions & 0 deletions spec/engine/models/good_job/active_job_job_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,30 @@
RSpec.describe GoodJob::ActiveJobJob do
subject(:job) { described_class.find(head_execution.active_job_id) }

before do
allow(GoodJob).to receive(:preserve_job_records).and_return(true)
ActiveJob::Base.queue_adapter = GoodJob::Adapter.new(execution_mode: :external)

stub_const 'TestJob', (Class.new(ActiveJob::Base) do
def perform
end
end)
stub_const 'TestJob::Error', Class.new(StandardError)
end

let!(:tail_execution) do
active_job_id = SecureRandom.uuid
GoodJob::Execution.create!(
active_job_id: SecureRandom.uuid,
created_at: 1.minute.ago,
queue_name: 'mice',
priority: 10,
serialized_params: {
'job_id' => active_job_id,
'job_class' => 'TestJob',
'executions' => 0,
'queue_name' => 'mice',
'priority' => 10,
}
)
end
Expand All @@ -20,9 +36,14 @@
GoodJob::Execution.create!(
active_job_id: tail_execution.active_job_id,
queue_name: 'mice',
priority: 10,
serialized_params: {
'job_id' => tail_execution.active_job_id,
'job_class' => 'TestJob',
'executions' => 1,
'exception_executions' => { 'TestJob::Error' => 1 },
'queue_name' => 'mice',
'priority' => 10,
}
).tap do |execution|
tail_execution.update!(
Expand Down Expand Up @@ -96,4 +117,51 @@
end
end
end

describe '#retry_job' do
context 'when job is discarded' do
before do
head_execution.update!(
finished_at: Time.current,
error: "TestJob::Error: TestJob::Error"
)
end

it 'enqueues another execution and updates the original job' do
original_head_execution = job.head_execution

expect do
job.retry_job
end.to change { job.executions.reload.size }.by(1)

new_head_execution = job.head_execution(reload: true)
expect(new_head_execution.serialized_params).to include(
"executions" => 2,
"queue_name" => "mice",
"priority" => 10
)

original_head_execution.reload
expect(original_head_execution.retried_good_job_id).to eq new_head_execution.id
end
end

context 'when job is already locked' do
it 'raises an Error' do
ActiveRecord::Base.clear_active_connections!
job.with_advisory_lock do
expect do
Concurrent::Promises.future(job, &:retry_job).value!
end.to raise_error GoodJob::Lockable::RecordAlreadyAdvisoryLockedError
end
end
end
end

context 'when job is not discarded' do
it 'raises an ActionForStateMismatchError' do
expect(job.reload.status).not_to eq :discarded
expect { job.retry_job }.to raise_error GoodJob::ActiveJobJob::ActionForStateMismatchError
end
end
end
Loading

0 comments on commit 16ca0db

Please sign in to comment.