Skip to content

Commit

Permalink
Don't delete batches until all their callback jobs complete
Browse files Browse the repository at this point in the history
Connects to #1387

Add logic to delete batches only after their callback jobs have completed.
  • Loading branch information
bensheldon committed Aug 29, 2024
1 parent 72ba713 commit 4b5d828
Show file tree
Hide file tree
Showing 13 changed files with 142 additions and 15 deletions.
2 changes: 2 additions & 0 deletions .rspec
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
--color
--order random
--require spec_helper
--require ./spec/support/pre_documentation_formatter.rb \
--format PreDocumentationFormatter
12 changes: 10 additions & 2 deletions app/models/good_job/batch.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,12 @@ class Batch
:enqueued_at,
:finished_at,
:discarded_at,
:jobs_finished_at,
:enqueued?,
:finished?,
:succeeded?,
:discarded?,
:jobs_finished?,
:description,
:description=,
:on_finish,
Expand Down Expand Up @@ -95,8 +97,12 @@ def enqueue(active_jobs = [], **properties, &block)
record.transaction do
record.with_advisory_lock(function: "pg_advisory_xact_lock") do
record.enqueued_at_will_change!
record.jobs_finished_at_will_change! if GoodJob::BatchRecord.jobs_finished_at_migrated?
record.finished_at_will_change!
record.update!(enqueued_at: nil, finished_at: nil)

update_attributes = { discarded_at: nil, finished_at: nil }
update_attributes[:jobs_finished_at] = nil if GoodJob::BatchRecord.jobs_finished_at_migrated?
record.update!(**update_attributes)
end
end
end
Expand Down Expand Up @@ -142,7 +148,9 @@ def retry
buffer = GoodJob::Adapter::InlineBuffer.capture do
record.transaction do
record.with_advisory_lock(function: "pg_advisory_xact_lock") do
record.update!(discarded_at: nil, finished_at: nil)
update_attributes = { discarded_at: nil, finished_at: nil }
update_attributes[:jobs_finished_at] = nil if GoodJob::BatchRecord.jobs_finished_at_migrated?
record.update!(update_attributes)
record.jobs.discarded.each(&:retry_job)
record._continue_discard_or_finish(lock: false)
end
Expand Down
26 changes: 21 additions & 5 deletions app/models/good_job/batch_record.rb
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ class BatchRecord < BaseRecord
query
end)

# Whether +column_names+ for this model includes +jobs_finished_at+.
# Other methods in this model use it to fall back to +finished_at+ when the
# column is absent.
# @return [Boolean]
def self.jobs_finished_at_migrated?
  migrated_column = 'jobs_finished_at'
  column_names.include?(migrated_column)
end

# Whether the batch has finished and no jobs were discarded
# @return [Boolean]
def succeeded?
Expand All @@ -52,22 +56,26 @@ def display_attributes
attributes.except('serialized_properties').merge(properties: properties)
end

def _continue_discard_or_finish(execution = nil, lock: true)
execution_discarded = execution && execution.finished_at.present? && execution.error.present?
def _continue_discard_or_finish(job = nil, lock: true)
job_discarded = job && job.finished_at.present? && job.error.present?
buffer = GoodJob::Adapter::InlineBuffer.capture do
advisory_lock_maybe(lock) do
Batch.within_thread(batch_id: nil, batch_callback_id: id) do
reload
if execution_discarded && !discarded_at

if job_discarded && !discarded_at
update(discarded_at: Time.current)
on_discard.constantize.set(priority: callback_priority, queue: callback_queue_name).perform_later(to_batch, { event: :discard }) if on_discard.present?
end

if enqueued_at && !finished_at && jobs.where(finished_at: nil).count.zero?
update(finished_at: Time.current)
if enqueued_at && !(self.class.jobs_finished_at_migrated? ? jobs_finished_at : finished_at) && jobs.where(finished_at: nil).count.zero?
self.class.jobs_finished_at_migrated? ? update(jobs_finished_at: Time.current) : update(finished_at: Time.current)

on_success.constantize.set(priority: callback_priority, queue: callback_queue_name).perform_later(to_batch, { event: :success }) if !discarded_at && on_success.present?
on_finish.constantize.set(priority: callback_priority, queue: callback_queue_name).perform_later(to_batch, { event: :finish }) if on_finish.present?
end

update(finished_at: Time.current) if !finished_at && self.class.jobs_finished_at_migrated? && jobs_finished? && callback_jobs.where(finished_at: nil).count.zero?
end
end
end
Expand Down Expand Up @@ -101,6 +109,14 @@ def properties=(value)
self.serialized_properties = value
end

# Truthy once the batch's jobs-finished timestamp is set.
# Reads +jobs_finished_at+ when that column has been migrated and falls back
# to +finished_at+ otherwise.
# @return [Object, nil] the underlying timestamp value (not coerced to a strict boolean)
def jobs_finished?
  return finished_at unless self.class.jobs_finished_at_migrated?

  jobs_finished_at
end

# Reads the timestamp that marks the batch's jobs as finished.
# Uses the +jobs_finished_at+ attribute when that column has been migrated,
# otherwise the +finished_at+ attribute.
# @return [Object, nil] the raw attribute value
def jobs_finished_at
  attribute = self.class.jobs_finished_at_migrated? ? :jobs_finished_at : :finished_at
  self[attribute]
end

private

def advisory_lock_maybe(value, &block)
Expand Down
2 changes: 2 additions & 0 deletions app/models/good_job/job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ class Job < BaseRecord
set_callback :perform_unlocked, :after, :continue_discard_or_finish_batch

belongs_to :batch, class_name: 'GoodJob::BatchRecord', inverse_of: :jobs, optional: true
belongs_to :callback_batch, class_name: 'GoodJob::BatchRecord', foreign_key: :batch_callback_id, inverse_of: :callback_jobs, optional: true
belongs_to :locked_by_process, class_name: "GoodJob::Process", foreign_key: :locked_by_id, inverse_of: :locked_jobs, optional: true
has_many :executions, class_name: 'GoodJob::Execution', foreign_key: 'active_job_id', primary_key: "id", inverse_of: :job, dependent: :delete_all

Expand Down Expand Up @@ -743,6 +744,7 @@ def reset_batch_values(&block)

# Asks the batches associated with this job to re-evaluate their state.
# The batch this job belongs to receives the job itself; the batch this job
# is a callback for is re-evaluated without arguments.
def continue_discard_or_finish_batch
  owning_batch = batch
  owning_batch._continue_discard_or_finish(self) if owning_batch.present?

  batch_called_back = callback_batch
  batch_called_back._continue_discard_or_finish if batch_called_back.present?
end

def active_job_data
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# frozen_string_literal: true

# Adds the +jobs_finished_at+ datetime column to the +good_job_batches+ table.
class AddJobsFinishedAtToGoodJobBatches < ActiveRecord::Migration[7.1]
def change
reversible do |dir|
dir.up do
# Ensure this incremental update migration is idempotent
# with monolithic install migration.
# The guard is registered for the up direction only; `return` leaves #change
# before the change_table call below when the column is already present.
return if connection.column_exists?(:good_job_batches, :jobs_finished_at)
end
end

change_table :good_job_batches do |t|
t.datetime :jobs_finished_at
end
end
end
3 changes: 2 additions & 1 deletion demo/db/schema.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
#
# It's strongly recommended that you check this file into your version control system.

ActiveRecord::Schema.define(version: 2024_06_13_151310) do
ActiveRecord::Schema.define(version: 2024_08_01_143343) do
# These are extensions that must be enabled in order to support this database
enable_extension "pgcrypto"
enable_extension "plpgsql"
Expand All @@ -28,6 +28,7 @@
t.datetime "enqueued_at"
t.datetime "discarded_at"
t.datetime "finished_at"
t.datetime "jobs_finished_at"
end

create_table "good_job_executions", id: :uuid, default: -> { "gen_random_uuid()" }, force: :cascade do |t|
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ class CreateGoodJobs < ActiveRecord::Migration<%= migration_version %>
t.datetime :enqueued_at
t.datetime :discarded_at
t.datetime :finished_at
t.datetime :jobs_finished_at
end

create_table :good_job_executions, id: :uuid do |t|
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# frozen_string_literal: true

# Adds the +jobs_finished_at+ datetime column to the +good_job_batches+ table.
class AddJobsFinishedAtToGoodJobBatches < ActiveRecord::Migration<%= migration_version %>
def change
reversible do |dir|
dir.up do
# Ensure this incremental update migration is idempotent
# with monolithic install migration.
# The guard is registered for the up direction only; `return` leaves #change
# before the change_table call below when the column is already present.
return if connection.column_exists?(:good_job_batches, :jobs_finished_at)
end
end

change_table :good_job_batches do |t|
t.datetime :jobs_finished_at
end
end
end
8 changes: 4 additions & 4 deletions lib/good_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -171,10 +171,10 @@ def self.restart(timeout: -1)
_shutdown_all(Capsule.instances, :restart, timeout: timeout)
end

# Sends +#shutdown+ or +#restart+ to executable objects ({GoodJob::Notifier}, {GoodJob::Poller}, {GoodJob::Scheduler}, {GoodJob::MultiScheduler}, {GoodJob::CronManager})
# Sends +#shutdown+ or +#restart+ to executable objects ({GoodJob::Notifier}, {GoodJob::Poller}, {GoodJob::Scheduler}, {GoodJob::MultiScheduler}, {GoodJob::CronManager}, {GoodJob::SharedExecutor})
# @param executables [Array<Notifier, Poller, Scheduler, MultiScheduler, CronManager, SharedExecutor>] Objects to shut down.
# @param method_name [:symbol] Method to call, e.g. +:shutdown+ or +:restart+.
# @param timeout [nil,Numeric]
# @param method_name [Symbol] Method to call, e.g. +:shutdown+ or +:restart+.
# @param timeout [nil, Numeric] Seconds to wait for actively executing jobs to finish.
# @param after [Array<Notifier, Poller, Scheduler, MultiScheduler, CronManager, SharedExecutor>] Objects to shut down after initial executables shut down.
# @return [void]
def self._shutdown_all(executables, method_name = :shutdown, timeout: -1, after: [])
Expand Down Expand Up @@ -290,7 +290,7 @@ def self.deprecator
# For use in tests/CI to validate GoodJob is up-to-date.
# @return [Boolean]
def self.migrated?
  # Delegates to the batch record's column check instead of a constant, so the
  # result reflects whether +jobs_finished_at+ exists in the database.
  GoodJob::BatchRecord.jobs_finished_at_migrated?
end
end

Expand Down
31 changes: 31 additions & 0 deletions spec/app/models/good_job/batch_record_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,35 @@
expect(result.last).to eq last_job
end
end

describe 'finished_at' do
# NOTE(review): the example description says "jobs", but the assertions below
# exercise callback jobs — confirm the intended wording.
it 'is now set when all jobs in the batch are finished' do
batch = described_class.create!
# Start from a batch whose jobs_finished_at is already set, so the callback
# jobs created below are the only thing still unfinished.
batch.update(enqueued_at: Time.current, jobs_finished_at: Time.current)
# One unfinished and one finished callback job.
batch.callback_jobs.create!(finished_at: nil)
batch.callback_jobs.create!(finished_at: Time.current)

batch._continue_discard_or_finish

# A callback job is still unfinished, so finished_at is expected to stay unset.
expect(batch.reload.finished_at).to be_nil

# Mark the callback jobs finished, then re-evaluate the batch.
batch.callback_jobs.update(finished_at: Time.current)
batch._continue_discard_or_finish

expect(batch.reload.finished_at).to be_within(1.second).of(Time.current)
end
end

describe 'deletion logic' do
# Verifies that a batch with jobs_finished_at set but finished_at unset is not
# matched for deletion, and that it is matched once finished_at is set.
# NOTE(review): presumably finished_before filters on finished_at — the scope
# definition is not visible here; confirm.
it 'checks finished_at' do
batch = described_class.create!
batch.update(enqueued_at: Time.current, jobs_finished_at: Time.current, finished_at: nil)

expect { described_class.finished_before(Time.current).delete_all }.not_to change(described_class, :count)

batch.update(finished_at: Time.current)

expect { described_class.finished_before(Time.current).delete_all }.to change(described_class, :count).by(-1)
end
end
end
8 changes: 6 additions & 2 deletions spec/app/models/good_job/batch_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -117,11 +117,15 @@ def perform

batch.retry

expect(batch.reload).to be_enqueued
batch.reload
expect(batch).to have_attributes(discarded_at: nil, jobs_finished_at: nil, finished_at: nil)
expect(batch).to be_enqueued

GoodJob.perform_inline

expect(batch.reload).to be_succeeded
batch.reload
expect(batch).to have_attributes(discarded_at: nil, jobs_finished_at: be_present, finished_at: be_present)
expect(batch).to be_succeeded
end
end

Expand Down
18 changes: 18 additions & 0 deletions spec/integration/batch_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -307,8 +307,10 @@ def perform(_batch, _params)

batch.reload
expect(batch).to be_succeeded
expect(batch).to be_jobs_finished
expect(batch.callback_active_jobs.count).to eq 1
expect(batch.callback_active_jobs.first).to be_a TestJob::SuccessCallbackJob
expect(batch.finished_at).to be_present

job, callback_job = GoodJob::Job.order(:created_at).to_a
expect(job.status).to eq :succeeded
Expand Down Expand Up @@ -351,4 +353,20 @@ def perform(_batch, _params)
expect { batch.retry }.to change { GoodJob::Job.discarded.count }.by(-1)
end
end

describe 'batch deletion' do
# NOTE(review): despite the description, this example performs no deletion; it
# only asserts that both timestamps are set after jobs are performed inline.
it 'deletes batches only after their callback jobs have completed' do
batch = GoodJob::Batch.new
batch.on_finish = "BatchCallbackJob"
batch.enqueue do
TestJob.perform_later
end

GoodJob.perform_inline

batch.reload
expect(batch.jobs_finished_at).to be_present
expect(batch.finished_at).to be_within(1.second).of(Time.current)
end
end
end
12 changes: 11 additions & 1 deletion spec/lib/good_job_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@
let!(:old_finished_job) { GoodJob::Job.create!(active_job_id: SecureRandom.uuid, finished_at: 15.days.ago) }
let!(:old_finished_job_execution) { GoodJob::Execution.create!(active_job_id: old_finished_job.active_job_id, finished_at: 16.days.ago) }
let!(:old_discarded_job) { GoodJob::Job.create!(active_job_id: SecureRandom.uuid, finished_at: 15.days.ago, error: "Error") }
let!(:old_batch) { GoodJob::BatchRecord.create!(finished_at: 15.days.ago) }
let!(:old_batch) { GoodJob::BatchRecord.create!(jobs_finished_at: 14.days.ago, finished_at: 15.days.ago) }

it 'deletes finished jobs' do
destroyed_records_count = described_class.cleanup_preserved_jobs(in_batches_of: 1)
Expand Down Expand Up @@ -110,6 +110,16 @@
expect { old_discarded_job.reload }.not_to raise_error
expect { old_batch.reload }.to raise_error ActiveRecord::RecordNotFound
end

it "does not delete batches until their callbacks have finished" do
# With finished_at cleared, cleanup is expected to leave the batch in place.
old_batch.update!(finished_at: nil)
described_class.cleanup_preserved_jobs
expect { old_batch.reload }.not_to raise_error

# Once finished_at is set to an old timestamp, cleanup is expected to remove it.
old_batch.update!(finished_at: 15.days.ago)
described_class.cleanup_preserved_jobs
expect { old_batch.reload }.to raise_error ActiveRecord::RecordNotFound
end
end

describe '.perform_inline' do
Expand Down

0 comments on commit 4b5d828

Please sign in to comment.