Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create "discrete" good_job_executions table to separate Job records from Execution records and have a 1-to-1 correspondence between good_jobs records and Active Job jobs #928

Merged
merged 9 commits into from
Apr 22, 2023
15 changes: 14 additions & 1 deletion app/models/good_job/base_execution.rb
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,25 @@ def params_execution_count
def coalesce_scheduled_at_created_at
arel_table.coalesce(arel_table['scheduled_at'], arel_table['created_at'])
end

def discrete_support?
if connection.table_exists?('good_job_executions')
true
else
migration_pending_warning!
false
end
end
end

# The ActiveJob job class, as a string
# @return [String]
def job_class
serialized_params['job_class']
discrete? ? attributes['job_class'] : serialized_params['job_class']
end

def discrete?
self.class.discrete_support? && is_discrete?
end
end
end
52 changes: 52 additions & 0 deletions app/models/good_job/discrete_execution.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
# frozen_string_literal: true

module GoodJob # :nodoc:
class DiscreteExecution < BaseRecord
self.table_name = 'good_job_executions'

belongs_to :execution, class_name: 'GoodJob::Execution', foreign_key: 'active_job_id', primary_key: 'active_job_id', inverse_of: :discrete_executions, optional: true
belongs_to :job, class_name: 'GoodJob::Job', foreign_key: 'active_job_id', primary_key: 'active_job_id', inverse_of: :discrete_executions, optional: true

scope :finished, -> { where.not(finished_at: nil) }

alias_attribute :performed_at, :created_at

def number
serialized_params.fetch('executions', 0) + 1
end

# Time between when this job was expected to run and when it started running
def queue_latency
created_at - scheduled_at
end

# Time between when this job started and finished
def runtime_latency
(finished_at || Time.current) - performed_at if performed_at
end

def last_status_at
finished_at || created_at
end

def status
if finished_at.present?
if error.present?
:retried
elsif error.present? && job.finished_at.present?
:discarded
else
:succeeded
end
else
:running
end
end

def display_serialized_params
serialized_params.merge({
_good_job_execution: attributes.except('serialized_params'),
})
end
end
end
133 changes: 118 additions & 15 deletions app/models/good_job/execution.rb
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,13 @@ def self.queue_parser(string)
end

belongs_to :batch, class_name: 'GoodJob::BatchRecord', optional: true, inverse_of: :executions

belongs_to :job, class_name: 'GoodJob::Job', foreign_key: 'active_job_id', primary_key: 'active_job_id', optional: true, inverse_of: :executions
after_destroy -> { self.class.active_job_id(active_job_id).delete_all }, if: -> { @_destroy_job }
has_many :discrete_executions, class_name: 'GoodJob::DiscreteExecution', foreign_key: 'active_job_id', primary_key: 'active_job_id', inverse_of: :execution # rubocop:disable Rails/HasManyOrHasOneDependent

after_destroy lambda {
GoodJob::DiscreteExecution.where(active_job_id: active_job_id).delete_all if discrete? # TODO: move into association `dependent: :delete_all` after v4
self.class.active_job_id(active_job_id).delete_all
}, if: -> { @_destroy_job }

# Get executions with given ActiveJob ID
# @!method active_job_id
Expand Down Expand Up @@ -201,8 +205,12 @@ def self.queue_parser(string)
end
end)

# Construct a GoodJob::Execution from an ActiveJob instance.
def self.build_for_enqueue(active_job, overrides = {})
new(**enqueue_args(active_job, overrides))
end

# Construct arguments for GoodJob::Execution from an ActiveJob instance.
def self.enqueue_args(active_job, overrides = {})
if active_job.priority && GoodJob.configuration.smaller_number_is_higher_priority.nil?
ActiveSupport::Deprecation.warn(<<~DEPRECATION)
The next major version of GoodJob (v4.0) will change job `priority` to give smaller numbers higher priority (default: `0`), in accordance with Active Job's definition of priority.
Expand All @@ -218,6 +226,7 @@ def self.build_for_enqueue(active_job, overrides = {})
serialized_params: active_job.serialize,
scheduled_at: active_job.scheduled_at,
}

execution_args[:concurrency_key] = active_job.good_job_concurrency_key if active_job.respond_to?(:good_job_concurrency_key)

reenqueued_current_execution = CurrentThread.active_job_id && CurrentThread.active_job_id == active_job.job_id
Expand All @@ -238,7 +247,7 @@ def self.build_for_enqueue(active_job, overrides = {})
execution_args[:cron_at] = CurrentThread.cron_at
end

new(**execution_args.merge(overrides))
execution_args.merge(overrides)
end

# Finds the next eligible Execution, acquire an advisory lock related to it, and
Expand Down Expand Up @@ -298,19 +307,47 @@ def self.next_scheduled_at(after: nil, limit: 100, now_limit: nil)
# The new {Execution} instance representing the queued ActiveJob job.
def self.enqueue(active_job, scheduled_at: nil, create_with_advisory_lock: false)
ActiveSupport::Notifications.instrument("enqueue_job.good_job", { active_job: active_job, scheduled_at: scheduled_at, create_with_advisory_lock: create_with_advisory_lock }) do |instrument_payload|
execution = build_for_enqueue(active_job, { scheduled_at: scheduled_at })
current_execution = CurrentThread.execution

retried = current_execution && current_execution.active_job_id == active_job.job_id
if retried
if current_execution.discrete?
execution = current_execution
execution.assign_attributes(enqueue_args(active_job, { scheduled_at: scheduled_at }))
execution.scheduled_at ||= Time.current
execution.performed_at = nil
execution.finished_at = nil
else
execution = build_for_enqueue(active_job, { scheduled_at: scheduled_at })
end
else
execution = build_for_enqueue(active_job, { scheduled_at: scheduled_at })
execution.make_discrete if discrete_support?
end

execution.create_with_advisory_lock = create_with_advisory_lock
instrument_payload[:execution] = execution
if create_with_advisory_lock
if execution.persisted?
execution.advisory_lock
else
execution.create_with_advisory_lock = true
end
end

instrument_payload[:execution] = execution
execution.save!
active_job.provider_job_id = execution.id
CurrentThread.execution.retried_good_job_id = execution.id if CurrentThread.active_job_id && CurrentThread.active_job_id == active_job.job_id

CurrentThread.execution.retried_good_job_id = execution.id if retried && !CurrentThread.execution.discrete?
active_job.provider_job_id = execution.id
execution
end
end

def self.format_error(error)
raise ArgumentError unless error.is_a?(Exception)

[error.class.to_s, ERROR_MESSAGE_SEPARATOR, error.message].join
end

# Execute the ActiveJob job this {Execution} represents.
# @return [ExecutionResult]
# An array of the return value of the job's +#perform+ method and the
Expand All @@ -320,12 +357,39 @@ def perform
run_callbacks(:perform) do
raise PreviouslyPerformedError, 'Cannot perform a job that has already been performed' if finished_at

discrete_execution = nil
result = GoodJob::CurrentThread.within do |current_thread|
current_thread.reset
current_thread.execution = self

current_thread.execution_interrupted = performed_at if performed_at
update!(performed_at: Time.current)
if performed_at
current_thread.execution_interrupted = performed_at

if discrete?
interrupt_error_string = self.class.format_error(GoodJob::InterruptError.new("Interrupted after starting perform at '#{performed_at}'"))
self.error = interrupt_error_string
discrete_executions.where(finished_at: nil).where.not(performed_at: nil).update_all( # rubocop:disable Rails/SkipsModelValidations
error: interrupt_error_string,
finished_at: Time.current
)
end
end

if discrete?
transaction do
now = Time.current
discrete_execution = discrete_executions.create!(
job_class: job_class,
queue_name: queue_name,
serialized_params: serialized_params,
scheduled_at: (scheduled_at || created_at),
created_at: now
)
update!(performed_at: now, executions_count: ((executions_count || 0) + 1))
end
else
update!(performed_at: Time.current)
end

ActiveSupport::Notifications.instrument("perform_job.good_job", { execution: self, process_id: current_thread.process_id, thread_name: current_thread.thread_name }) do |instrument_payload|
value = ActiveJob::Base.execute(active_job_data)
Expand All @@ -349,14 +413,42 @@ def perform
end

job_error = result.handled_error || result.unhandled_error
self.error = [job_error.class, ERROR_MESSAGE_SEPARATOR, job_error.message].join if job_error

if job_error
error_string = self.class.format_error(job_error)
self.error = error_string
discrete_execution.error = error_string if discrete_execution
else
self.error = nil
end

reenqueued = result.retried? || retried_good_job_id.present?
if result.unhandled_error && GoodJob.retry_on_unhandled_error
save!
if discrete_execution
transaction do
discrete_execution.update!(finished_at: Time.current)
update!(performed_at: nil, finished_at: nil, retried_good_job_id: nil)
end
else
save!
end
elsif GoodJob.preserve_job_records == true || reenqueued || (result.unhandled_error && GoodJob.preserve_job_records == :on_unhandled_error) || cron_key.present?
self.finished_at = Time.current
save!
now = Time.current
if discrete_execution
if reenqueued
self.performed_at = nil
else
self.finished_at = now
end
discrete_execution.finished_at = now
transaction do
discrete_execution.save!
save!
end
else
self.finished_at = now
save!
end
else
destroy_job
end
Expand All @@ -371,6 +463,17 @@ def executable?
self.class.unscoped.unfinished.owns_advisory_locked.exists?(id: id)
end

def make_discrete
self.is_discrete = true
self.id = active_job_id
self.job_class = serialized_params['job_class']
self.executions_count ||= 0

current_time = Time.current
self.created_at ||= current_time
self.scheduled_at ||= current_time
end

# Build an ActiveJob instance and deserialize the arguments, using `#active_job_data`.
#
# @param ignore_deserialization_errors [Boolean]
Expand Down
12 changes: 10 additions & 2 deletions app/models/good_job/job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@ def table_name=(_value)

belongs_to :batch, class_name: 'GoodJob::BatchRecord', inverse_of: :jobs, optional: true
has_many :executions, -> { order(created_at: :asc) }, class_name: 'GoodJob::Execution', foreign_key: 'active_job_id', inverse_of: :job # rubocop:disable Rails/HasManyOrHasOneDependent
has_many :discrete_executions, -> { order(created_at: :asc) }, class_name: 'GoodJob::DiscreteExecution', foreign_key: 'active_job_id', primary_key: :active_job_id, inverse_of: :job # rubocop:disable Rails/HasManyOrHasOneDependent

after_destroy lambda {
GoodJob::DiscreteExecution.where(active_job_id: active_job_id).delete_all if discrete? # TODO: move into association `dependent: :delete_all` after v4
}

# Only the most-recent unretried execution represents a "Job"
default_scope { where(retried_good_job_id: nil) }
Expand All @@ -56,6 +61,8 @@ def table_name=(_value)
# Errored but will not be retried
scope :discarded, -> { finished.where.not(error: nil) }

scope :unfinished_undiscrete, -> { where(finished_at: nil, retried_good_job_id: nil, is_discrete: [nil, false]) }

# The job's ActiveJob UUID
# @return [String]
def id
Expand Down Expand Up @@ -191,9 +198,10 @@ def retry_job

execution.class.transaction(joinable: false, requires_new: true) do
new_active_job = active_job.retry_job(wait: 0, error: execution.error)
execution.save
execution.save!
end
end

new_active_job
end
end
Expand All @@ -213,7 +221,7 @@ def discard_job(message)
update_execution = proc do
execution.update(
finished_at: Time.current,
error: [job_error.class, GoodJob::Execution::ERROR_MESSAGE_SEPARATOR, job_error.message].join
error: GoodJob::Execution.format_error(job_error)
)
end

Expand Down
34 changes: 30 additions & 4 deletions app/views/good_job/jobs/show.html.erb
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,12 @@
<nav aria-label="breadcrumb">
<ol class="breadcrumb small mb-0">
<li class="breadcrumb-item"><%= link_to t(".jobs"), jobs_path %></li>
<li class="breadcrumb-item active" aria-current="page"><%= tag.code @job.id, class: "text-muted" %></li>
<li class="breadcrumb-item active" aria-current="page">
<%= tag.code @job.id, class: "text-muted" %>
<% if @job.discrete? %>
<span class="badge bg-info text-dark">Discrete</span>
<% end %>
</li>
</ol>
</nav>
<div class="row align-items-center">
Expand All @@ -21,6 +26,10 @@
<div class="font-monospace fw-bold small my-2"><%= tag.strong @job.priority %></div>
</div>
<div class="col text-end">
<div class="mb-2">
<%= tag.span relative_time(@job.last_status_at), class: "small" %>
<%= status_badge @job.status %>
</div>
<% if @job.status.in? [:scheduled, :retried, :queued] %>
<%= button_to reschedule_job_path(@job.id), method: :put,
class: "btn btn-sm btn-outline-primary",
Expand Down Expand Up @@ -59,8 +68,25 @@
</div>

<div class="my-4">
<h5><%= t "good_job.models.job.arguments" %></h5>
<%= tag.pre @job.serialized_params["arguments"].map(&:inspect).join(', '), class: 'text-wrap text-break' %>
<h5>
<%= t "good_job.models.job.arguments" %>
<%= tag.button type: "button", class: "btn btn-sm text-muted", role: "button",
title: t("good_job.actions.inspect"),
data: { bs_toggle: "collapse", bs_target: "##{dom_id(@job, 'params')}" },
aria: { expanded: false, controls: dom_id(@job, "params") } do %>
<%= render_icon "info" %>
<span class="visually-hidden"><%= t "good_job.actions.inspect" %></span>
<% end %>
</h5>
</div>
<%= tag.pre @job.serialized_params["arguments"].map(&:inspect).join(', '), class: 'text-wrap text-break' %>

<%= tag.div id: dom_id(@job, "params"), class: "list-group-item collapse small bg-dark text-light" do %>
<%= tag.pre JSON.pretty_generate(@job.display_serialized_params) %>
<% end %>

<%= render 'executions', executions: @job.executions.includes_advisory_locks.reverse %>
<% if @job.discrete? %>
<%= render 'executions', executions: @job.discrete_executions.reverse %>
<% else %>
<%= render 'executions', executions: @job.executions.includes_advisory_locks.reverse %>
<% end %>
Loading