From 4c9a81b821408afe3ce0cd2333e2636fd7de8d1f Mon Sep 17 00:00:00 2001 From: Nick Pezza Date: Sat, 5 Oct 2024 21:00:52 -0400 Subject: [PATCH 1/2] Report errors to the error reporter --- app/models/solid_queue/claimed_execution.rb | 9 +++++---- app/models/solid_queue/process/executor.rb | 4 ++-- app/models/solid_queue/process/prunable.rb | 2 +- lib/solid_queue/supervisor.rb | 2 +- lib/solid_queue/supervisor/maintenance.rb | 3 ++- .../solid_queue/claimed_execution_test.rb | 14 +++++++++----- test/unit/worker_test.rb | 17 +++++++++++++++++ 7 files changed, 37 insertions(+), 14 deletions(-) diff --git a/app/models/solid_queue/claimed_execution.rb b/app/models/solid_queue/claimed_execution.rb index d4abf45a..ba469a3e 100644 --- a/app/models/solid_queue/claimed_execution.rb +++ b/app/models/solid_queue/claimed_execution.rb @@ -36,10 +36,10 @@ def release_all end end - def fail_all_with(error) + def fail_all_with(error, reraise:) SolidQueue.instrument(:fail_many_claimed) do |payload| includes(:job).tap do |executions| - executions.each { |execution| execution.failed_with(error) } + executions.each { |execution| execution.failed_with(error, reraise: reraise) } payload[:process_ids] = executions.map(&:process_id).uniq payload[:job_ids] = executions.map(&:job_id).uniq @@ -63,7 +63,7 @@ def perform if result.success? finished else - failed_with(result.error) + failed_with(result.error, reraise: true) end ensure job.unblock_next_blocked_job @@ -82,11 +82,12 @@ def discard raise UndiscardableError, "Can't discard a job in progress" end - def failed_with(error) + def failed_with(error, reraise:) transaction do job.failed_with(error) destroy! end + raise error if reraise end private diff --git a/app/models/solid_queue/process/executor.rb b/app/models/solid_queue/process/executor.rb index 8dcd12aa..23c41911 100644 --- a/app/models/solid_queue/process/executor.rb +++ b/app/models/solid_queue/process/executor.rb @@ -11,9 +11,9 @@ module Executor after_destroy :release_all_claimed_executions end - def fail_all_claimed_executions_with(error) + def fail_all_claimed_executions_with(error, reraise:) if claims_executions? - claimed_executions.fail_all_with(error) + claimed_executions.fail_all_with(error, reraise: reraise) end end diff --git a/app/models/solid_queue/process/prunable.rb b/app/models/solid_queue/process/prunable.rb index 85341d1d..f116138e 100644 --- a/app/models/solid_queue/process/prunable.rb +++ b/app/models/solid_queue/process/prunable.rb @@ -23,7 +23,7 @@ def prune(excluding: nil) def prune error = Processes::ProcessPrunedError.new(last_heartbeat_at) - fail_all_claimed_executions_with(error) + fail_all_claimed_executions_with(error, reraise: false) deregister(pruned: true) end diff --git a/lib/solid_queue/supervisor.rb b/lib/solid_queue/supervisor.rb index 9ef736e4..511886d4 100644 --- a/lib/solid_queue/supervisor.rb +++ b/lib/solid_queue/supervisor.rb @@ -173,7 +173,7 @@ def replace_fork(pid, status) def handle_claimed_jobs_by(terminated_fork, status) if registered_process = process.supervisees.find_by(name: terminated_fork.name) error = Processes::ProcessExitError.new(status) - registered_process.fail_all_claimed_executions_with(error) + registered_process.fail_all_claimed_executions_with(error, reraise: false) end end diff --git a/lib/solid_queue/supervisor/maintenance.rb b/lib/solid_queue/supervisor/maintenance.rb index 1b6b5204..3ce8fa53 100644 --- a/lib/solid_queue/supervisor/maintenance.rb +++ b/lib/solid_queue/supervisor/maintenance.rb @@ -29,7 +29,8 @@ def prune_dead_processes def fail_orphaned_executions wrap_in_app_executor do - ClaimedExecution.orphaned.fail_all_with(Processes::ProcessMissingError.new) + ClaimedExecution.orphaned. + fail_all_with(Processes::ProcessMissingError.new, reraise: false) end end end diff --git a/test/models/solid_queue/claimed_execution_test.rb b/test/models/solid_queue/claimed_execution_test.rb index 226dad77..04d5cacb 100644 --- a/test/models/solid_queue/claimed_execution_test.rb +++ b/test/models/solid_queue/claimed_execution_test.rb @@ -22,7 +22,9 @@ class SolidQueue::ClaimedExecutionTest < ActiveSupport::TestCase job = claimed_execution.job assert_difference -> { SolidQueue::ClaimedExecution.count } => -1, -> { SolidQueue::FailedExecution.count } => 1 do - claimed_execution.perform + assert_raises RuntimeError do + claimed_execution.perform + end end assert_not job.reload.finished? @@ -37,10 +39,12 @@ class SolidQueue::ClaimedExecutionTest < ActiveSupport::TestCase test "job failures are reported via Rails error subscriber" do subscriber = ErrorBuffer.new - with_error_subscriber(subscriber) do - claimed_execution = prepare_and_claim_job RaisingJob.perform_later(RuntimeError, "B") + assert_raises RuntimeError do + with_error_subscriber(subscriber) do + claimed_execution = prepare_and_claim_job RaisingJob.perform_later(RuntimeError, "B") - claimed_execution.perform + claimed_execution.perform + end end assert_equal 1, subscriber.errors.count @@ -63,7 +67,7 @@ class SolidQueue::ClaimedExecutionTest < ActiveSupport::TestCase job = claimed_execution.job assert_difference -> { SolidQueue::ClaimedExecution.count } => -1, -> { SolidQueue::FailedExecution.count } => 1 do - claimed_execution.failed_with(RuntimeError.new) + claimed_execution.failed_with(RuntimeError.new, reraise: false) end assert job.reload.failed? diff --git a/test/unit/worker_test.rb b/test/unit/worker_test.rb index 3523e4a1..09999808 100644 --- a/test/unit/worker_test.rb +++ b/test/unit/worker_test.rb @@ -67,6 +67,23 @@ class WorkerTest < ActiveSupport::TestCase SolidQueue.on_thread_error = original_on_thread_error end + test "errors on claimed executions are reported via Rails error subscriber" do + subscriber = ErrorBuffer.new + Rails.error.subscribe(subscriber) + + RaisingJob.perform_later(RuntimeError, "B") + + @worker.start + + wait_for_jobs_to_finish_for(1.second) + @worker.wake_up + + assert_equal 1, subscriber.errors.count + assert_equal "This is a RuntimeError exception", subscriber.messages.first + ensure + Rails.error.unsubscribe(subscriber) if Rails.error.respond_to?(:unsubscribe) + end + test "claim and process more enqueued jobs than the pool size allows to process at once" do 5.times do |i| StoreResultJob.perform_later(:paused, pause: 0.1.second) From 29863c988863d3642160d851daa9026de61209bf Mon Sep 17 00:00:00 2001 From: Nick Pezza Date: Wed, 13 Nov 2024 18:52:34 -0500 Subject: [PATCH 2/2] simplify raising --- app/models/solid_queue/claimed_execution.rb | 10 +++++----- app/models/solid_queue/process/executor.rb | 4 ++-- app/models/solid_queue/process/prunable.rb | 2 +- lib/solid_queue/supervisor.rb | 2 +- lib/solid_queue/supervisor/maintenance.rb | 3 +-- test/models/solid_queue/claimed_execution_test.rb | 2 +- 6 files changed, 11 insertions(+), 12 deletions(-) diff --git a/app/models/solid_queue/claimed_execution.rb b/app/models/solid_queue/claimed_execution.rb index ba469a3e..c2b13909 100644 --- a/app/models/solid_queue/claimed_execution.rb +++ b/app/models/solid_queue/claimed_execution.rb @@ -36,10 +36,10 @@ def release_all end end - def fail_all_with(error, reraise:) + def fail_all_with(error) SolidQueue.instrument(:fail_many_claimed) do |payload| includes(:job).tap do |executions| - executions.each { |execution| execution.failed_with(error, reraise: reraise) } + executions.each { |execution| execution.failed_with(error) } payload[:process_ids] = executions.map(&:process_id).uniq payload[:job_ids] = executions.map(&:job_id).uniq @@ -63,7 +63,8 @@ def perform if result.success? finished else - failed_with(result.error, reraise: true) + failed_with(result.error) + raise result.error end ensure job.unblock_next_blocked_job @@ -82,12 +83,11 @@ def discard raise UndiscardableError, "Can't discard a job in progress" end - def failed_with(error, reraise:) + def failed_with(error) transaction do job.failed_with(error) destroy! end - raise error if reraise end private diff --git a/app/models/solid_queue/process/executor.rb b/app/models/solid_queue/process/executor.rb index 23c41911..8dcd12aa 100644 --- a/app/models/solid_queue/process/executor.rb +++ b/app/models/solid_queue/process/executor.rb @@ -11,9 +11,9 @@ module Executor after_destroy :release_all_claimed_executions end - def fail_all_claimed_executions_with(error, reraise:) + def fail_all_claimed_executions_with(error) if claims_executions? - claimed_executions.fail_all_with(error, reraise: reraise) + claimed_executions.fail_all_with(error) end end diff --git a/app/models/solid_queue/process/prunable.rb b/app/models/solid_queue/process/prunable.rb index f116138e..85341d1d 100644 --- a/app/models/solid_queue/process/prunable.rb +++ b/app/models/solid_queue/process/prunable.rb @@ -23,7 +23,7 @@ def prune(excluding: nil) def prune error = Processes::ProcessPrunedError.new(last_heartbeat_at) - fail_all_claimed_executions_with(error, reraise: false) + fail_all_claimed_executions_with(error) deregister(pruned: true) end diff --git a/lib/solid_queue/supervisor.rb b/lib/solid_queue/supervisor.rb index 511886d4..9ef736e4 100644 --- a/lib/solid_queue/supervisor.rb +++ b/lib/solid_queue/supervisor.rb @@ -173,7 +173,7 @@ def replace_fork(pid, status) def handle_claimed_jobs_by(terminated_fork, status) if registered_process = process.supervisees.find_by(name: terminated_fork.name) error = Processes::ProcessExitError.new(status) - registered_process.fail_all_claimed_executions_with(error, reraise: false) + registered_process.fail_all_claimed_executions_with(error) end end diff --git a/lib/solid_queue/supervisor/maintenance.rb b/lib/solid_queue/supervisor/maintenance.rb index 3ce8fa53..1b6b5204 100644 --- a/lib/solid_queue/supervisor/maintenance.rb +++ b/lib/solid_queue/supervisor/maintenance.rb @@ -29,8 +29,7 @@ def prune_dead_processes def fail_orphaned_executions wrap_in_app_executor do - ClaimedExecution.orphaned. - fail_all_with(Processes::ProcessMissingError.new, reraise: false) + ClaimedExecution.orphaned.fail_all_with(Processes::ProcessMissingError.new) end end end diff --git a/test/models/solid_queue/claimed_execution_test.rb b/test/models/solid_queue/claimed_execution_test.rb index 04d5cacb..4e99fd04 100644 --- a/test/models/solid_queue/claimed_execution_test.rb +++ b/test/models/solid_queue/claimed_execution_test.rb @@ -67,7 +67,7 @@ class SolidQueue::ClaimedExecutionTest < ActiveSupport::TestCase job = claimed_execution.job assert_difference -> { SolidQueue::ClaimedExecution.count } => -1, -> { SolidQueue::FailedExecution.count } => 1 do - claimed_execution.failed_with(RuntimeError.new, reraise: false) + claimed_execution.failed_with(RuntimeError.new) end assert job.reload.failed?