From 37962daa12c7a9515f0cfd201fc56eacbe4f159f Mon Sep 17 00:00:00 2001 From: "Ben Sheldon [he/him]" Date: Thu, 13 Apr 2023 08:01:08 -0700 Subject: [PATCH 1/8] Create "discrete" `good_job_executions` table to separate Job records from Execution records --- app/models/good_job/base_execution.rb | 13 ++ app/models/good_job/discrete_execution.rb | 63 ++++++++ app/models/good_job/execution.rb | 111 +++++++++++-- app/models/good_job/job.rb | 6 +- app/views/good_job/jobs/show.html.erb | 13 +- .../migrations/create_good_jobs.rb.erb | 15 ++ .../05_create_good_job_executions.rb.erb | 29 ++++ lib/good_job.rb | 30 ++-- spec/app/jobs/example_job_spec.rb | 23 ++- spec/app/models/good_job/execution_spec.rb | 62 ++++++++ spec/app/models/good_job/job_spec.rb | 148 +++++++++++++----- spec/integration/batch_spec.rb | 14 +- .../active_job_extensions/concurrency_spec.rb | 48 ++++-- .../interrupt_errors_spec.rb | 56 ++++++- .../notify_options_spec.rb | 2 +- spec/lib/good_job_spec.rb | 15 +- .../requests/good_job/jobs_controller_spec.rb | 13 +- spec/support/example_app_helper.rb | 1 + spec/support/rspec_not_change.rb | 2 + spec/system/jobs_spec.rb | 2 +- spec/test_app/app/jobs/example_job.rb | 2 +- ...230412144442_create_good_job_executions.rb | 29 ++++ spec/test_app/db/schema.rb | 28 +++- 23 files changed, 599 insertions(+), 126 deletions(-) create mode 100644 app/models/good_job/discrete_execution.rb create mode 100644 lib/generators/good_job/templates/update/migrations/05_create_good_job_executions.rb.erb create mode 100644 spec/support/rspec_not_change.rb create mode 100644 spec/test_app/db/migrate/20230412144442_create_good_job_executions.rb diff --git a/app/models/good_job/base_execution.rb b/app/models/good_job/base_execution.rb index 0cc113c37..03dcfa856 100644 --- a/app/models/good_job/base_execution.rb +++ b/app/models/good_job/base_execution.rb @@ -33,6 +33,15 @@ def params_execution_count def coalesce_scheduled_at_created_at arel_table.coalesce(arel_table['scheduled_at'], arel_table['created_at']) end + + def discrete_support? + if connection.table_exists?('good_job_executions') + true + else + migration_pending_warning! + false + end + end end # The ActiveJob job class, as a string @@ -40,5 +49,9 @@ def coalesce_scheduled_at_created_at def job_class serialized_params['job_class'] end + + def discrete? + self.class.discrete_support? && is_discrete? + end end end diff --git a/app/models/good_job/discrete_execution.rb b/app/models/good_job/discrete_execution.rb new file mode 100644 index 000000000..5264d040c --- /dev/null +++ b/app/models/good_job/discrete_execution.rb @@ -0,0 +1,63 @@ +# frozen_string_literal: true + +module GoodJob # :nodoc: + class DiscreteExecution < BaseRecord + self.table_name = 'good_job_executions' + + belongs_to :execution, class_name: 'GoodJob::Execution', foreign_key: 'active_job_id', primary_key: 'active_job_id', inverse_of: :discrete_executions, optional: true + belongs_to :job, class_name: 'GoodJob::Job', foreign_key: 'active_job_id', primary_key: 'active_job_id', inverse_of: :discrete_executions, optional: true + + scope :finished, -> { where.not(finished_at: nil) } + + alias_attribute :performed_at, :created_at + + def number + serialized_params.fetch('executions', 0) + 1 + end + + # Time between when this job was expected to run and when it started running + def queue_latency + now = Time.zone.now + actual_start = performed_at || finished_at || now + + actual_start - perform_expected_at unless perform_expected_at >= now + end + + # Time between when this job started and finished + def runtime_latency + (finished_at || Time.zone.now) - performed_at if performed_at + end + + def last_status_at + finished_at || performed_at || perform_expected_at || created_at + end + + def status + if finished_at.present? + if error.present? + :retried + elsif error.present? && job.finished_at.present? + :discarded + else + :succeeded + end + elsif (perform_expected_at || created_at) > DateTime.current + if serialized_params.fetch('executions', 0) > 1 + :retried + else + :scheduled + end + elsif performed_at.present? + :running + else + :queued + end + end + + def display_serialized_params + serialized_params.merge({ + _good_job_execution: attributes.except('serialized_params'), + }) + end + end +end diff --git a/app/models/good_job/execution.rb b/app/models/good_job/execution.rb index b845aeabf..8726aa2dd 100644 --- a/app/models/good_job/execution.rb +++ b/app/models/good_job/execution.rb @@ -70,9 +70,13 @@ def self.queue_parser(string) end belongs_to :batch, class_name: 'GoodJob::BatchRecord', optional: true, inverse_of: :executions - belongs_to :job, class_name: 'GoodJob::Job', foreign_key: 'active_job_id', primary_key: 'active_job_id', optional: true, inverse_of: :executions - after_destroy -> { self.class.active_job_id(active_job_id).delete_all }, if: -> { @_destroy_job } + has_many :discrete_executions, class_name: 'GoodJob::DiscreteExecution', foreign_key: 'active_job_id', primary_key: 'active_job_id', inverse_of: :execution # rubocop:disable Rails/HasManyOrHasOneDependent + + after_destroy lambda { + discrete_executions.delete_all if discrete? + self.class.active_job_id(active_job_id).delete_all + }, if: -> { @_destroy_job } # Get executions with given ActiveJob ID # @!method active_job_id @@ -201,8 +205,12 @@ def self.queue_parser(string) end end) - # Construct a GoodJob::Execution from an ActiveJob instance. def self.build_for_enqueue(active_job, overrides = {}) + new(**enqueue_args(active_job, overrides)) + end + + # Construct arguments for GoodJob::Execution from an ActiveJob instance. + def self.enqueue_args(active_job, overrides = {}) if active_job.priority && GoodJob.configuration.smaller_number_is_higher_priority.nil? ActiveSupport::Deprecation.warn(<<~DEPRECATION) The next major version of GoodJob (v4.0) will change job `priority` to give smaller numbers higher priority (default: `0`), in accordance with Active Job's definition of priority. @@ -218,6 +226,10 @@ def self.build_for_enqueue(active_job, overrides = {}) serialized_params: active_job.serialize, scheduled_at: active_job.scheduled_at, } + if discrete_support? + execution_args[:is_discrete] = true + execution_args[:executions_count] = 0 + end execution_args[:concurrency_key] = active_job.good_job_concurrency_key if active_job.respond_to?(:good_job_concurrency_key) reenqueued_current_execution = CurrentThread.active_job_id && CurrentThread.active_job_id == active_job.job_id @@ -238,7 +250,7 @@ def self.build_for_enqueue(active_job, overrides = {}) execution_args[:cron_at] = CurrentThread.cron_at end - new(**execution_args.merge(overrides)) + execution_args.merge(overrides) end # Finds the next eligible Execution, acquire an advisory lock related to it, and @@ -298,19 +310,36 @@ def self.next_scheduled_at(after: nil, limit: 100, now_limit: nil) # The new {Execution} instance representing the queued ActiveJob job. def self.enqueue(active_job, scheduled_at: nil, create_with_advisory_lock: false) ActiveSupport::Notifications.instrument("enqueue_job.good_job", { active_job: active_job, scheduled_at: scheduled_at, create_with_advisory_lock: create_with_advisory_lock }) do |instrument_payload| - execution = build_for_enqueue(active_job, { scheduled_at: scheduled_at }) + current_execution = CurrentThread.execution + + retried = current_execution && current_execution.active_job_id == active_job.job_id + if retried && current_execution.discrete? + current_execution.assign_attributes(enqueue_args(active_job, { scheduled_at: scheduled_at })) + current_execution.finished_at = nil + current_execution.advisory_lock if create_with_advisory_lock + execution = current_execution + else + execution = build_for_enqueue(active_job, { scheduled_at: scheduled_at }) + execution.is_discrete = nil if retried && discrete_support? && !current_execution.is_discrete? + execution.create_with_advisory_lock = create_with_advisory_lock - execution.create_with_advisory_lock = create_with_advisory_lock - instrument_payload[:execution] = execution + instrument_payload[:execution] = execution - execution.save! - active_job.provider_job_id = execution.id - CurrentThread.execution.retried_good_job_id = execution.id if CurrentThread.active_job_id && CurrentThread.active_job_id == active_job.job_id + execution.save! + CurrentThread.execution.retried_good_job_id = execution.id if retried + end + active_job.provider_job_id = execution.id execution end end + def self.format_error(error) + raise ArgumentError unless error.is_a?(Exception) + + [error.class.to_s, ERROR_MESSAGE_SEPARATOR, error.message].join + end + # Execute the ActiveJob job this {Execution} represents. # @return [ExecutionResult] # An array of the return value of the job's +#perform+ method and the @@ -320,12 +349,33 @@ def perform run_callbacks(:perform) do raise PreviouslyPerformedError, 'Cannot perform a job that has already been performed' if finished_at + discrete_execution = nil result = GoodJob::CurrentThread.within do |current_thread| current_thread.reset current_thread.execution = self - current_thread.execution_interrupted = performed_at if performed_at - update!(performed_at: Time.current) + if performed_at + current_thread.execution_interrupted = performed_at + + if discrete? + interrupt_error_string = self.class.format_error(GoodJob::InterruptError.new("Interrupted after starting perform at '#{performed_at}'")) + self.error = interrupt_error_string + discrete_executions.where(finished_at: nil).where.not(performed_at: nil).update_all( # rubocop:disable Rails/SkipsModelValidations + error: interrupt_error_string, + finished_at: Time.current + ) + end + end + + if discrete? + transaction do + now = Time.current + discrete_execution = discrete_executions.create!(serialized_params: serialized_params, perform_expected_at: scheduled_at || created_at, created_at: now) + update!(performed_at: now, executions_count: (executions_count || 0) + 1) + end + else + update!(performed_at: Time.current) + end ActiveSupport::Notifications.instrument("perform_job.good_job", { execution: self, process_id: current_thread.process_id, thread_name: current_thread.thread_name }) do |instrument_payload| value = ActiveJob::Base.execute(active_job_data) @@ -349,14 +399,43 @@ def perform end job_error = result.handled_error || result.unhandled_error - self.error = [job_error.class, ERROR_MESSAGE_SEPARATOR, job_error.message].join if job_error + + if job_error + error_string = self.class.format_error(job_error) + self.error = error_string + discrete_execution.error = error_string if discrete_execution + else + self.error = nil + end reenqueued = result.retried? || retried_good_job_id.present? if result.unhandled_error && GoodJob.retry_on_unhandled_error - save! + if discrete_execution + transaction do + discrete_execution.update!(finished_at: Time.current) + self.performed_at = nil # TODO: will this break something to unset this? + save! + end + else + save! + end elsif GoodJob.preserve_job_records == true || reenqueued || (result.unhandled_error && GoodJob.preserve_job_records == :on_unhandled_error) || cron_key.present? - self.finished_at = Time.current - save! + now = Time.current + if discrete_execution + if reenqueued + self.performed_at = nil + else + self.finished_at = now + end + discrete_execution.finished_at = now + transaction do + discrete_execution.save! + save! + end + else + self.finished_at = now + save! + end else destroy_job end diff --git a/app/models/good_job/job.rb b/app/models/good_job/job.rb index 0761db07a..e490e13e7 100644 --- a/app/models/good_job/job.rb +++ b/app/models/good_job/job.rb @@ -30,6 +30,7 @@ def table_name=(_value) belongs_to :batch, class_name: 'GoodJob::BatchRecord', inverse_of: :jobs, optional: true has_many :executions, -> { order(created_at: :asc) }, class_name: 'GoodJob::Execution', foreign_key: 'active_job_id', inverse_of: :job # rubocop:disable Rails/HasManyOrHasOneDependent + has_many :discrete_executions, -> { order(created_at: :asc) }, class_name: 'GoodJob::DiscreteExecution', foreign_key: 'active_job_id', inverse_of: :job, dependent: :delete_all # Only the most-recent unretried execution represents a "Job" default_scope { where(retried_good_job_id: nil) } @@ -191,9 +192,10 @@ def retry_job execution.class.transaction(joinable: false, requires_new: true) do new_active_job = active_job.retry_job(wait: 0, error: execution.error) - execution.save + execution.save! end end + new_active_job end end @@ -213,7 +215,7 @@ def discard_job(message) update_execution = proc do execution.update( finished_at: Time.current, - error: [job_error.class, GoodJob::Execution::ERROR_MESSAGE_SEPARATOR, job_error.message].join + error: GoodJob::Execution.format_error(job_error) ) end diff --git a/app/views/good_job/jobs/show.html.erb b/app/views/good_job/jobs/show.html.erb index 01e20e306..bd361b22a 100644 --- a/app/views/good_job/jobs/show.html.erb +++ b/app/views/good_job/jobs/show.html.erb @@ -3,7 +3,12 @@
@@ -63,4 +68,8 @@ <%= tag.pre @job.serialized_params["arguments"].map(&:inspect).join(', '), class: 'text-wrap text-break' %>
-<%= render 'executions', executions: @job.executions.includes_advisory_locks.reverse %> +<% if @job.discrete? %> + <%= render 'executions', executions: @job.discrete_executions.reverse %> +<% else %> + <%= render 'executions', executions: @job.executions.includes_advisory_locks.reverse %> +<% end %> diff --git a/lib/generators/good_job/templates/install/migrations/create_good_jobs.rb.erb b/lib/generators/good_job/templates/install/migrations/create_good_jobs.rb.erb index 73cc1dba3..ad0004063 100644 --- a/lib/generators/good_job/templates/install/migrations/create_good_jobs.rb.erb +++ b/lib/generators/good_job/templates/install/migrations/create_good_jobs.rb.erb @@ -22,6 +22,9 @@ class CreateGoodJobs < ActiveRecord::Migration<%= migration_version %> t.uuid :batch_id t.uuid :batch_callback_id + + t.boolean :is_discrete + t.integer :executions_count end create_table :good_job_batches, id: :uuid do |t| @@ -38,6 +41,16 @@ class CreateGoodJobs < ActiveRecord::Migration<%= migration_version %> t.datetime :finished_at end + create_table :good_job_executions, id: :uuid do |t| + t.timestamps + + t.uuid :active_job_id + t.jsonb :serialized_params + t.datetime :perform_expected_at + t.datetime :finished_at + t.text :error + end + create_table :good_job_processes, id: :uuid do |t| t.timestamps t.jsonb :state @@ -62,5 +75,7 @@ class CreateGoodJobs < ActiveRecord::Migration<%= migration_version %> where: "finished_at IS NULL", name: :index_good_jobs_jobs_on_priority_created_at_when_unfinished add_index :good_jobs, [:batch_id], where: "batch_id IS NOT NULL" add_index :good_jobs, [:batch_callback_id], where: "batch_callback_id IS NOT NULL" + + add_index :good_job_executions, [:active_job_id, :created_at], name: :index_good_job_executions_on_active_job_id_and_created_at end end diff --git a/lib/generators/good_job/templates/update/migrations/05_create_good_job_executions.rb.erb b/lib/generators/good_job/templates/update/migrations/05_create_good_job_executions.rb.erb new file mode 100644 index 000000000..e4f3a48d8 --- /dev/null +++ b/lib/generators/good_job/templates/update/migrations/05_create_good_job_executions.rb.erb @@ -0,0 +1,29 @@ +# frozen_string_literal: true +class CreateGoodJobExecutions < ActiveRecord::Migration<%= migration_version %> + def change + reversible do |dir| + dir.up do + # Ensure this incremental update migration is idempotent + # with monolithic install migration. + return if connection.table_exists?(:good_job_executions) + end + end + + create_table :good_job_executions, id: :uuid do |t| + t.timestamps + + t.uuid :active_job_id + t.jsonb :serialized_params + t.datetime :perform_expected_at + t.datetime :finished_at + t.text :error + + t.index [:active_job_id, :created_at], name: :index_good_job_executions_on_active_job_id_and_created_at + end + + change_table :good_jobs do |t| + t.boolean :is_discrete + t.integer :executions_count + end + end +end diff --git a/lib/good_job.rb b/lib/good_job.rb index 1995888c3..4f0f26ca4 100644 --- a/lib/good_job.rb +++ b/lib/good_job.rb @@ -170,19 +170,31 @@ def self.cleanup_preserved_jobs(older_than: nil) ActiveSupport::Notifications.instrument("cleanup_preserved_jobs.good_job", { older_than: older_than, timestamp: timestamp }) do |payload| old_jobs = GoodJob::Job.where('finished_at <= ?', timestamp) old_jobs = old_jobs.succeeded unless include_discarded + + deleted_discrete_executions_count = if GoodJob::DiscreteExecution.migrated? + GoodJob::DiscreteExecution.where(job: old_jobs).delete_all + else + 0 + end + deleted_executions_count = GoodJob::Execution.where(job: old_jobs).delete_all - if GoodJob::BatchRecord.migrated? - old_batches = GoodJob::BatchRecord.where('finished_at <= ?', timestamp) - old_batches = old_batches.succeeded unless include_discarded - deleted_batches_count = old_batches.delete_all - else - deleted_batches_count = 0 - end + deleted_batches_count = if GoodJob::BatchRecord.migrated? + old_batches = GoodJob::BatchRecord.where('finished_at <= ?', timestamp) + old_batches = old_batches.succeeded unless include_discarded + old_batches.delete_all + else + 0 + end - payload[:destroyed_executions_count] = deleted_executions_count payload[:destroyed_batches_count] = deleted_batches_count - payload[:destroyed_records_count] = deleted_executions_count + deleted_batches_count + payload[:destroyed_discrete_executions_count] = deleted_discrete_executions_count + payload[:destroyed_executions_count] = deleted_executions_count + + destroyed_records_count = deleted_batches_count + deleted_discrete_executions_count + deleted_executions_count + payload[:destroyed_records_count] = destroyed_records_count + + destroyed_records_count end end diff --git a/spec/app/jobs/example_job_spec.rb b/spec/app/jobs/example_job_spec.rb index e0dea7faf..327b531e4 100644 --- a/spec/app/jobs/example_job_spec.rb +++ b/spec/app/jobs/example_job_spec.rb @@ -25,9 +25,9 @@ end travel_back - executions = GoodJob::Execution.where(active_job_id: active_job.job_id).order(created_at: :asc) - expect(executions.size).to eq 2 - expect(executions.last.error).to be_nil + good_job = GoodJob::Job.find_by(active_job_id: active_job.job_id) + expect(good_job.discrete_executions.count).to eq 2 + expect(good_job.discrete_executions.last.error).to be_nil end end @@ -40,26 +40,25 @@ end travel_back - executions = GoodJob::Execution.where(active_job_id: active_job.job_id).order(created_at: :asc) - expect(executions.size).to eq 6 - expect(executions.last.error).to be_nil + good_job = GoodJob::Job.find_by(active_job_id: active_job.job_id) + + expect(good_job.discrete_executions.count).to eq 6 + expect(good_job.discrete_executions.last.error).to be_nil end end describe "DEAD_TYPE" do it 'errors but does not retry' do - described_class.perform_later(described_class::DEAD_TYPE) + active_job = described_class.perform_later(described_class::DEAD_TYPE) 10.times do GoodJob.perform_inline travel(5.minutes) end travel_back - active_job_id = GoodJob::Execution.last.active_job_id - - executions = GoodJob::Execution.where(active_job_id: active_job_id).order(created_at: :asc) - expect(executions.size).to eq 3 - expect(executions.last.error).to be_present + good_job = GoodJob::Job.find_by(active_job_id: active_job.job_id) + expect(good_job.discrete_executions.count).to eq 3 + expect(good_job.discrete_executions.last.error).to be_present end end diff --git a/spec/app/models/good_job/execution_spec.rb b/spec/app/models/good_job/execution_spec.rb index 258b4544f..dd2b10f3f 100644 --- a/spec/app/models/good_job/execution_spec.rb +++ b/spec/app/models/good_job/execution_spec.rb @@ -3,6 +3,8 @@ RSpec.describe GoodJob::Execution do before do + allow(described_class).to receive(:discrete_support?).and_return(false) + stub_const "RUN_JOBS", Concurrent::Array.new stub_const 'TestJob', (Class.new(ActiveJob::Base) do self.queue_name = 'test' @@ -621,6 +623,66 @@ def job_params end end end + + context 'when Discrete' do + before do + ActiveJob::Base.queue_adapter = GoodJob::Adapter.new(execution_mode: :inline) + allow(described_class).to receive(:discrete_support?).and_return(true) + good_job.update!(is_discrete: true) + end + + it 'creates a DiscreteExecution record' do + good_job.perform + + dexecution = good_job.discrete_executions.first + expect(dexecution).to be_present + expect(dexecution).to have_attributes( + active_job_id: good_job.active_job_id, + created_at: good_job.performed_at, + perform_expected_at: good_job.scheduled_at || good_job.created_at, + finished_at: within(1.second).of(Time.current), + error: nil, + serialized_params: good_job.serialized_params + ) + end + + context 'when ActiveJob rescues an error' do + let(:active_job) { TestJob.new("a string", raise_error: true) } + let!(:good_job) { described_class.enqueue(active_job) } + + before do + allow(described_class).to receive(:discrete_support?).and_return(true) + allow(GoodJob).to receive(:preserve_job_records).and_return(true) + TestJob.retry_on(StandardError, wait: 1.hour, attempts: 2) { nil } + good_job.update!(is_discrete: true) + end + + it 'updates the existing Execution/Job record instead of creating a new one' do + expect { good_job.perform } + .to not_change(described_class, :count) + .and change { good_job.reload.serialized_params["executions"] }.by(1) + .and not_change { good_job.reload.id } + .and not_change { described_class.count } + + expect(good_job).to have_attributes( + error: "TestJob::ExpectedError: Raised expected error", + created_at: within(1.second).of(Time.current), + performed_at: nil, + finished_at: nil, + scheduled_at: within(10.minutes).of(1.hour.from_now) # interval because of retry jitter + ) + expect(GoodJob::DiscreteExecution.count).to eq(1) + discrete_execution = good_job.discrete_executions.first + expect(discrete_execution).to have_attributes( + active_job_id: good_job.active_job_id, + error: "TestJob::ExpectedError: Raised expected error", + created_at: within(1.second).of(Time.current), + perform_expected_at: within(1.second).of(Time.current), + finished_at: within(1.second).of(Time.current) + ) + end + end + end end describe '#destroy_job' do diff --git a/spec/app/models/good_job/job_spec.rb b/spec/app/models/good_job/job_spec.rb index 18a47eb1d..ffc9e92a2 100644 --- a/spec/app/models/good_job/job_spec.rb +++ b/spec/app/models/good_job/job_spec.rb @@ -2,23 +2,38 @@ require 'rails_helper' RSpec.describe GoodJob::Job do - subject(:job) { described_class.find(head_execution.active_job_id) } + let(:active_job_id) { SecureRandom.uuid } - before do - allow(GoodJob).to receive(:preserve_job_records).and_return(true) - ActiveJob::Base.queue_adapter = GoodJob::Adapter.new(execution_mode: :external) - - stub_const 'TestJob', (Class.new(ActiveJob::Base) do - def perform(feline = nil, canine: nil) - end - end) - stub_const 'TestJob::Error', Class.new(StandardError) + let(:job) do + described_class.create!( + is_discrete: true, + active_job_id: active_job_id, + scheduled_at: 10.minutes.from_now, + queue_name: 'mice', + priority: 10, + serialized_params: { + 'job_id' => active_job_id, + 'job_class' => 'TestJob', + 'executions' => 1, + 'exception_executions' => { 'TestJob::Error' => 1 }, + 'queue_name' => 'mice', + 'priority' => 10, + 'arguments' => ['cat', { 'canine' => 'dog' }], + } + ).tap do |job| + job.discrete_executions.create!( + perform_expected_at: 1.minute.ago, + created_at: 1.minute.ago, + finished_at: 1.minute.ago, + error: "TestJob::Error: TestJob::Error" + ) + end end + let(:undiscrete_job) { described_class.find(head_execution.active_job_id) } - let!(:tail_execution) do - active_job_id = SecureRandom.uuid + let(:tail_execution) do GoodJob::Execution.create!( - active_job_id: SecureRandom.uuid, + active_job_id: active_job_id, created_at: 1.minute.ago, queue_name: 'mice', priority: 10, @@ -33,7 +48,7 @@ def perform(feline = nil, canine: nil) ) end - let!(:head_execution) do + let(:head_execution) do GoodJob::Execution.create!( active_job_id: tail_execution.active_job_id, scheduled_at: 10.minutes.from_now, @@ -57,6 +72,18 @@ def perform(feline = nil, canine: nil) end end + before do + allow(GoodJob).to receive(:preserve_job_records).and_return(true) + ActiveJob::Base.queue_adapter = GoodJob::Adapter.new(execution_mode: :external) + + stub_const 'TestJob', (Class.new(ActiveJob::Base) do + def perform + raise "Didn't expect to perform this job" + end + end) + stub_const 'TestJob::Error', Class.new(StandardError) + end + describe '.find' do it 'returns a record that is the same as the head execution' do job = described_class.find(head_execution.active_job_id) @@ -66,7 +93,7 @@ def perform(feline = nil, canine: nil) describe '#id' do it 'is the ActiveJob ID' do - expect(job.id).to eq head_execution.active_job_id + expect(job.id).to eq job.active_job_id end end @@ -78,6 +105,7 @@ def perform(feline = nil, canine: nil) describe '#head_execution' do it 'is the head execution (which should be the same record)' do + job = undiscrete_job expect(job.head_execution).to eq head_execution expect(job._execution_id).to eq head_execution.id end @@ -85,6 +113,7 @@ def perform(feline = nil, canine: nil) describe '#tail_execution' do it 'is the tail execution' do + job = undiscrete_job expect(job.tail_execution).to eq tail_execution end end @@ -160,29 +189,50 @@ def perform(feline = nil, canine: nil) describe '#retry_job' do context 'when job is retried' do before do - head_execution.update!( + job.update!( finished_at: Time.current, error: "TestJob::Error: TestJob::Error" ) end - it 'enqueues another execution and updates the original job' do - original_head_execution = job.head_execution + it 'updates the original job' do + expect(job).to be_discrete expect do job.retry_job - end.to change { job.executions.reload.size }.by(1) - - new_head_execution = job.head_execution(reload: true) - expect(new_head_execution.serialized_params).to include( - "executions" => 2, - "queue_name" => "mice", - "priority" => 10, - "arguments" => ['cat', hash_including('canine' => 'dog')] - ) + end.to change { job.reload.finished? }.from(true).to(false) + expect(job.executions.count).to eq 1 + end + + context 'when job is not discrete' do + let(:job) { undiscrete_job } - original_head_execution.reload - expect(original_head_execution.retried_good_job_id).to eq new_head_execution.id + before do + head_execution.update!( + finished_at: Time.current, + error: "TestJob::Error: TestJob::Error" + ) + end + + it 'enqueues another execution and updates the original job' do + original_head_execution = undiscrete_job.head_execution + + expect do + job.retry_job + end.to change { job.executions.reload.size }.by(1) + expect(job.reload).not_to be_finished + + new_head_execution = job.head_execution(reload: true) + expect(new_head_execution.serialized_params).to include( + "executions" => 2, + "queue_name" => "mice", + "priority" => 10, + "arguments" => ['cat', hash_including('canine' => 'dog')] + ) + + original_head_execution.reload + expect(original_head_execution.retried_good_job_id).to eq new_head_execution.id + end end end @@ -279,18 +329,12 @@ def perform(feline = nil, canine: nil) end describe '#destroy_job' do - context 'when a job is finished' do - before do - job.head_execution.update! finished_at: Time.current - end - - it 'destroys all the job executions' do - job.destroy_job + it 'destroys job and executions' do + job.update! finished_at: Time.current + job.destroy_job - expect { head_execution.reload }.to raise_error ActiveRecord::RecordNotFound - expect { tail_execution.reload }.to raise_error ActiveRecord::RecordNotFound - expect { job.reload }.to raise_error ActiveRecord::RecordNotFound - end + expect { job.reload }.to raise_error ActiveRecord::RecordNotFound + expect(GoodJob::DiscreteExecution.count).to eq 0 end context 'when a job is not finished' do @@ -298,5 +342,29 @@ def perform(feline = nil, canine: nil) expect { job.destroy_job }.to raise_error GoodJob::Job::ActionForStateMismatchError end end + + context "when undiscrete job" do + let(:job) { undiscrete_job } + + context 'when a job is finished' do + before do + job.head_execution.update! finished_at: Time.current + end + + it 'destroys all the job executions' do + job.destroy_job + + expect { head_execution.reload }.to raise_error ActiveRecord::RecordNotFound + expect { tail_execution.reload }.to raise_error ActiveRecord::RecordNotFound + expect { job.reload }.to raise_error ActiveRecord::RecordNotFound + end + end + + context 'when a job is not finished' do + it 'raises an ActionForStateMismatchError' do + expect { job.destroy_job }.to raise_error GoodJob::Job::ActionForStateMismatchError + end + end + end end end diff --git a/spec/integration/batch_spec.rb b/spec/integration/batch_spec.rb index 9a00f042b..bb4396fc7 100644 --- a/spec/integration/batch_spec.rb +++ b/spec/integration/batch_spec.rb @@ -215,9 +215,10 @@ def perform(*_args, **_kwargs) GoodJob.perform_inline expect(GoodJob::Job.count).to eq 3 - expect(GoodJob::Execution.count).to eq 5 - expect(GoodJob::Execution.where(batch_id: batch.id).count).to eq 1 - expect(GoodJob::Execution.where(batch_callback_id: batch.id).count).to eq 4 + expect(GoodJob::Execution.count).to eq 3 + expect(GoodJob::DiscreteExecution.count).to eq 5 + expect(GoodJob::Job.where(batch_id: batch.id).count).to eq 1 + expect(GoodJob::Job.where(batch_callback_id: batch.id).count).to eq 2 callback_arguments = GoodJob::Job.where(batch_callback_id: batch.id).map(&:head_execution).map(&:active_job).map(&:arguments).map(&:second) expect(callback_arguments).to contain_exactly({ event: :discard }, { event: :finish }) @@ -230,9 +231,10 @@ def perform(*_args, **_kwargs) GoodJob.perform_inline expect(GoodJob::Job.count).to eq 3 - expect(GoodJob::Execution.count).to eq 5 - expect(GoodJob::Execution.where(batch_id: batch.id).count).to eq 1 - expect(GoodJob::Execution.where(batch_callback_id: batch.id).count).to eq 4 + expect(GoodJob::Execution.count).to eq 3 + expect(GoodJob::DiscreteExecution.count).to eq 5 + expect(GoodJob::Job.where(batch_id: batch.id).count).to eq 1 + expect(GoodJob::Job.where(batch_callback_id: batch.id).count).to eq 2 callback_arguments = GoodJob::Job.where(batch_callback_id: batch.id).map(&:head_execution).map(&:active_job).map(&:arguments).map(&:second) expect(callback_arguments).to contain_exactly({ event: :success }, { event: :finish }) diff --git a/spec/lib/good_job/active_job_extensions/concurrency_spec.rb b/spec/lib/good_job/active_job_extensions/concurrency_spec.rb index 3526c7b27..f580bd147 100644 --- a/spec/lib/good_job/active_job_extensions/concurrency_spec.rb +++ b/spec/lib/good_job/active_job_extensions/concurrency_spec.rb @@ -113,19 +113,21 @@ def perform(name:) end it "will error and retry jobs if concurrency is exceeded" do - TestJob.perform_later(name: "Alice") + active_job = TestJob.perform_later(name: "Alice") performer = GoodJob::JobPerformer.new('*') scheduler = GoodJob::Scheduler.new(performer, max_threads: 5) 5.times { scheduler.create_thread } sleep_until(max: 10, increments_of: 0.5) do - GoodJob::Execution.where(concurrency_key: "Alice").finished.count >= 1 + GoodJob::DiscreteExecution.where(active_job_id: active_job.job_id).finished.count >= 1 end scheduler.shutdown - expect(GoodJob::Execution.count).to be >= 1 - expect(GoodJob::Execution.where("error LIKE '%GoodJob::ActiveJobExtensions::Concurrency::ConcurrencyExceededError%'")).to be_present + expect(GoodJob::Job.find_by(active_job_id: active_job.job_id).concurrency_key).to eq "Alice" + + expect(GoodJob::DiscreteExecution.count).to be >= 1 + expect(GoodJob::DiscreteExecution.where("error LIKE '%GoodJob::ActiveJobExtensions::Concurrency::ConcurrencyExceededError%'")).to be_present end it 'is ignored with the job is executed via perform_now' do @@ -153,17 +155,37 @@ def perform end) end - it 'preserves the key value across retries' do - TestJob.set(wait_until: 5.minutes.ago).perform_later(name: "Alice") - begin - GoodJob.perform_inline - rescue StandardError - nil + describe 'retries' do + it 'preserves the value' do + TestJob.set(wait_until: 5.minutes.ago).perform_later(name: "Alice") + + begin + GoodJob.perform_inline + rescue StandardError + nil + end + + expect(GoodJob::Execution.count).to eq 1 + expect(GoodJob::Execution.first.concurrency_key).to be_present + expect(GoodJob::Job.first).not_to be_finished end - expect(GoodJob::Execution.count).to eq 2 - first_execution, retried_execution = GoodJob::Execution.order(created_at: :asc).to_a - expect(retried_execution.concurrency_key).to eq first_execution.concurrency_key + context 'when not discrete' do + it 'preserves the key value across retries' do + TestJob.set(wait_until: 5.minutes.ago).perform_later(name: "Alice") + GoodJob::Job.first.update!(is_discrete: false) + + begin + GoodJob.perform_inline + rescue StandardError + nil + end + + expect(GoodJob::Execution.count).to eq 2 + first_execution, retried_execution = GoodJob::Execution.order(created_at: :asc).to_a + expect(retried_execution.concurrency_key).to eq first_execution.concurrency_key + end + end end end diff --git a/spec/lib/good_job/active_job_extensions/interrupt_errors_spec.rb b/spec/lib/good_job/active_job_extensions/interrupt_errors_spec.rb index 35da102f3..9454e042d 100644 --- a/spec/lib/good_job/active_job_extensions/interrupt_errors_spec.rb +++ b/spec/lib/good_job/active_job_extensions/interrupt_errors_spec.rb @@ -16,21 +16,63 @@ def perform context 'when a dequeued job has a performed_at but no finished_at' do before do active_job = TestJob.perform_later - GoodJob::Execution.find_by(active_job_id: active_job.job_id).update!(performed_at: Time.current) + good_job = GoodJob::Job.find_by(active_job_id: active_job.job_id) + good_job.update!(performed_at: Time.current, finished_at: nil) + good_job.discrete_executions.create!(performed_at: Time.current, finished_at: nil) end it 'raises a GoodJob::InterruptError' do expect { GoodJob.perform_inline }.to raise_error(GoodJob::InterruptError) end - it 'is rescuable' do - TestJob.retry_on GoodJob::InterruptError + context 'when discrete execution is NOT enabled' do + before do + allow(GoodJob::Execution).to receive(:discrete_support?).and_return(false) + end - expect { GoodJob.perform_inline }.not_to raise_error - expect(GoodJob::Execution.count).to eq(2) + it 'is rescuable' do + TestJob.retry_on GoodJob::InterruptError - job = GoodJob::Job.first - expect(job.executions.first.error).to start_with 'GoodJob::InterruptError: Interrupted after starting perform at' + expect { GoodJob.perform_inline }.not_to raise_error + expect(GoodJob::Execution.count).to eq(2) + + job = GoodJob::Job.first + expect(job.executions.first.error).to start_with 'GoodJob::InterruptError: Interrupted after starting perform at' + end + end + + context 'when discrete execution is enabled' do + before do + allow(GoodJob::Execution).to receive(:discrete_support?).and_return(true) + end + + it 'does not create a new execution' do + TestJob.retry_on GoodJob::InterruptError + + expect { GoodJob.perform_inline }.not_to raise_error + expect(GoodJob::Job.count).to eq(1) + expect(GoodJob::Execution.count).to eq(1) + expect(GoodJob::DiscreteExecution.count).to eq(2) + + job = GoodJob::Job.first + expect(job.executions.count).to eq(1) + expect(job.finished?).to be false + expect(job.error).to start_with 'GoodJob::InterruptError: Interrupted after starting perform at' + + initial_discrete_execution = job.discrete_executions.first + expect(initial_discrete_execution).to have_attributes( + performed_at: be_present, + finished_at: be_present, + error: start_with('GoodJob::InterruptError: Interrupted after starting perform at') + ) + + retried_discrete_execution = job.discrete_executions.last + expect(retried_discrete_execution).to have_attributes( + performed_at: be_present, + finished_at: be_present, + error: start_with('GoodJob::InterruptError: Interrupted after starting perform at') + ) + end end end diff --git a/spec/lib/good_job/active_job_extensions/notify_options_spec.rb b/spec/lib/good_job/active_job_extensions/notify_options_spec.rb index 4394e0b39..bd8f4a032 100644 --- a/spec/lib/good_job/active_job_extensions/notify_options_spec.rb +++ b/spec/lib/good_job/active_job_extensions/notify_options_spec.rb @@ -85,7 +85,7 @@ def perform scheduler = GoodJob::Scheduler.new(performer, max_threads: 5) scheduler.create_thread - sleep_until(max: 5, increments_of: 0.5) { GoodJob::Execution.count >= 2 } + sleep_until(max: 5, increments_of: 0.5) { GoodJob::DiscreteExecution.count >= 2 } scheduler.shutdown expect(GoodJob::Notifier).not_to have_received(:notify) diff --git a/spec/lib/good_job_spec.rb b/spec/lib/good_job_spec.rb index 65bc2f191..0b38a3cbe 100644 --- a/spec/lib/good_job_spec.rb +++ b/spec/lib/good_job_spec.rb @@ -42,17 +42,20 @@ let!(:old_unfinished_job) { GoodJob::Execution.create!(active_job_id: SecureRandom.uuid, scheduled_at: 15.days.ago, finished_at: nil) } let!(:old_finished_job) { GoodJob::Execution.create!(active_job_id: SecureRandom.uuid, finished_at: 15.days.ago) } let!(:old_finished_job_execution) { GoodJob::Execution.create!(active_job_id: old_finished_job.active_job_id, retried_good_job_id: old_finished_job.id, finished_at: 16.days.ago) } + let!(:old_finished_job_discrete_execution) { GoodJob::DiscreteExecution.create!(active_job_id: old_finished_job.active_job_id, finished_at: 16.days.ago) } let!(:old_discarded_job) { GoodJob::Execution.create!(active_job_id: SecureRandom.uuid, finished_at: 15.days.ago, error: "Error") } let!(:old_batch) { GoodJob::BatchRecord.create!(finished_at: 15.days.ago) } it 'deletes finished jobs' do destroyed_records_count = described_class.cleanup_preserved_jobs - expect(destroyed_records_count).to eq 4 + expect(destroyed_records_count).to eq 5 expect { recent_job.reload }.not_to raise_error expect { old_unfinished_job.reload }.not_to raise_error expect { old_finished_job.reload }.to raise_error ActiveRecord::RecordNotFound + expect { old_finished_job_execution.reload }.to raise_error ActiveRecord::RecordNotFound + expect { old_finished_job_discrete_execution.reload }.to raise_error ActiveRecord::RecordNotFound expect { old_discarded_job.reload }.to raise_error ActiveRecord::RecordNotFound expect { old_batch.reload }.to raise_error ActiveRecord::RecordNotFound end @@ -60,12 +63,15 @@ it 'takes arguments' do destroyed_records_count = described_class.cleanup_preserved_jobs(older_than: 10.seconds) - expect(destroyed_records_count).to eq 5 + expect(destroyed_records_count).to eq 6 expect { recent_job.reload }.to raise_error ActiveRecord::RecordNotFound expect { old_unfinished_job.reload }.not_to raise_error expect { old_finished_job.reload }.to raise_error ActiveRecord::RecordNotFound + expect { old_finished_job_execution.reload }.to raise_error ActiveRecord::RecordNotFound + expect { old_finished_job_discrete_execution.reload }.to raise_error ActiveRecord::RecordNotFound expect { old_discarded_job.reload }.to raise_error ActiveRecord::RecordNotFound + expect { old_batch.reload }.to raise_error ActiveRecord::RecordNotFound end it 'is instrumented' do @@ -83,12 +89,15 @@ allow(described_class.configuration).to receive(:env).and_return ENV.to_hash.merge({ 'GOOD_JOB_CLEANUP_DISCARDED_JOBS' => 'false' }) destroyed_records_count = described_class.cleanup_preserved_jobs - expect(destroyed_records_count).to eq 3 + expect(destroyed_records_count).to eq 4 expect { recent_job.reload }.not_to raise_error expect { old_unfinished_job.reload }.not_to raise_error expect { old_finished_job.reload }.to raise_error ActiveRecord::RecordNotFound + expect { old_finished_job_execution.reload }.to raise_error ActiveRecord::RecordNotFound + expect { old_finished_job_discrete_execution.reload }.to raise_error ActiveRecord::RecordNotFound expect { old_discarded_job.reload }.not_to raise_error + expect { old_batch.reload }.to raise_error ActiveRecord::RecordNotFound end end diff --git a/spec/requests/good_job/jobs_controller_spec.rb b/spec/requests/good_job/jobs_controller_spec.rb index ac57b23ce..8e247fa71 100644 --- a/spec/requests/good_job/jobs_controller_spec.rb +++ b/spec/requests/good_job/jobs_controller_spec.rb @@ -92,19 +92,22 @@ describe 'mass_action=retry' do before do job.update(error: "Error message") + job.discrete_executions.first.update(error: "Error message") end it 'retries the job' do - put good_job.mass_update_jobs_path, params: { - mass_action: 'retry', - job_ids: [job.id], - } + expect do + put good_job.mass_update_jobs_path, params: { + mass_action: 'retry', + job_ids: [job.id], + } + end.to change { job.reload.status }.from(:discarded).to(:succeeded) expect(response).to have_http_status(:found) expect(flash[:notice]).to eq('Successfully retried 1 job') job.reload - expect(job.executions.count).to eq 2 + expect(job.discrete_executions.count).to eq 2 end end diff --git a/spec/support/example_app_helper.rb b/spec/support/example_app_helper.rb index 30b46317c..70ab9af75 100644 --- a/spec/support/example_app_helper.rb +++ b/spec/support/example_app_helper.rb @@ -48,6 +48,7 @@ def within_example_app tables = %i[ good_jobs good_job_batches + good_job_executions good_job_processes good_job_settings ] diff --git a/spec/support/rspec_not_change.rb b/spec/support/rspec_not_change.rb new file mode 100644 index 000000000..8f0ffae77 --- /dev/null +++ b/spec/support/rspec_not_change.rb @@ -0,0 +1,2 @@ +# frozen_string_literal: true +RSpec::Matchers.define_negated_matcher :not_change, :change diff --git a/spec/system/jobs_spec.rb b/spec/system/jobs_spec.rb index 4aa2f2177..d1f14370f 100644 --- a/spec/system/jobs_spec.rb +++ b/spec/system/jobs_spec.rb @@ -123,7 +123,7 @@ accept_confirm { click_on 'Retry job' } end expect(page).to have_content "Job has been retried" - end.to change { discarded_job.executions.reload.size }.by(1) + end.to change { discarded_job.reload.status }.from(:discarded).to(:queued) end it 'can discard jobs' do diff --git a/spec/test_app/app/jobs/example_job.rb b/spec/test_app/app/jobs/example_job.rb index 491eff8bf..27842b80f 100644 --- a/spec/test_app/app/jobs/example_job.rb +++ b/spec/test_app/app/jobs/example_job.rb @@ -4,7 +4,7 @@ class ExampleJob < ApplicationJob TYPES = [ SUCCESS_TYPE = 'success', - ERROR_ONCE_TYPE = 'error_once', + ERROR_ONCE_TYPE = 'error_once', ERROR_FIVE_TIMES_TYPE = 'error_five_times', DEAD_TYPE = 'dead', SLOW_TYPE = 'slow', diff --git a/spec/test_app/db/migrate/20230412144442_create_good_job_executions.rb b/spec/test_app/db/migrate/20230412144442_create_good_job_executions.rb new file mode 100644 index 000000000..a98c24915 --- /dev/null +++ b/spec/test_app/db/migrate/20230412144442_create_good_job_executions.rb @@ -0,0 +1,29 @@ +# frozen_string_literal: true +class CreateGoodJobExecutions < ActiveRecord::Migration[7.0] + def change + reversible do |dir| + dir.up do + # Ensure this incremental update migration is idempotent + # with monolithic install migration. + return if connection.table_exists?(:good_job_executions) + end + end + + create_table :good_job_executions, id: :uuid do |t| + t.timestamps + + t.uuid :active_job_id + t.jsonb :serialized_params + t.datetime :perform_expected_at + t.datetime :finished_at + t.text :error + + t.index [:active_job_id, :created_at], name: :index_good_job_executions_on_active_job_id_and_created_at + end + + change_table :good_jobs do |t| + t.boolean :is_discrete + t.integer :executions_count + end + end +end diff --git a/spec/test_app/db/schema.rb b/spec/test_app/db/schema.rb index 9b68e84ab..f5545d911 100644 --- a/spec/test_app/db/schema.rb +++ b/spec/test_app/db/schema.rb @@ -10,15 +10,14 @@ # # It's strongly recommended that you check this file into your version control system. -ActiveRecord::Schema.define(version: 2023_01_31_214927) do - +ActiveRecord::Schema.define(version: 2023_04_12_144442) do # These are extensions that must be enabled in order to support this database enable_extension "pgcrypto" enable_extension "plpgsql" create_table "good_job_batches", id: :uuid, default: -> { "gen_random_uuid()" }, force: :cascade do |t| - t.datetime "created_at", precision: 6, null: false - t.datetime "updated_at", precision: 6, null: false + t.datetime "created_at", null: false + t.datetime "updated_at", null: false t.text "description" t.jsonb "serialized_properties" t.text "on_finish" @@ -31,15 +30,26 @@ t.datetime "finished_at" end + create_table "good_job_executions", id: :uuid, default: -> { "gen_random_uuid()" }, force: :cascade do |t| + t.datetime "created_at", null: false + t.datetime "updated_at", null: false + t.uuid "active_job_id" + t.jsonb "serialized_params" + t.datetime "perform_expected_at" + t.datetime "finished_at" + t.text "error" + t.index ["active_job_id", "created_at"], name: "index_good_job_executions_on_active_job_id_and_created_at" + end + create_table "good_job_processes", id: :uuid, default: -> { "gen_random_uuid()" }, force: :cascade do |t| - t.datetime "created_at", precision: 6, null: false - t.datetime "updated_at", precision: 6, null: false + t.datetime "created_at", null: false + t.datetime "updated_at", null: false t.jsonb "state" end create_table "good_job_settings", id: :uuid, default: -> { "gen_random_uuid()" }, force: :cascade do |t| - t.datetime "created_at", precision: 6, null: false - t.datetime "updated_at", precision: 6, null: false + t.datetime "created_at", null: false + t.datetime "updated_at", null: false t.text "key" t.jsonb "value" t.index ["key"], name: "index_good_job_settings_on_key", unique: true @@ -62,6 +72,8 @@ t.datetime "cron_at" t.uuid "batch_id" t.uuid "batch_callback_id" + t.boolean "is_discrete" + t.integer "executions_count" t.index ["active_job_id", "created_at"], name: "index_good_jobs_on_active_job_id_and_created_at" t.index ["active_job_id"], name: "index_good_jobs_on_active_job_id" t.index ["batch_callback_id"], name: "index_good_jobs_on_batch_callback_id", where: "(batch_callback_id IS NOT NULL)" From f7b456a17d053fddfcaf1c9cadb0fb1a2aa06d24 Mon Sep 17 00:00:00 2001 From: "Ben Sheldon [he/him]" Date: Fri, 14 Apr 2023 13:42:28 +0000 Subject: [PATCH 2/8] Change time precision in test --- spec/app/models/good_job/execution_spec.rb | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/spec/app/models/good_job/execution_spec.rb b/spec/app/models/good_job/execution_spec.rb index dd2b10f3f..d65e52165 100644 --- a/spec/app/models/good_job/execution_spec.rb +++ b/spec/app/models/good_job/execution_spec.rb @@ -638,8 +638,8 @@ def job_params expect(dexecution).to be_present expect(dexecution).to have_attributes( active_job_id: good_job.active_job_id, - created_at: good_job.performed_at, - perform_expected_at: good_job.scheduled_at || good_job.created_at, + created_at: within(0.001).of(good_job.performed_at), + perform_expected_at: within(0.001).of(good_job.created_at), finished_at: within(1.second).of(Time.current), error: nil, serialized_params: good_job.serialized_params From a09957f345dfdfdceade2e75ee63f5513e8a1c90 Mon Sep 17 00:00:00 2001 From: "Ben Sheldon [he/him]" Date: Mon, 17 Apr 2023 15:06:14 -0700 Subject: [PATCH 3/8] Add benchmark script --- scripts/benchmark_discrete_jobs.rb | 38 ++++++++++++++++++++++++++++++ 1 file changed, 38 insertions(+) create mode 100644 scripts/benchmark_discrete_jobs.rb diff --git a/scripts/benchmark_discrete_jobs.rb b/scripts/benchmark_discrete_jobs.rb new file mode 100644 index 000000000..1e9b47a76 --- /dev/null +++ b/scripts/benchmark_discrete_jobs.rb @@ -0,0 +1,38 @@ +ENV['GOOD_JOB_EXECUTION_MODE'] = 'external' + +require_relative '../spec/test_app/config/environment' +require_relative '../lib/good_job' +require 'benchmark/ips' + +performer = GoodJob::JobPerformer.new("*") +Benchmark.ips do |x| + GoodJob::Execution.delete_all + ActiveJob::Base.queue_adapter.enqueue_all 10_000.times.map { ExampleJob.new } + GoodJob::Execution.update_all(is_discrete: true) + x.report("discrete jobs and no errors") do + performer.next + end + + GoodJob::Execution.delete_all + ActiveJob::Base.queue_adapter.enqueue_all 10_000.times.map { ExampleJob.new } + GoodJob::Execution.update_all(is_discrete: false) + x.report("undiscrete jobs and no errors") do + performer.next + end + + GoodJob::Execution.delete_all + ActiveJob::Base.queue_adapter.enqueue_all 10_000.times.map { ExampleJob.new(ExampleJob::ERROR_FIVE_TIMES_TYPE) } + GoodJob::Execution.update_all(is_discrete: true) + x.report("discrete jobs and 5 errors") do + performer.next + end + + GoodJob::Execution.delete_all + ActiveJob::Base.queue_adapter.enqueue_all 10_000.times.map { ExampleJob.new(ExampleJob::ERROR_FIVE_TIMES_TYPE) } + GoodJob::Execution.update_all(is_discrete: false) + x.report("undiscrete jobs and 5 errors") do + performer.next + end + + x.compare! +end From fb7d9e2f7c4069a76b74a7b75ae3d3f5ee9ff9a1 Mon Sep 17 00:00:00 2001 From: "Ben Sheldon [he/him]" Date: Wed, 19 Apr 2023 19:34:18 -0700 Subject: [PATCH 4/8] For discrete executions, id = active_job_id, scheduled_at = created_at if unscheduled --- app/models/good_job/execution.rb | 48 ++++++++++++------- app/models/good_job/job.rb | 6 ++- app/views/good_job/jobs/show.html.erb | 21 +++++++- lib/good_job/adapter.rb | 14 ++++-- spec/app/filters/good_job/jobs_filter_spec.rb | 4 +- spec/app/models/good_job/execution_spec.rb | 37 +++++++++++++- spec/integration/adapter_spec.rb | 4 +- .../notify_options_spec.rb | 4 +- spec/lib/good_job/adapter_spec.rb | 2 +- spec/support/postgres_notices.rb | 4 +- 10 files changed, 111 insertions(+), 33 deletions(-) diff --git a/app/models/good_job/execution.rb b/app/models/good_job/execution.rb index 8726aa2dd..417be76b6 100644 --- a/app/models/good_job/execution.rb +++ b/app/models/good_job/execution.rb @@ -74,7 +74,7 @@ def self.queue_parser(string) has_many :discrete_executions, class_name: 'GoodJob::DiscreteExecution', foreign_key: 'active_job_id', primary_key: 'active_job_id', inverse_of: :execution # rubocop:disable Rails/HasManyOrHasOneDependent after_destroy lambda { - discrete_executions.delete_all if discrete? + GoodJob::DiscreteExecution.where(active_job_id: active_job_id).delete_all if discrete? # TODO: move into association `dependent: :delete_all` after v4 self.class.active_job_id(active_job_id).delete_all }, if: -> { @_destroy_job } @@ -226,10 +226,7 @@ def self.enqueue_args(active_job, overrides = {}) serialized_params: active_job.serialize, scheduled_at: active_job.scheduled_at, } - if discrete_support? - execution_args[:is_discrete] = true - execution_args[:executions_count] = 0 - end + execution_args[:concurrency_key] = active_job.good_job_concurrency_key if active_job.respond_to?(:good_job_concurrency_key) reenqueued_current_execution = CurrentThread.active_job_id && CurrentThread.active_job_id == active_job.job_id @@ -313,22 +310,31 @@ def self.enqueue(active_job, scheduled_at: nil, create_with_advisory_lock: false current_execution = CurrentThread.execution retried = current_execution && current_execution.active_job_id == active_job.job_id - if retried && current_execution.discrete? - current_execution.assign_attributes(enqueue_args(active_job, { scheduled_at: scheduled_at })) - current_execution.finished_at = nil - current_execution.advisory_lock if create_with_advisory_lock - execution = current_execution + if retried + if current_execution.discrete? + execution = current_execution + execution.assign_attributes(enqueue_args(active_job, { scheduled_at: scheduled_at })) + execution.finished_at = nil + else + execution = build_for_enqueue(active_job, { scheduled_at: scheduled_at }) + end else execution = build_for_enqueue(active_job, { scheduled_at: scheduled_at }) - execution.is_discrete = nil if retried && discrete_support? && !current_execution.is_discrete? - execution.create_with_advisory_lock = create_with_advisory_lock - - instrument_payload[:execution] = execution + execution.make_discrete if discrete_support? + end - execution.save! - CurrentThread.execution.retried_good_job_id = execution.id if retried + if create_with_advisory_lock + if execution.persisted? + execution.advisory_lock + else + execution.create_with_advisory_lock = true + end end + instrument_payload[:execution] = execution + execution.save! + + CurrentThread.execution.retried_good_job_id = execution.id if retried && !CurrentThread.execution.discrete? active_job.provider_job_id = execution.id execution end @@ -450,6 +456,16 @@ def executable? self.class.unscoped.unfinished.owns_advisory_locked.exists?(id: id) end + def make_discrete + self.is_discrete = true + self.id = active_job_id + self.executions_count ||= 0 + + current_time = Time.current + self.created_at ||= current_time + self.scheduled_at ||= current_time + end + # Build an ActiveJob instance and deserialize the arguments, using `#active_job_data`. # # @param ignore_deserialization_errors [Boolean] diff --git a/app/models/good_job/job.rb b/app/models/good_job/job.rb index e490e13e7..8f1394571 100644 --- a/app/models/good_job/job.rb +++ b/app/models/good_job/job.rb @@ -30,7 +30,11 @@ def table_name=(_value) belongs_to :batch, class_name: 'GoodJob::BatchRecord', inverse_of: :jobs, optional: true has_many :executions, -> { order(created_at: :asc) }, class_name: 'GoodJob::Execution', foreign_key: 'active_job_id', inverse_of: :job # rubocop:disable Rails/HasManyOrHasOneDependent - has_many :discrete_executions, -> { order(created_at: :asc) }, class_name: 'GoodJob::DiscreteExecution', foreign_key: 'active_job_id', inverse_of: :job, dependent: :delete_all + has_many :discrete_executions, -> { order(created_at: :asc) }, class_name: 'GoodJob::DiscreteExecution', foreign_key: 'active_job_id', primary_key: :active_job_id, inverse_of: :job # rubocop:disable Rails/HasManyOrHasOneDependent + + after_destroy lambda { + GoodJob::DiscreteExecution.where(active_job_id: active_job_id).delete_all if discrete? # TODO: move into association `dependent: :delete_all` after v4 + } # Only the most-recent unretried execution represents a "Job" default_scope { where(retried_good_job_id: nil) } diff --git a/app/views/good_job/jobs/show.html.erb b/app/views/good_job/jobs/show.html.erb index bd361b22a..2e59f45db 100644 --- a/app/views/good_job/jobs/show.html.erb +++ b/app/views/good_job/jobs/show.html.erb @@ -26,6 +26,10 @@
<%= tag.strong @job.priority %>
+
+ <%= tag.span relative_time(@job.last_status_at), class: "small" %> + <%= status_badge @job.status %> +
<% if @job.status.in? [:scheduled, :retried, :queued] %> <%= button_to reschedule_job_path(@job.id), method: :put, class: "btn btn-sm btn-outline-primary", @@ -64,9 +68,22 @@
-
<%= t "good_job.models.job.arguments" %>
- <%= tag.pre @job.serialized_params["arguments"].map(&:inspect).join(', '), class: 'text-wrap text-break' %> +
+ <%= t "good_job.models.job.arguments" %> + <%= tag.button type: "button", class: "btn btn-sm text-muted", role: "button", + title: t("good_job.actions.inspect"), + data: { bs_toggle: "collapse", bs_target: "##{dom_id(@job, 'params')}" }, + aria: { expanded: false, controls: dom_id(@job, "params") } do %> + <%= render_icon "info" %> + <%= t "good_job.actions.inspect" %> + <% end %> +
+<%= tag.pre @job.serialized_params["arguments"].map(&:inspect).join(', '), class: 'text-wrap text-break' %> + +<%= tag.div id: dom_id(@job, "params"), class: "list-group-item collapse small bg-dark text-light" do %> + <%= tag.pre JSON.pretty_generate(@job.display_serialized_params) %> +<% end %> <% if @job.discrete? %> <%= render 'executions', executions: @job.discrete_executions.reverse %> diff --git a/lib/good_job/adapter.rb b/lib/good_job/adapter.rb index 17712437d..82a26e054 100644 --- a/lib/good_job/adapter.rb +++ b/lib/good_job/adapter.rb @@ -50,11 +50,15 @@ def enqueue_all(active_jobs) current_time = Time.current executions = active_jobs.map do |active_job| - GoodJob::Execution.build_for_enqueue(active_job, { - id: SecureRandom.uuid, - created_at: current_time, - updated_at: current_time, - }) + GoodJob::Execution.build_for_enqueue(active_job).tap do |execution| + if GoodJob::Execution.discrete_support? + execution.make_discrete + execution.scheduled_at = current_time if execution.scheduled_at == execution.created_at + end + + execution.created_at = current_time + execution.updated_at = current_time + end end inline_executions = [] diff --git a/spec/app/filters/good_job/jobs_filter_spec.rb b/spec/app/filters/good_job/jobs_filter_spec.rb index c24a62915..1a961a6dd 100644 --- a/spec/app/filters/good_job/jobs_filter_spec.rb +++ b/spec/app/filters/good_job/jobs_filter_spec.rb @@ -62,9 +62,9 @@ expect(filter.states).to eq({ "scheduled" => 1, "retried" => 0, - "queued" => 0, + "queued" => 1, "running" => 1, - "succeeded" => 2, + "succeeded" => 1, "discarded" => 1, }) end diff --git a/spec/app/models/good_job/execution_spec.rb b/spec/app/models/good_job/execution_spec.rb index d65e52165..f54fd602e 100644 --- a/spec/app/models/good_job/execution_spec.rb +++ b/spec/app/models/good_job/execution_spec.rb @@ -23,6 +23,41 @@ def perform(result_value = nil, raise_error: false) describe '.enqueue' do let(:active_job) { TestJob.new } + context 'when discrete' do + before do + allow(described_class).to receive(:discrete_support?).and_return(true) + end + + it 'assigns is discrete, id, scheduled_at' do + expect { described_class.enqueue(active_job) }.to change(described_class, :count).by(1) + + execution = described_class.last + expect(execution).to have_attributes( + is_discrete: true, + id: active_job.job_id, + active_job_id: active_job.job_id, + created_at: execution.scheduled_at, + scheduled_at: execution.created_at + ) + end + end + + context 'when NOT discrete' do + before { allow(described_class).to receive(:discrete_support?).and_return(false) } + + it 'does not assign id, scheduled_at' do + expect { described_class.enqueue(active_job) }.to change(described_class, :count).by(1) + + execution = described_class.last + expect(execution.id).not_to eq(active_job.job_id) + expect(execution).to have_attributes( + is_discrete: nil, + active_job_id: active_job.job_id, + scheduled_at: nil + ) + end + end + it 'creates a new GoodJob record' do execution = nil @@ -661,7 +696,7 @@ def job_params expect { good_job.perform } .to not_change(described_class, :count) .and change { good_job.reload.serialized_params["executions"] }.by(1) - .and not_change { good_job.reload.id } + .and not_change { good_job.reload.id } .and not_change { described_class.count } expect(good_job).to have_attributes( diff --git a/spec/integration/adapter_spec.rb b/spec/integration/adapter_spec.rb index 9445c3e39..a2a53c62f 100644 --- a/spec/integration/adapter_spec.rb +++ b/spec/integration/adapter_spec.rb @@ -37,7 +37,7 @@ def perform(*_args, **_kwargs) end it 'assigns successfully_enqueued' do - ok_job = TestJob.perform_later + ok_job = TestJob.new expect { ok_job.enqueue }.not_to raise_error expect(ok_job.successfully_enqueued?).to be true if ok_job.respond_to?(:successfully_enqueued?) @@ -58,7 +58,7 @@ def perform(*_args, **_kwargs) expect(execution).to have_attributes( queue_name: 'test', priority: 50, - scheduled_at: nil + scheduled_at: within(0.1).of(Time.current) ) end diff --git a/spec/lib/good_job/active_job_extensions/notify_options_spec.rb b/spec/lib/good_job/active_job_extensions/notify_options_spec.rb index bd8f4a032..39e4094b0 100644 --- a/spec/lib/good_job/active_job_extensions/notify_options_spec.rb +++ b/spec/lib/good_job/active_job_extensions/notify_options_spec.rb @@ -30,10 +30,10 @@ def perform it 'can be overridden by set(good_job_notify: true)' do TestJob.good_job_notify = false TestJob.set(good_job_notify: true).perform_later - expect(GoodJob::Notifier).to have_received(:notify).with({ queue_name: 'default' }) + expect(GoodJob::Notifier).to have_received(:notify).with({ queue_name: 'default', scheduled_at: within(0.1).of(Time.current) }) GoodJob::Bulk.enqueue { TestJob.set(good_job_notify: true).perform_later } - expect(GoodJob::Notifier).to have_received(:notify).with({ queue_name: 'default', count: 1 }) + expect(GoodJob::Notifier).to have_received(:notify).with({ queue_name: 'default', count: 1, scheduled_at: within(0.1).of(Time.current) }) end it 'works for bulk enqueuing' do diff --git a/spec/lib/good_job/adapter_spec.rb b/spec/lib/good_job/adapter_spec.rb index ffc60bf76..20bc8a604 100644 --- a/spec/lib/good_job/adapter_spec.rb +++ b/spec/lib/good_job/adapter_spec.rb @@ -129,7 +129,7 @@ def perform(succeed: true) expect(result).to eq 1 expect(active_jobs.map(&:provider_job_id)).to eq [active_jobs.first.provider_job_id, nil] - expect(GoodJob::Notifier).to have_received(:notify).with({ queue_name: 'default', count: 1 }) + expect(GoodJob::Notifier).to have_received(:notify).with({ queue_name: 'default', count: 1, scheduled_at: within(0.1).of(Time.current) }) end it 'sets successfully_enqueued, if Rails supports it' do diff --git a/spec/support/postgres_notices.rb b/spec/support/postgres_notices.rb index 8b0dd4be4..9257f51fa 100644 --- a/spec/support/postgres_notices.rb +++ b/spec/support/postgres_notices.rb @@ -16,7 +16,9 @@ end RSpec.configure do |config| - config.after do + config.around do |example| + POSTGRES_NOTICES.clear + example.run expect(POSTGRES_NOTICES).to be_empty end end From 61b6ffa3956428269c221d0ef5eccc7084dd581d Mon Sep 17 00:00:00 2001 From: "Ben Sheldon [he/him]" Date: Wed, 19 Apr 2023 21:46:51 -0700 Subject: [PATCH 5/8] Allow wider time assertions in specs --- spec/integration/adapter_spec.rb | 2 +- .../lib/good_job/active_job_extensions/notify_options_spec.rb | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/spec/integration/adapter_spec.rb b/spec/integration/adapter_spec.rb index a2a53c62f..978ff4d75 100644 --- a/spec/integration/adapter_spec.rb +++ b/spec/integration/adapter_spec.rb @@ -58,7 +58,7 @@ def perform(*_args, **_kwargs) expect(execution).to have_attributes( queue_name: 'test', priority: 50, - scheduled_at: within(0.1).of(Time.current) + scheduled_at: within(1).of(Time.current) ) end diff --git a/spec/lib/good_job/active_job_extensions/notify_options_spec.rb b/spec/lib/good_job/active_job_extensions/notify_options_spec.rb index 39e4094b0..8847c3540 100644 --- a/spec/lib/good_job/active_job_extensions/notify_options_spec.rb +++ b/spec/lib/good_job/active_job_extensions/notify_options_spec.rb @@ -30,10 +30,10 @@ def perform it 'can be overridden by set(good_job_notify: true)' do TestJob.good_job_notify = false TestJob.set(good_job_notify: true).perform_later - expect(GoodJob::Notifier).to have_received(:notify).with({ queue_name: 'default', scheduled_at: within(0.1).of(Time.current) }) + expect(GoodJob::Notifier).to have_received(:notify).with({ queue_name: 'default', scheduled_at: within(1).of(Time.current) }) GoodJob::Bulk.enqueue { TestJob.set(good_job_notify: true).perform_later } - expect(GoodJob::Notifier).to have_received(:notify).with({ queue_name: 'default', count: 1, scheduled_at: within(0.1).of(Time.current) }) + expect(GoodJob::Notifier).to have_received(:notify).with({ queue_name: 'default', count: 1, scheduled_at: within(1).of(Time.current) }) end it 'works for bulk enqueuing' do From c35586b61db05b906aae205e8bdfc6084b51d4e4 Mon Sep 17 00:00:00 2001 From: "Ben Sheldon [he/him]" Date: Wed, 19 Apr 2023 21:51:29 -0700 Subject: [PATCH 6/8] Delete discrete executions before executions for better resiliency --- lib/good_job.rb | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/lib/good_job.rb b/lib/good_job.rb index b87a77c74..85f034e91 100644 --- a/lib/good_job.rb +++ b/lib/good_job.rb @@ -178,11 +178,11 @@ def self.cleanup_preserved_jobs(older_than: nil, in_batches_of: 1_000) active_job_ids = jobs_query.pluck(:active_job_id) break if active_job_ids.empty? - deleted_executions = GoodJob::Execution.where(active_job_id: active_job_ids).delete_all - deleted_executions_count += deleted_executions - deleted_discrete_executions = GoodJob::DiscreteExecution.where(active_job_id: active_job_ids).delete_all deleted_discrete_executions_count += deleted_discrete_executions + + deleted_executions = GoodJob::Execution.where(active_job_id: active_job_ids).delete_all + deleted_executions_count += deleted_executions end if GoodJob::BatchRecord.migrated? From 526f9b57c6216ff13d2e4f02ac4b72567a9b78f2 Mon Sep 17 00:00:00 2001 From: "Ben Sheldon [he/him]" Date: Thu, 20 Apr 2023 10:43:45 -0700 Subject: [PATCH 7/8] Rename `perform_expected_at` to simpler `scheduled_at` --- app/models/good_job/discrete_execution.rb | 19 ++++--------------- app/models/good_job/execution.rb | 9 +++++---- .../migrations/create_good_jobs.rb.erb | 4 ++-- .../05_create_good_job_executions.rb.erb | 4 ++-- spec/app/models/good_job/execution_spec.rb | 15 ++++++++++----- spec/app/models/good_job/job_spec.rb | 2 +- ...230412144442_create_good_job_executions.rb | 4 ++-- spec/test_app/db/schema.rb | 4 ++-- 8 files changed, 28 insertions(+), 33 deletions(-) diff --git a/app/models/good_job/discrete_execution.rb b/app/models/good_job/discrete_execution.rb index 5264d040c..08d0e7e7a 100644 --- a/app/models/good_job/discrete_execution.rb +++ b/app/models/good_job/discrete_execution.rb @@ -17,19 +17,16 @@ def number # Time between when this job was expected to run and when it started running def queue_latency - now = Time.zone.now - actual_start = performed_at || finished_at || now - - actual_start - perform_expected_at unless perform_expected_at >= now + created_at - scheduled_at end # Time between when this job started and finished def runtime_latency - (finished_at || Time.zone.now) - performed_at if performed_at + (finished_at || Time.current) - performed_at if performed_at end def last_status_at - finished_at || performed_at || perform_expected_at || created_at + finished_at || created_at end def status @@ -41,16 +38,8 @@ def status else :succeeded end - elsif (perform_expected_at || created_at) > DateTime.current - if serialized_params.fetch('executions', 0) > 1 - :retried - else - :scheduled - end - elsif performed_at.present? - :running else - :queued + :running end end diff --git a/app/models/good_job/execution.rb b/app/models/good_job/execution.rb index 417be76b6..d8057a2f0 100644 --- a/app/models/good_job/execution.rb +++ b/app/models/good_job/execution.rb @@ -314,6 +314,8 @@ def self.enqueue(active_job, scheduled_at: nil, create_with_advisory_lock: false if current_execution.discrete? execution = current_execution execution.assign_attributes(enqueue_args(active_job, { scheduled_at: scheduled_at })) + execution.scheduled_at ||= Time.current + execution.performed_at = nil execution.finished_at = nil else execution = build_for_enqueue(active_job, { scheduled_at: scheduled_at }) @@ -376,8 +378,8 @@ def perform if discrete? transaction do now = Time.current - discrete_execution = discrete_executions.create!(serialized_params: serialized_params, perform_expected_at: scheduled_at || created_at, created_at: now) - update!(performed_at: now, executions_count: (executions_count || 0) + 1) + discrete_execution = discrete_executions.create!(serialized_params: serialized_params, scheduled_at: (scheduled_at || created_at), created_at: now) + update!(performed_at: now, executions_count: ((executions_count || 0) + 1)) end else update!(performed_at: Time.current) @@ -419,8 +421,7 @@ def perform if discrete_execution transaction do discrete_execution.update!(finished_at: Time.current) - self.performed_at = nil # TODO: will this break something to unset this? - save! + update!(performed_at: nil, finished_at: nil, retried_good_job_id: nil) end else save! diff --git a/lib/generators/good_job/templates/install/migrations/create_good_jobs.rb.erb b/lib/generators/good_job/templates/install/migrations/create_good_jobs.rb.erb index ad0004063..dd1d4ed2f 100644 --- a/lib/generators/good_job/templates/install/migrations/create_good_jobs.rb.erb +++ b/lib/generators/good_job/templates/install/migrations/create_good_jobs.rb.erb @@ -44,9 +44,9 @@ class CreateGoodJobs < ActiveRecord::Migration<%= migration_version %> create_table :good_job_executions, id: :uuid do |t| t.timestamps - t.uuid :active_job_id + t.uuid :active_job_id, null: false t.jsonb :serialized_params - t.datetime :perform_expected_at + t.datetime :scheduled_at t.datetime :finished_at t.text :error end diff --git a/lib/generators/good_job/templates/update/migrations/05_create_good_job_executions.rb.erb b/lib/generators/good_job/templates/update/migrations/05_create_good_job_executions.rb.erb index e4f3a48d8..f21829dfc 100644 --- a/lib/generators/good_job/templates/update/migrations/05_create_good_job_executions.rb.erb +++ b/lib/generators/good_job/templates/update/migrations/05_create_good_job_executions.rb.erb @@ -12,9 +12,9 @@ class CreateGoodJobExecutions < ActiveRecord::Migration<%= migration_version %> create_table :good_job_executions, id: :uuid do |t| t.timestamps - t.uuid :active_job_id + t.uuid :active_job_id, null: false t.jsonb :serialized_params - t.datetime :perform_expected_at + t.datetime :scheduled_at t.datetime :finished_at t.text :error diff --git a/spec/app/models/good_job/execution_spec.rb b/spec/app/models/good_job/execution_spec.rb index f54fd602e..4ed4b2069 100644 --- a/spec/app/models/good_job/execution_spec.rb +++ b/spec/app/models/good_job/execution_spec.rb @@ -666,15 +666,20 @@ def job_params good_job.update!(is_discrete: true) end - it 'creates a DiscreteExecution record' do + it 'updates the Execution record and creates a DiscreteExecution record' do good_job.perform + expect(good_job.reload).to have_attributes( + executions_count: 1, + finished_at: within(1.second).of(Time.current) + ) + dexecution = good_job.discrete_executions.first expect(dexecution).to be_present expect(dexecution).to have_attributes( active_job_id: good_job.active_job_id, created_at: within(0.001).of(good_job.performed_at), - perform_expected_at: within(0.001).of(good_job.created_at), + scheduled_at: within(0.001).of(good_job.created_at), finished_at: within(1.second).of(Time.current), error: nil, serialized_params: good_job.serialized_params @@ -696,10 +701,10 @@ def job_params expect { good_job.perform } .to not_change(described_class, :count) .and change { good_job.reload.serialized_params["executions"] }.by(1) - .and not_change { good_job.reload.id } + .and not_change { good_job.reload.id } .and not_change { described_class.count } - expect(good_job).to have_attributes( + expect(good_job.reload).to have_attributes( error: "TestJob::ExpectedError: Raised expected error", created_at: within(1.second).of(Time.current), performed_at: nil, @@ -712,7 +717,7 @@ def job_params active_job_id: good_job.active_job_id, error: "TestJob::ExpectedError: Raised expected error", created_at: within(1.second).of(Time.current), - perform_expected_at: within(1.second).of(Time.current), + scheduled_at: within(1.second).of(Time.current), finished_at: within(1.second).of(Time.current) ) end diff --git a/spec/app/models/good_job/job_spec.rb b/spec/app/models/good_job/job_spec.rb index ffc9e92a2..01bcd1f99 100644 --- a/spec/app/models/good_job/job_spec.rb +++ b/spec/app/models/good_job/job_spec.rb @@ -22,7 +22,7 @@ } ).tap do |job| job.discrete_executions.create!( - perform_expected_at: 1.minute.ago, + scheduled_at: 1.minute.ago, created_at: 1.minute.ago, finished_at: 1.minute.ago, error: "TestJob::Error: TestJob::Error" diff --git a/spec/test_app/db/migrate/20230412144442_create_good_job_executions.rb b/spec/test_app/db/migrate/20230412144442_create_good_job_executions.rb index a98c24915..004d848ae 100644 --- a/spec/test_app/db/migrate/20230412144442_create_good_job_executions.rb +++ b/spec/test_app/db/migrate/20230412144442_create_good_job_executions.rb @@ -12,9 +12,9 @@ def change create_table :good_job_executions, id: :uuid do |t| t.timestamps - t.uuid :active_job_id + t.uuid :active_job_id, null: false t.jsonb :serialized_params - t.datetime :perform_expected_at + t.datetime :scheduled_at t.datetime :finished_at t.text :error diff --git a/spec/test_app/db/schema.rb b/spec/test_app/db/schema.rb index f5545d911..191ec734a 100644 --- a/spec/test_app/db/schema.rb +++ b/spec/test_app/db/schema.rb @@ -33,9 +33,9 @@ create_table "good_job_executions", id: :uuid, default: -> { "gen_random_uuid()" }, force: :cascade do |t| t.datetime "created_at", null: false t.datetime "updated_at", null: false - t.uuid "active_job_id" + t.uuid "active_job_id", null: false t.jsonb "serialized_params" - t.datetime "perform_expected_at" + t.datetime "scheduled_at" t.datetime "finished_at" t.text "error" t.index ["active_job_id", "created_at"], name: "index_good_job_executions_on_active_job_id_and_created_at" From ece73e0eb490eb752999836b0895be332c8f77b0 Mon Sep 17 00:00:00 2001 From: "Ben Sheldon [he/him]" Date: Fri, 21 Apr 2023 08:01:51 -0700 Subject: [PATCH 8/8] Add job_class and queue_name as columns because they will be queried in dashboard --- app/models/good_job/base_execution.rb | 2 +- app/models/good_job/execution.rb | 9 ++++++++- app/models/good_job/job.rb | 2 ++ .../templates/install/migrations/create_good_jobs.rb.erb | 3 +++ .../migrations/05_create_good_job_executions.rb.erb | 3 +++ spec/app/models/good_job/execution_spec.rb | 2 ++ .../migrate/20230412144442_create_good_job_executions.rb | 3 +++ spec/test_app/db/schema.rb | 3 +++ 8 files changed, 25 insertions(+), 2 deletions(-) diff --git a/app/models/good_job/base_execution.rb b/app/models/good_job/base_execution.rb index 03dcfa856..15f56221f 100644 --- a/app/models/good_job/base_execution.rb +++ b/app/models/good_job/base_execution.rb @@ -47,7 +47,7 @@ def discrete_support? # The ActiveJob job class, as a string # @return [String] def job_class - serialized_params['job_class'] + discrete? ? attributes['job_class'] : serialized_params['job_class'] end def discrete? diff --git a/app/models/good_job/execution.rb b/app/models/good_job/execution.rb index d8057a2f0..46e6caf27 100644 --- a/app/models/good_job/execution.rb +++ b/app/models/good_job/execution.rb @@ -378,7 +378,13 @@ def perform if discrete? transaction do now = Time.current - discrete_execution = discrete_executions.create!(serialized_params: serialized_params, scheduled_at: (scheduled_at || created_at), created_at: now) + discrete_execution = discrete_executions.create!( + job_class: job_class, + queue_name: queue_name, + serialized_params: serialized_params, + scheduled_at: (scheduled_at || created_at), + created_at: now + ) update!(performed_at: now, executions_count: ((executions_count || 0) + 1)) end else @@ -460,6 +466,7 @@ def executable? def make_discrete self.is_discrete = true self.id = active_job_id + self.job_class = serialized_params['job_class'] self.executions_count ||= 0 current_time = Time.current diff --git a/app/models/good_job/job.rb b/app/models/good_job/job.rb index 8f1394571..3fa26c19b 100644 --- a/app/models/good_job/job.rb +++ b/app/models/good_job/job.rb @@ -61,6 +61,8 @@ def table_name=(_value) # Errored but will not be retried scope :discarded, -> { finished.where.not(error: nil) } + scope :unfinished_undiscrete, -> { where(finished_at: nil, retried_good_job_id: nil, is_discrete: [nil, false]) } + # The job's ActiveJob UUID # @return [String] def id diff --git a/lib/generators/good_job/templates/install/migrations/create_good_jobs.rb.erb b/lib/generators/good_job/templates/install/migrations/create_good_jobs.rb.erb index dd1d4ed2f..27d23ab85 100644 --- a/lib/generators/good_job/templates/install/migrations/create_good_jobs.rb.erb +++ b/lib/generators/good_job/templates/install/migrations/create_good_jobs.rb.erb @@ -25,6 +25,7 @@ class CreateGoodJobs < ActiveRecord::Migration<%= migration_version %> t.boolean :is_discrete t.integer :executions_count + t.text :job_class end create_table :good_job_batches, id: :uuid do |t| @@ -45,6 +46,8 @@ class CreateGoodJobs < ActiveRecord::Migration<%= migration_version %> t.timestamps t.uuid :active_job_id, null: false + t.text :job_class + t.text :queue_name t.jsonb :serialized_params t.datetime :scheduled_at t.datetime :finished_at diff --git a/lib/generators/good_job/templates/update/migrations/05_create_good_job_executions.rb.erb b/lib/generators/good_job/templates/update/migrations/05_create_good_job_executions.rb.erb index f21829dfc..eb2fd1392 100644 --- a/lib/generators/good_job/templates/update/migrations/05_create_good_job_executions.rb.erb +++ b/lib/generators/good_job/templates/update/migrations/05_create_good_job_executions.rb.erb @@ -13,6 +13,8 @@ class CreateGoodJobExecutions < ActiveRecord::Migration<%= migration_version %> t.timestamps t.uuid :active_job_id, null: false + t.text :job_class + t.text :queue_name t.jsonb :serialized_params t.datetime :scheduled_at t.datetime :finished_at @@ -24,6 +26,7 @@ class CreateGoodJobExecutions < ActiveRecord::Migration<%= migration_version %> change_table :good_jobs do |t| t.boolean :is_discrete t.integer :executions_count + t.text :job_class end end end diff --git a/spec/app/models/good_job/execution_spec.rb b/spec/app/models/good_job/execution_spec.rb index 4ed4b2069..0282153ad 100644 --- a/spec/app/models/good_job/execution_spec.rb +++ b/spec/app/models/good_job/execution_spec.rb @@ -678,6 +678,8 @@ def job_params expect(dexecution).to be_present expect(dexecution).to have_attributes( active_job_id: good_job.active_job_id, + job_class: good_job.job_class, + queue_name: good_job.queue_name, created_at: within(0.001).of(good_job.performed_at), scheduled_at: within(0.001).of(good_job.created_at), finished_at: within(1.second).of(Time.current), diff --git a/spec/test_app/db/migrate/20230412144442_create_good_job_executions.rb b/spec/test_app/db/migrate/20230412144442_create_good_job_executions.rb index 004d848ae..a3eaf68b9 100644 --- a/spec/test_app/db/migrate/20230412144442_create_good_job_executions.rb +++ b/spec/test_app/db/migrate/20230412144442_create_good_job_executions.rb @@ -13,6 +13,8 @@ def change t.timestamps t.uuid :active_job_id, null: false + t.text :job_class + t.text :queue_name t.jsonb :serialized_params t.datetime :scheduled_at t.datetime :finished_at @@ -24,6 +26,7 @@ def change change_table :good_jobs do |t| t.boolean :is_discrete t.integer :executions_count + t.text :job_class end end end diff --git a/spec/test_app/db/schema.rb b/spec/test_app/db/schema.rb index 191ec734a..dc2a1cb37 100644 --- a/spec/test_app/db/schema.rb +++ b/spec/test_app/db/schema.rb @@ -34,6 +34,8 @@ t.datetime "created_at", null: false t.datetime "updated_at", null: false t.uuid "active_job_id", null: false + t.text "job_class" + t.text "queue_name" t.jsonb "serialized_params" t.datetime "scheduled_at" t.datetime "finished_at" @@ -74,6 +76,7 @@ t.uuid "batch_callback_id" t.boolean "is_discrete" t.integer "executions_count" + t.text "job_class" t.index ["active_job_id", "created_at"], name: "index_good_jobs_on_active_job_id_and_created_at" t.index ["active_job_id"], name: "index_good_jobs_on_active_job_id" t.index ["batch_callback_id"], name: "index_good_jobs_on_batch_callback_id", where: "(batch_callback_id IS NOT NULL)"