From 4d443b3a26821fd30017b5a63e8dbf46597f33b2 Mon Sep 17 00:00:00 2001 From: "Ben Sheldon [he/him]" Date: Sat, 14 Sep 2024 10:42:49 -0700 Subject: [PATCH] Don't delete batches until all their callback jobs complete (#1454) Connects to #1387 Add logic to delete batches only after their callback jobs have completed. --- .rspec | 2 ++ app/models/good_job/batch.rb | 12 +++++-- app/models/good_job/batch_record.rb | 26 +++++++++++++--- app/models/good_job/job.rb | 2 ++ ...dd_jobs_finished_at_to_good_job_batches.rb | 17 ++++++++++ demo/db/schema.rb | 3 +- .../migrations/create_good_jobs.rb.erb | 1 + ...obs_finished_at_to_good_job_batches.rb.erb | 17 ++++++++++ lib/good_job.rb | 8 ++--- spec/app/models/good_job/batch_record_spec.rb | 31 +++++++++++++++++++ spec/app/models/good_job/batch_spec.rb | 8 +++-- spec/integration/batch_spec.rb | 18 +++++++++++ spec/lib/good_job_spec.rb | 12 ++++++- 13 files changed, 142 insertions(+), 15 deletions(-) create mode 100644 demo/db/migrate/20240801143343_add_jobs_finished_at_to_good_job_batches.rb create mode 100644 lib/generators/good_job/templates/update/migrations/02_add_jobs_finished_at_to_good_job_batches.rb.erb diff --git a/.rspec b/.rspec index b18b4ccaf..31b1534c5 100644 --- a/.rspec +++ b/.rspec @@ -1,3 +1,5 @@ --color --order random --require spec_helper +--require ./spec/support/pre_documentation_formatter.rb \ +--format PreDocumentationFormatter diff --git a/app/models/good_job/batch.rb b/app/models/good_job/batch.rb index ba2dcc2dc..7f530aa88 100644 --- a/app/models/good_job/batch.rb +++ b/app/models/good_job/batch.rb @@ -26,10 +26,12 @@ class Batch :enqueued_at, :finished_at, :discarded_at, + :jobs_finished_at, :enqueued?, :finished?, :succeeded?, :discarded?, + :jobs_finished?, :description, :description=, :on_finish, @@ -95,8 +97,12 @@ def enqueue(active_jobs = [], **properties, &block) record.transaction do record.with_advisory_lock(function: "pg_advisory_xact_lock") do record.enqueued_at_will_change! + record.jobs_finished_at_will_change! if GoodJob::BatchRecord.jobs_finished_at_migrated? record.finished_at_will_change! - record.update!(enqueued_at: nil, finished_at: nil) + + update_attributes = { discarded_at: nil, finished_at: nil } + update_attributes[:jobs_finished_at] = nil if GoodJob::BatchRecord.jobs_finished_at_migrated? + record.update!(**update_attributes) end end end @@ -142,7 +148,9 @@ def retry buffer = GoodJob::Adapter::InlineBuffer.capture do record.transaction do record.with_advisory_lock(function: "pg_advisory_xact_lock") do - record.update!(discarded_at: nil, finished_at: nil) + update_attributes = { discarded_at: nil, finished_at: nil } + update_attributes[:jobs_finished_at] = nil if GoodJob::BatchRecord.jobs_finished_at_migrated? + record.update!(update_attributes) record.jobs.discarded.each(&:retry_job) record._continue_discard_or_finish(lock: false) end diff --git a/app/models/good_job/batch_record.rb b/app/models/good_job/batch_record.rb index 60081ec23..06ea11f77 100644 --- a/app/models/good_job/batch_record.rb +++ b/app/models/good_job/batch_record.rb @@ -38,6 +38,10 @@ class BatchRecord < BaseRecord query end) + def self.jobs_finished_at_migrated? + column_names.include?('jobs_finished_at') + end + # Whether the batch has finished and no jobs were discarded # @return [Boolean] def succeeded? @@ -52,22 +56,26 @@ def display_attributes attributes.except('serialized_properties').merge(properties: properties) end - def _continue_discard_or_finish(execution = nil, lock: true) - execution_discarded = execution && execution.finished_at.present? && execution.error.present? + def _continue_discard_or_finish(job = nil, lock: true) + job_discarded = job && job.finished_at.present? && job.error.present? buffer = GoodJob::Adapter::InlineBuffer.capture do advisory_lock_maybe(lock) do Batch.within_thread(batch_id: nil, batch_callback_id: id) do reload - if execution_discarded && !discarded_at + + if job_discarded && !discarded_at update(discarded_at: Time.current) on_discard.constantize.set(priority: callback_priority, queue: callback_queue_name).perform_later(to_batch, { event: :discard }) if on_discard.present? end - if enqueued_at && !finished_at && jobs.where(finished_at: nil).count.zero? - update(finished_at: Time.current) + if enqueued_at && !(self.class.jobs_finished_at_migrated? ? jobs_finished_at : finished_at) && jobs.where(finished_at: nil).count.zero? + self.class.jobs_finished_at_migrated? ? update(jobs_finished_at: Time.current) : update(finished_at: Time.current) + on_success.constantize.set(priority: callback_priority, queue: callback_queue_name).perform_later(to_batch, { event: :success }) if !discarded_at && on_success.present? on_finish.constantize.set(priority: callback_priority, queue: callback_queue_name).perform_later(to_batch, { event: :finish }) if on_finish.present? end + + update(finished_at: Time.current) if !finished_at && self.class.jobs_finished_at_migrated? && jobs_finished? && callback_jobs.where(finished_at: nil).count.zero? end end end @@ -97,6 +105,14 @@ def properties=(value) self.serialized_properties = value end + def jobs_finished? + self.class.jobs_finished_at_migrated? ? jobs_finished_at : finished_at + end + + def jobs_finished_at + self.class.jobs_finished_at_migrated? ? self[:jobs_finished_at] : self[:finished_at] + end + private def advisory_lock_maybe(value, &block) diff --git a/app/models/good_job/job.rb b/app/models/good_job/job.rb index 0aa4f9d99..8bbfebb81 100644 --- a/app/models/good_job/job.rb +++ b/app/models/good_job/job.rb @@ -40,6 +40,7 @@ class Job < BaseRecord set_callback :perform_unlocked, :after, :continue_discard_or_finish_batch belongs_to :batch, class_name: 'GoodJob::BatchRecord', inverse_of: :jobs, optional: true + belongs_to :callback_batch, class_name: 'GoodJob::BatchRecord', foreign_key: :batch_callback_id, inverse_of: :callback_jobs, optional: true belongs_to :locked_by_process, class_name: "GoodJob::Process", foreign_key: :locked_by_id, inverse_of: :locked_jobs, optional: true has_many :executions, class_name: 'GoodJob::Execution', foreign_key: 'active_job_id', primary_key: "id", inverse_of: :job, dependent: :delete_all @@ -743,6 +744,7 @@ def reset_batch_values(&block) def continue_discard_or_finish_batch batch._continue_discard_or_finish(self) if batch.present? + callback_batch._continue_discard_or_finish if callback_batch.present? end def active_job_data diff --git a/demo/db/migrate/20240801143343_add_jobs_finished_at_to_good_job_batches.rb b/demo/db/migrate/20240801143343_add_jobs_finished_at_to_good_job_batches.rb new file mode 100644 index 000000000..09c3e832e --- /dev/null +++ b/demo/db/migrate/20240801143343_add_jobs_finished_at_to_good_job_batches.rb @@ -0,0 +1,17 @@ +# frozen_string_literal: true + +class AddJobsFinishedAtToGoodJobBatches < ActiveRecord::Migration[7.1] + def change + reversible do |dir| + dir.up do + # Ensure this incremental update migration is idempotent + # with monolithic install migration. + return if connection.column_exists?(:good_job_batches, :jobs_finished_at) + end + end + + change_table :good_job_batches do |t| + t.datetime :jobs_finished_at + end + end +end diff --git a/demo/db/schema.rb b/demo/db/schema.rb index d8bc826f9..db7620fed 100644 --- a/demo/db/schema.rb +++ b/demo/db/schema.rb @@ -10,7 +10,7 @@ # # It's strongly recommended that you check this file into your version control system. -ActiveRecord::Schema.define(version: 2024_06_13_151310) do +ActiveRecord::Schema.define(version: 2024_08_01_143343) do # These are extensions that must be enabled in order to support this database enable_extension "pgcrypto" enable_extension "plpgsql" @@ -28,6 +28,7 @@ t.datetime "enqueued_at" t.datetime "discarded_at" t.datetime "finished_at" + t.datetime "jobs_finished_at" end create_table "good_job_executions", id: :uuid, default: -> { "gen_random_uuid()" }, force: :cascade do |t| diff --git a/lib/generators/good_job/templates/install/migrations/create_good_jobs.rb.erb b/lib/generators/good_job/templates/install/migrations/create_good_jobs.rb.erb index ab5da0a01..7f0774099 100644 --- a/lib/generators/good_job/templates/install/migrations/create_good_jobs.rb.erb +++ b/lib/generators/good_job/templates/install/migrations/create_good_jobs.rb.erb @@ -46,6 +46,7 @@ class CreateGoodJobs < ActiveRecord::Migration<%= migration_version %> t.datetime :enqueued_at t.datetime :discarded_at t.datetime :finished_at + t.datetime :jobs_finished_at end create_table :good_job_executions, id: :uuid do |t| diff --git a/lib/generators/good_job/templates/update/migrations/02_add_jobs_finished_at_to_good_job_batches.rb.erb b/lib/generators/good_job/templates/update/migrations/02_add_jobs_finished_at_to_good_job_batches.rb.erb new file mode 100644 index 000000000..482818514 --- /dev/null +++ b/lib/generators/good_job/templates/update/migrations/02_add_jobs_finished_at_to_good_job_batches.rb.erb @@ -0,0 +1,17 @@ +# frozen_string_literal: true + +class AddJobsFinishedAtToGoodJobBatches < ActiveRecord::Migration<%= migration_version %> + def change + reversible do |dir| + dir.up do + # Ensure this incremental update migration is idempotent + # with monolithic install migration. + return if connection.column_exists?(:good_job_batches, :jobs_finished_at) + end + end + + change_table :good_job_batches do |t| + t.datetime :jobs_finished_at + end + end +end diff --git a/lib/good_job.rb b/lib/good_job.rb index 72728d925..a13b324be 100644 --- a/lib/good_job.rb +++ b/lib/good_job.rb @@ -171,10 +171,10 @@ def self.restart(timeout: -1) _shutdown_all(Capsule.instances, :restart, timeout: timeout) end - # Sends +#shutdown+ or +#restart+ to executable objects ({GoodJob::Notifier}, {GoodJob::Poller}, {GoodJob::Scheduler}, {GoodJob::MultiScheduler}, {GoodJob::CronManager}) + # Sends +#shutdown+ or +#restart+ to executable objects ({GoodJob::Notifier}, {GoodJob::Poller}, {GoodJob::Scheduler}, {GoodJob::MultiScheduler}, {GoodJob::CronManager}, {GoodJob::SharedExecutor}) # @param executables [Array] Objects to shut down. - # @param method_name [:symbol] Method to call, e.g. +:shutdown+ or +:restart+. - # @param timeout [nil,Numeric] + # @param method_name [Symbol] Method to call, e.g. +:shutdown+ or +:restart+. + # @param timeout [nil, Numeric] Seconds to wait for actively executing jobs to finish. # @param after [Array] Objects to shut down after initial executables shut down. # @return [void] def self._shutdown_all(executables, method_name = :shutdown, timeout: -1, after: []) @@ -290,7 +290,7 @@ def self.deprecator # For use in tests/CI to validate GoodJob is up-to-date. # @return [Boolean] def self.migrated? - true + GoodJob::BatchRecord.jobs_finished_at_migrated? end end diff --git a/spec/app/models/good_job/batch_record_spec.rb b/spec/app/models/good_job/batch_record_spec.rb index a2d002493..6969f6cde 100644 --- a/spec/app/models/good_job/batch_record_spec.rb +++ b/spec/app/models/good_job/batch_record_spec.rb @@ -42,4 +42,35 @@ expect(result.last).to eq last_job end end + + describe 'finished_at' do + it 'is now set when all jobs in the batch are finished' do + batch = described_class.create! + batch.update(enqueued_at: Time.current, jobs_finished_at: Time.current) + batch.callback_jobs.create!(finished_at: nil) + batch.callback_jobs.create!(finished_at: Time.current) + + batch._continue_discard_or_finish + + expect(batch.reload.finished_at).to be_nil + + batch.callback_jobs.update(finished_at: Time.current) + batch._continue_discard_or_finish + + expect(batch.reload.finished_at).to be_within(1.second).of(Time.current) + end + end + + describe 'deletion logic' do + it 'checks finished_at' do + batch = described_class.create! + batch.update(enqueued_at: Time.current, jobs_finished_at: Time.current, finished_at: nil) + + expect { described_class.finished_before(Time.current).delete_all }.not_to change(described_class, :count) + + batch.update(finished_at: Time.current) + + expect { described_class.finished_before(Time.current).delete_all }.to change(described_class, :count).by(-1) + end + end end diff --git a/spec/app/models/good_job/batch_spec.rb b/spec/app/models/good_job/batch_spec.rb index 3f76108fc..b8ec4aed6 100644 --- a/spec/app/models/good_job/batch_spec.rb +++ b/spec/app/models/good_job/batch_spec.rb @@ -117,11 +117,15 @@ def perform batch.retry - expect(batch.reload).to be_enqueued + batch.reload + expect(batch).to have_attributes(discarded_at: nil, jobs_finished_at: nil, finished_at: nil) + expect(batch).to be_enqueued GoodJob.perform_inline - expect(batch.reload).to be_succeeded + batch.reload + expect(batch).to have_attributes(discarded_at: nil, jobs_finished_at: be_present, finished_at: be_present) + expect(batch).to be_succeeded end end diff --git a/spec/integration/batch_spec.rb b/spec/integration/batch_spec.rb index 91f194562..cc2eb0138 100644 --- a/spec/integration/batch_spec.rb +++ b/spec/integration/batch_spec.rb @@ -307,8 +307,10 @@ def perform(_batch, _params) batch.reload expect(batch).to be_succeeded + expect(batch).to be_jobs_finished expect(batch.callback_active_jobs.count).to eq 1 expect(batch.callback_active_jobs.first).to be_a TestJob::SuccessCallbackJob + expect(batch.finished_at).to be_present job, callback_job = GoodJob::Job.order(:created_at).to_a expect(job.status).to eq :succeeded @@ -351,4 +353,20 @@ def perform(_batch, _params) expect { batch.retry }.to change { GoodJob::Job.discarded.count }.by(-1) end end + + describe 'batch deletion' do + it 'deletes batches only after their callback jobs have completed' do + batch = GoodJob::Batch.new + batch.on_finish = "BatchCallbackJob" + batch.enqueue do + TestJob.perform_later + end + + GoodJob.perform_inline + + batch.reload + expect(batch.jobs_finished_at).to be_present + expect(batch.finished_at).to be_within(1.second).of(Time.current) + end + end end diff --git a/spec/lib/good_job_spec.rb b/spec/lib/good_job_spec.rb index 3b1f83f8b..739399ea0 100644 --- a/spec/lib/good_job_spec.rb +++ b/spec/lib/good_job_spec.rb @@ -58,7 +58,7 @@ let!(:old_finished_job) { GoodJob::Job.create!(active_job_id: SecureRandom.uuid, finished_at: 15.days.ago) } let!(:old_finished_job_execution) { GoodJob::Execution.create!(active_job_id: old_finished_job.active_job_id, finished_at: 16.days.ago) } let!(:old_discarded_job) { GoodJob::Job.create!(active_job_id: SecureRandom.uuid, finished_at: 15.days.ago, error: "Error") } - let!(:old_batch) { GoodJob::BatchRecord.create!(finished_at: 15.days.ago) } + let!(:old_batch) { GoodJob::BatchRecord.create!(jobs_finished_at: 14.days.ago, finished_at: 15.days.ago) } it 'deletes finished jobs' do destroyed_records_count = described_class.cleanup_preserved_jobs(in_batches_of: 1) @@ -110,6 +110,16 @@ expect { old_discarded_job.reload }.not_to raise_error expect { old_batch.reload }.to raise_error ActiveRecord::RecordNotFound end + + it "does not delete batches until their callbacks have finished" do + old_batch.update!(finished_at: nil) + described_class.cleanup_preserved_jobs + expect { old_batch.reload }.not_to raise_error + + old_batch.update!(finished_at: 15.days.ago) + described_class.cleanup_preserved_jobs + expect { old_batch.reload }.to raise_error ActiveRecord::RecordNotFound + end end describe '.perform_inline' do