From a311e1e856e271b36b3cb1d067635e9605d4a028 Mon Sep 17 00:00:00 2001 From: "Ben Sheldon [he/him]" Date: Wed, 31 Jul 2024 21:33:30 -0700 Subject: [PATCH] Don't delete batches until all their callback jobs complete Connects to #1387 Add logic to delete batches only after their callback jobs have completed. --- app/models/good_job/batch.rb | 2 ++ app/models/good_job/batch_record.rb | 26 +++++++++++++--- app/models/good_job/job.rb | 2 ++ ...llbacks_finished_at_to_good_job_batches.rb | 17 ++++++++++ ...1143344_add_indexes_to_good_job_batches.rb | 17 ++++++++++ demo/db/schema.rb | 5 ++- .../migrations/create_good_jobs.rb.erb | 4 +++ ...cks_finished_at_to_good_job_batches.rb.erb | 17 ++++++++++ .../03_add_indexes_to_good_job_batches.rb.erb | 17 ++++++++++ lib/good_job.rb | 2 +- spec/app/models/good_job/batch_record_spec.rb | 31 +++++++++++++++++++ spec/integration/batch_spec.rb | 17 ++++++++++ spec/lib/good_job_spec.rb | 12 ++++++- 13 files changed, 162 insertions(+), 7 deletions(-) create mode 100644 demo/db/migrate/20240801143343_add_callbacks_finished_at_to_good_job_batches.rb create mode 100644 demo/db/migrate/20240801143344_add_indexes_to_good_job_batches.rb create mode 100644 lib/generators/good_job/templates/update/migrations/02_add_callbacks_finished_at_to_good_job_batches.rb.erb create mode 100644 lib/generators/good_job/templates/update/migrations/03_add_indexes_to_good_job_batches.rb.erb diff --git a/app/models/good_job/batch.rb b/app/models/good_job/batch.rb index ba2dcc2dc..d3f135c66 100644 --- a/app/models/good_job/batch.rb +++ b/app/models/good_job/batch.rb @@ -26,10 +26,12 @@ class Batch :enqueued_at, :finished_at, :discarded_at, + :callbacks_finished_at, :enqueued?, :finished?, :succeeded?, :discarded?, + :callbacks_finished?, :description, :description=, :on_finish, diff --git a/app/models/good_job/batch_record.rb b/app/models/good_job/batch_record.rb index bf162d58c..251c929c7 100644 --- a/app/models/good_job/batch_record.rb +++ b/app/models/good_job/batch_record.rb @@ -18,11 +18,15 @@ class BatchRecord < BaseRecord scope :not_discarded, -> { where(discarded_at: nil) } scope :succeeded, -> { finished.not_discarded } - scope :finished_before, ->(timestamp) { where(arel_table['finished_at'].lteq(bind_value('finished_at', timestamp, ActiveRecord::Type::DateTime))) } + scope :finished_before, lambda { |timestamp| + finished_column_name = callbacks_finished_at_migrated? ? 'callbacks_finished_at' : 'finished_at' + where(arel_table[finished_column_name].lteq(bind_value(finished_column_name, timestamp, ActiveRecord::Type::DateTime))) + } alias_attribute :enqueued?, :enqueued_at alias_attribute :discarded?, :discarded_at alias_attribute :finished?, :finished_at + alias_attribute :callbacks_finished?, :callbacks_finished_at scope :display_all, (lambda do |after_created_at: nil, after_id: nil| query = order(created_at: :desc, id: :desc) @@ -38,6 +42,17 @@ class BatchRecord < BaseRecord query end) + def self.callbacks_finished_at_migrated? + column_names.include?('callbacks_finished_at') + end + + def self.indexes_migrated? + return true if connection.index_name_exists?(:good_job_batches, :index_good_job_batches_for_cleanup) + + migration_pending_warning! + false + end + # Whether the batch has finished and no jobs were discarded # @return [Boolean] def succeeded? @@ -52,13 +67,14 @@ def display_attributes attributes.except('serialized_properties').merge(properties: properties) end - def _continue_discard_or_finish(execution = nil, lock: true) - execution_discarded = execution && execution.finished_at.present? && execution.error.present? + def _continue_discard_or_finish(job = nil, lock: true) + job_discarded = job && job.finished_at.present? && job.error.present? buffer = GoodJob::Adapter::InlineBuffer.capture do advisory_lock_maybe(lock) do Batch.within_thread(batch_id: nil, batch_callback_id: id) do reload - if execution_discarded && !discarded_at + + if job_discarded && !discarded_at update(discarded_at: Time.current) on_discard.constantize.set(priority: callback_priority, queue: callback_queue_name).perform_later(to_batch, { event: :discard }) if on_discard.present? end @@ -68,6 +84,8 @@ def _continue_discard_or_finish(execution = nil, lock: true) on_success.constantize.set(priority: callback_priority, queue: callback_queue_name).perform_later(to_batch, { event: :success }) if !discarded_at && on_success.present? on_finish.constantize.set(priority: callback_priority, queue: callback_queue_name).perform_later(to_batch, { event: :finish }) if on_finish.present? end + + update(callbacks_finished_at: Time.current) if finished_at && callback_jobs.where(finished_at: nil).count.zero? end end end diff --git a/app/models/good_job/job.rb b/app/models/good_job/job.rb index 7b89f1f06..d86af7637 100644 --- a/app/models/good_job/job.rb +++ b/app/models/good_job/job.rb @@ -40,6 +40,7 @@ class Job < BaseRecord set_callback :perform_unlocked, :after, :continue_discard_or_finish_batch belongs_to :batch, class_name: 'GoodJob::BatchRecord', inverse_of: :jobs, optional: true + belongs_to :callback_batch, class_name: 'GoodJob::BatchRecord', foreign_key: :batch_callback_id, inverse_of: :callback_jobs, optional: true belongs_to :locked_by_process, class_name: "GoodJob::Process", foreign_key: :locked_by_id, inverse_of: :locked_jobs, optional: true has_many :executions, class_name: 'GoodJob::Execution', foreign_key: 'active_job_id', primary_key: "id", inverse_of: :job, dependent: :delete_all @@ -761,6 +762,7 @@ def reset_batch_values(&block) def continue_discard_or_finish_batch batch._continue_discard_or_finish(self) if batch.present? + callback_batch._continue_discard_or_finish(self) if callback_batch.present? end def active_job_data diff --git a/demo/db/migrate/20240801143343_add_callbacks_finished_at_to_good_job_batches.rb b/demo/db/migrate/20240801143343_add_callbacks_finished_at_to_good_job_batches.rb new file mode 100644 index 000000000..c630a97cc --- /dev/null +++ b/demo/db/migrate/20240801143343_add_callbacks_finished_at_to_good_job_batches.rb @@ -0,0 +1,17 @@ +# frozen_string_literal: true + +class AddCallbacksFinishedAtToGoodJobBatches < ActiveRecord::Migration[7.1] + def change + reversible do |dir| + dir.up do + # Ensure this incremental update migration is idempotent + # with monolithic install migration. + return if connection.column_exists?(:good_job_batches, :callbacks_finished_at) + end + end + + change_table :good_job_batches do |t| + t.datetime :callbacks_finished_at + end + end +end diff --git a/demo/db/migrate/20240801143344_add_indexes_to_good_job_batches.rb b/demo/db/migrate/20240801143344_add_indexes_to_good_job_batches.rb new file mode 100644 index 000000000..23f43b106 --- /dev/null +++ b/demo/db/migrate/20240801143344_add_indexes_to_good_job_batches.rb @@ -0,0 +1,17 @@ +# frozen_string_literal: true + +class AddIndexesToGoodJobBatches < ActiveRecord::Migration[7.1] + disable_ddl_transaction! + + def change + reversible do |dir| + dir.up do + return if connection.index_name_exists?(:good_job_batches, :index_good_job_batches_for_cleanup) + end + end + + add_index :good_job_batches, [:created_at, :id], name: "index_good_job_batches_for_display", algorithm: :concurrently + add_index :good_job_batches, [:callbacks_finished_at, :discarded_at], order: { callbacks_finished_at: :asc, discarded_at: "ASC NULLS LAST" }, + where: "(callbacks_finished_at IS NOT NULL)", name: "index_good_job_batches_for_cleanup", algorithm: :concurrently + end +end diff --git a/demo/db/schema.rb b/demo/db/schema.rb index d8bc826f9..b65cf86ab 100644 --- a/demo/db/schema.rb +++ b/demo/db/schema.rb @@ -10,7 +10,7 @@ # # It's strongly recommended that you check this file into your version control system. -ActiveRecord::Schema.define(version: 2024_06_13_151310) do +ActiveRecord::Schema.define(version: 2024_08_01_143344) do # These are extensions that must be enabled in order to support this database enable_extension "pgcrypto" enable_extension "plpgsql" @@ -28,6 +28,9 @@ t.datetime "enqueued_at" t.datetime "discarded_at" t.datetime "finished_at" + t.datetime "callbacks_finished_at" + t.index ["callbacks_finished_at", "discarded_at"], name: "index_good_job_batches_for_cleanup", where: "(callbacks_finished_at IS NOT NULL)" + t.index ["created_at", "id"], name: "index_good_job_batches_for_display" end create_table "good_job_executions", id: :uuid, default: -> { "gen_random_uuid()" }, force: :cascade do |t| diff --git a/lib/generators/good_job/templates/install/migrations/create_good_jobs.rb.erb b/lib/generators/good_job/templates/install/migrations/create_good_jobs.rb.erb index ab5da0a01..22f3fc2da 100644 --- a/lib/generators/good_job/templates/install/migrations/create_good_jobs.rb.erb +++ b/lib/generators/good_job/templates/install/migrations/create_good_jobs.rb.erb @@ -46,6 +46,7 @@ class CreateGoodJobs < ActiveRecord::Migration<%= migration_version %> t.datetime :enqueued_at t.datetime :discarded_at t.datetime :finished_at + t.datetime :callbacks_finished_at end create_table :good_job_executions, id: :uuid do |t| @@ -98,5 +99,8 @@ class CreateGoodJobs < ActiveRecord::Migration<%= migration_version %> add_index :good_jobs, :locked_by_id, where: "locked_by_id IS NOT NULL", name: "index_good_jobs_on_locked_by_id" add_index :good_job_executions, [:process_id, :created_at], name: :index_good_job_executions_on_process_id_and_created_at + add_index :good_job_batches, [:created_at, :id], name: "index_good_job_batches_for_display" + add_index :good_job_batches, [:callbacks_finished_at, :discarded_at], order: { callbacks_finished_at: :asc, discarded_at: "ASC NULLS LAST" }, + where: "(callbacks_finished_at IS NOT NULL)", name: "index_good_job_batches_for_cleanup" end end diff --git a/lib/generators/good_job/templates/update/migrations/02_add_callbacks_finished_at_to_good_job_batches.rb.erb b/lib/generators/good_job/templates/update/migrations/02_add_callbacks_finished_at_to_good_job_batches.rb.erb new file mode 100644 index 000000000..bac68f7b4 --- /dev/null +++ b/lib/generators/good_job/templates/update/migrations/02_add_callbacks_finished_at_to_good_job_batches.rb.erb @@ -0,0 +1,17 @@ +# frozen_string_literal: true + +class AddCallbacksFinishedAtToGoodJobBatches < ActiveRecord::Migration<%= migration_version %> + def change + reversible do |dir| + dir.up do + # Ensure this incremental update migration is idempotent + # with monolithic install migration. + return if connection.column_exists?(:good_job_batches, :callbacks_finished_at) + end + end + + change_table :good_job_batches do |t| + t.datetime :callbacks_finished_at + end + end +end diff --git a/lib/generators/good_job/templates/update/migrations/03_add_indexes_to_good_job_batches.rb.erb b/lib/generators/good_job/templates/update/migrations/03_add_indexes_to_good_job_batches.rb.erb new file mode 100644 index 000000000..c8b7d7287 --- /dev/null +++ b/lib/generators/good_job/templates/update/migrations/03_add_indexes_to_good_job_batches.rb.erb @@ -0,0 +1,17 @@ +# frozen_string_literal: true + +class AddIndexesToGoodJobBatches < ActiveRecord::Migration<%= migration_version %> + disable_ddl_transaction! + + def change + reversible do |dir| + dir.up do + return if connection.index_name_exists?(:good_job_batches, :index_good_job_batches_for_cleanup) + end + end + + add_index :good_job_batches, [:created_at, :id], name: "index_good_job_batches_for_display", algorithm: :concurrently + add_index :good_job_batches, [:callbacks_finished_at, :discarded_at], order: { callbacks_finished_at: :asc, discarded_at: "ASC NULLS LAST" }, + where: "(callbacks_finished_at IS NOT NULL)", name: "index_good_job_batches_for_cleanup", algorithm: :concurrently + end +end diff --git a/lib/good_job.rb b/lib/good_job.rb index 72728d925..96fd022e8 100644 --- a/lib/good_job.rb +++ b/lib/good_job.rb @@ -290,7 +290,7 @@ def self.deprecator # For use in tests/CI to validate GoodJob is up-to-date. # @return [Boolean] def self.migrated? - true + GoodJob::BatchRecord.indexes_migrated? end end diff --git a/spec/app/models/good_job/batch_record_spec.rb b/spec/app/models/good_job/batch_record_spec.rb index a2d002493..9cebe60da 100644 --- a/spec/app/models/good_job/batch_record_spec.rb +++ b/spec/app/models/good_job/batch_record_spec.rb @@ -42,4 +42,35 @@ expect(result.last).to eq last_job end end + + describe 'callbacks_finished_at' do + it 'is set when all callback jobs are finished' do + batch = described_class.create! + batch.update(enqueued_at: Time.current, finished_at: Time.current) + batch.callback_jobs.create!(finished_at: nil) + batch.callback_jobs.create!(finished_at: Time.current) + + batch._continue_discard_or_finish + + expect(batch.reload.callbacks_finished_at).to be_nil + + batch.callback_jobs.update(finished_at: Time.current) + batch._continue_discard_or_finish + + expect(batch.reload.callbacks_finished_at).to be_within(1.second).of(Time.current) + end + end + + describe 'deletion logic' do + it 'checks callbacks_finished_at instead of finished_at' do + batch = described_class.create! + batch.update(enqueued_at: Time.current, finished_at: Time.current, callbacks_finished_at: nil) + + expect { described_class.finished_before(Time.current).delete_all }.not_to change(described_class, :count) + + batch.update(callbacks_finished_at: Time.current) + + expect { described_class.finished_before(Time.current).delete_all }.to change(described_class, :count).by(-1) + end + end end diff --git a/spec/integration/batch_spec.rb b/spec/integration/batch_spec.rb index 91f194562..faeefc15a 100644 --- a/spec/integration/batch_spec.rb +++ b/spec/integration/batch_spec.rb @@ -309,6 +309,7 @@ def perform(_batch, _params) expect(batch).to be_succeeded expect(batch.callback_active_jobs.count).to eq 1 expect(batch.callback_active_jobs.first).to be_a TestJob::SuccessCallbackJob + expect(batch.callbacks_finished_at).to be_present job, callback_job = GoodJob::Job.order(:created_at).to_a expect(job.status).to eq :succeeded @@ -351,4 +352,20 @@ def perform(_batch, _params) expect { batch.retry }.to change { GoodJob::Job.discarded.count }.by(-1) end end + + describe 'batch deletion' do + it 'deletes batches only after their callback jobs have completed' do + batch = GoodJob::Batch.new + batch.on_finish = "BatchCallbackJob" + batch.enqueue do + TestJob.perform_later + end + + GoodJob.perform_inline + + batch.reload + expect(batch).to be_finished + expect(batch.callbacks_finished_at).to be_within(1.second).of(Time.current) + end + end end diff --git a/spec/lib/good_job_spec.rb b/spec/lib/good_job_spec.rb index 3b1f83f8b..d8e2ad2a7 100644 --- a/spec/lib/good_job_spec.rb +++ b/spec/lib/good_job_spec.rb @@ -58,7 +58,7 @@ let!(:old_finished_job) { GoodJob::Job.create!(active_job_id: SecureRandom.uuid, finished_at: 15.days.ago) } let!(:old_finished_job_execution) { GoodJob::Execution.create!(active_job_id: old_finished_job.active_job_id, finished_at: 16.days.ago) } let!(:old_discarded_job) { GoodJob::Job.create!(active_job_id: SecureRandom.uuid, finished_at: 15.days.ago, error: "Error") } - let!(:old_batch) { GoodJob::BatchRecord.create!(finished_at: 15.days.ago) } + let!(:old_batch) { GoodJob::BatchRecord.create!(finished_at: 14.days.ago, callbacks_finished_at: 15.days.ago) } it 'deletes finished jobs' do destroyed_records_count = described_class.cleanup_preserved_jobs(in_batches_of: 1) @@ -110,6 +110,16 @@ expect { old_discarded_job.reload }.not_to raise_error expect { old_batch.reload }.to raise_error ActiveRecord::RecordNotFound end + + it "does not delete batches until their callbacks have finished" do + old_batch.update!(callbacks_finished_at: nil) + described_class.cleanup_preserved_jobs + expect { old_batch.reload }.not_to raise_error + + old_batch.update!(callbacks_finished_at: 15.days.ago) + described_class.cleanup_preserved_jobs + expect { old_batch.reload }.to raise_error ActiveRecord::RecordNotFound + end end describe '.perform_inline' do