Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Don't delegate GoodJob::Job#status to executions to avoid race condition #661

Merged
merged 1 commit into from
Jul 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion app/controllers/good_job/jobs_controller.rb
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ def mass_update
end

def show
@job = Job.find(params[:id])
@job = Job.includes_advisory_locks.find(params[:id])
end

def discard
Expand Down
2 changes: 1 addition & 1 deletion app/views/good_job/jobs/show.html.erb
Original file line number Diff line number Diff line change
Expand Up @@ -54,4 +54,4 @@
<%= tag.pre @job.serialized_params["arguments"].map(&:inspect).join(', ') %>
</div>

<%= render 'executions', executions: @job.executions.reverse %>
<%= render 'executions', executions: @job.executions.includes_advisory_locks.reverse %>
12 changes: 9 additions & 3 deletions lib/models/good_job/execution.rb
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,9 @@ def active_job
# @return [Symbol]
def status
if finished_at.present?
if error.present?
if error.present? && retried_good_job_id.present?
:retried
elsif error.present? && retried_good_job_id.nil?
:discarded
else
:finished
Expand All @@ -298,7 +300,11 @@ def status
end

def running?
performed_at? && !finished_at?
if has_attribute?(:locktype)
self['locktype'].present?
else
advisory_locked?
end
end

def number
Expand All @@ -314,7 +320,7 @@ def last_status_at
def queue_latency
now = Time.zone.now
expected_start = scheduled_at || created_at
actual_start = performed_at || now
actual_start = performed_at || finished_at || now

actual_start - expected_start unless expected_start >= now
end
Expand Down
56 changes: 49 additions & 7 deletions lib/models/good_job/job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ module GoodJob
# ActiveRecord model that represents an +ActiveJob+ job.
# There is not a table in the database whose discrete rows represents "Jobs".
# The +good_jobs+ table is a table of individual {GoodJob::Execution}s that share the same +active_job_id+.
# A single row from the +good_jobs+ table of executions is fetched to represent an Job
# A single row from the +good_jobs+ table of executions is fetched to represent a Job.
#
class Job < BaseRecord
include Filterable
include Lockable
Expand Down Expand Up @@ -72,9 +73,51 @@ def job_class
serialized_params['job_class']
end

# The status of the Job, based on the state of its most recent execution.
# @return [Symbol]
delegate :status, :last_status_at, to: :head_execution
def last_status_at
finished_at || performed_at || scheduled_at || created_at
end

def status
if finished_at.present?
if error.present? && retried_good_job_id.present?
:retried
elsif error.present? && retried_good_job_id.nil?
:discarded
else
:finished
end
elsif (scheduled_at || created_at) > DateTime.current
if serialized_params.fetch('executions', 0) > 1
:retried
else
:scheduled
end
elsif running?
:running
else
:queued
end
end

# Override #reload to add a custom scope to ensure the reloaded record is the head execution
# @return [Job]
def reload(options = nil)
self.class.connection.clear_query_cache

# override with the `where(retried_good_job_id: nil)` scope
override_query = self.class.where(retried_good_job_id: nil)
fresh_object =
if options && options[:lock]
self.class.unscoped { override_query.lock(options[:lock]).find(id) }
else
self.class.unscoped { override_query.find(id) }
end

@attributes = fresh_object.instance_variable_get(:@attributes)
@new_record = false
@previously_new_record = false
self
end

# This job's most recent {Execution}
# @param reload [Booelan] whether to reload executions
Expand All @@ -94,7 +137,7 @@ def tail_execution
# The number of times this job has been executed, according to ActiveJob's serialized state.
# @return [Numeric]
def executions_count
aj_count = head_execution.serialized_params.fetch('executions', 0)
aj_count = serialized_params.fetch('executions', 0)
# The execution count within serialized_params is not updated
# once the underlying execution has been executed.
if status.in? [:discarded, :finished, :running]
Expand All @@ -114,7 +157,7 @@ def preserved_executions_count
# If the job has been retried, the error will be fetched from the previous {Execution} record.
# @return [String]
def recent_error
head_execution.error || executions[-2]&.error
error || executions[-2]&.error
end

# Tests whether the job is being executed right now.
Expand Down Expand Up @@ -192,7 +235,6 @@ def reschedule_job(scheduled_at = Time.current)

raise ActionForStateMismatchError if execution.finished_at.present?

execution = head_execution(reload: true)
execution.update(scheduled_at: scheduled_at)
end
end
Expand Down
2 changes: 1 addition & 1 deletion spec/lib/models/good_job/job_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ def perform(feline = nil, canine: nil)
end

describe '#retry_job' do
context 'when job is discarded' do
context 'when job is retried' do
before do
head_execution.update!(
finished_at: Time.current,
Expand Down