diff --git a/app/models/solid_queue/claimed_execution.rb b/app/models/solid_queue/claimed_execution.rb index c2b13909..cdffb489 100644 --- a/app/models/solid_queue/claimed_execution.rb +++ b/app/models/solid_queue/claimed_execution.rb @@ -5,12 +5,6 @@ class SolidQueue::ClaimedExecution < SolidQueue::Execution scope :orphaned, -> { where.missing(:process) } - class Result < Struct.new(:success, :error) - def success? - success - end - end - class << self def claiming(job_ids, process_id, &block) job_data = Array(job_ids).collect { |job_id| { job_id: job_id, process_id: process_id } } @@ -60,12 +54,9 @@ def discard_all_from_jobs(*) def perform result = execute - if result.success? - finished - else - failed_with(result.error) - raise result.error - end + result.just? ? finished : failed_with(result.reason) + + result ensure job.unblock_next_blocked_job end @@ -93,9 +84,9 @@ def failed_with(error) private def execute ActiveJob::Base.execute(job.arguments) - Result.new(true, nil) + Concurrent::Maybe.just(true) rescue Exception => e - Result.new(false, e) + Concurrent::Maybe.nothing(e) end def finished diff --git a/lib/solid_queue.rb b/lib/solid_queue.rb index e7070d26..4715136c 100644 --- a/lib/solid_queue.rb +++ b/lib/solid_queue.rb @@ -40,6 +40,7 @@ module SolidQueue mattr_accessor :preserve_finished_jobs, default: true mattr_accessor :clear_finished_jobs_after, default: 1.day mattr_accessor :default_concurrency_control_period, default: 3.minutes + mattr_accessor :reporting_label, default: "SolidQueue-#{SolidQueue::VERSION}" delegate :on_start, :on_stop, to: Supervisor diff --git a/lib/solid_queue/app_executor.rb b/lib/solid_queue/app_executor.rb index da0976fe..3873ac03 100644 --- a/lib/solid_queue/app_executor.rb +++ b/lib/solid_queue/app_executor.rb @@ -11,11 +11,38 @@ def wrap_in_app_executor(&block) end def handle_thread_error(error) - SolidQueue.instrument(:thread_error, error: error) + CallErrorReporters.new(error).call + end + + private + + # Handles error reporting and guarantees that Rails.error will be called if configured. + # + # This method performs the following actions: + # 1. Invokes `SolidQueue.instrument` for `:thread_error`. + # 2. Invokes `SolidQueue.on_thread_error` if it is configured. + # 3. Invokes `Rails.error.report` if it wasn't invoked by one of the above calls. + class CallErrorReporters + # @param [Exception] error The error to be reported. + def initialize(error) + @error = error + @reported = false + end + + def call + SolidQueue.instrument(:thread_error, error: @error) + Rails.error.subscribe(self) if Rails.error&.respond_to?(:subscribe) - if SolidQueue.on_thread_error - SolidQueue.on_thread_error.call(error) + SolidQueue.on_thread_error&.call(@error) + + Rails.error.report(@error, handled: false, source: SolidQueue.reporting_label) unless @reported + ensure + Rails.error.unsubscribe(self) if Rails.error&.respond_to?(:unsubscribe) + end + + def report(*, **) + @reported = true + end end - end end end diff --git a/lib/solid_queue/dispatcher/concurrency_maintenance.rb b/lib/solid_queue/dispatcher/concurrency_maintenance.rb index 81cf770c..fd0073d4 100644 --- a/lib/solid_queue/dispatcher/concurrency_maintenance.rb +++ b/lib/solid_queue/dispatcher/concurrency_maintenance.rb @@ -12,16 +12,10 @@ def initialize(interval, batch_size) end def start - @concurrency_maintenance_task = Concurrent::TimerTask.new(run_now: true, execution_interval: interval) do + @concurrency_maintenance_task = SolidQueue::TimerTask.new(run_now: true, execution_interval: interval) do expire_semaphores unblock_blocked_executions end - - @concurrency_maintenance_task.add_observer do |_, _, error| - handle_thread_error(error) if error - end - - @concurrency_maintenance_task.execute end def stop diff --git a/lib/solid_queue/engine.rb b/lib/solid_queue/engine.rb index d10997c7..7d478646 100644 --- a/lib/solid_queue/engine.rb +++ b/lib/solid_queue/engine.rb @@ -18,7 +18,7 @@ class Engine < ::Rails::Engine initializer "solid_queue.app_executor", before: :run_prepare_callbacks do |app| config.solid_queue.app_executor ||= app.executor - config.solid_queue.on_thread_error ||= ->(exception) { Rails.error.report(exception, handled: false) } + config.solid_queue.on_thread_error ||= ->(exception) { Rails.error.report(exception, handled: false, source: SolidQueue.reporting_label) } SolidQueue.app_executor = config.solid_queue.app_executor SolidQueue.on_thread_error = config.solid_queue.on_thread_error diff --git a/lib/solid_queue/log_subscriber.rb b/lib/solid_queue/log_subscriber.rb index 3d2ec02c..f17cf9cb 100644 --- a/lib/solid_queue/log_subscriber.rb +++ b/lib/solid_queue/log_subscriber.rb @@ -162,7 +162,7 @@ def replace_fork(event) private def formatted_event(event, action:, **attributes) - "SolidQueue-#{SolidQueue::VERSION} #{action} (#{event.duration.round(1)}ms) #{formatted_attributes(**attributes)}" + "#{SolidQueue.reporting_label} #{action} (#{event.duration.round(1)}ms) #{formatted_attributes(**attributes)}" end def formatted_attributes(**attributes) diff --git a/lib/solid_queue/pool.rb b/lib/solid_queue/pool.rb index c1bcf195..a703c3d7 100644 --- a/lib/solid_queue/pool.rb +++ b/lib/solid_queue/pool.rb @@ -18,20 +18,16 @@ def initialize(size, on_idle: nil) def post(execution) available_threads.decrement - future = Concurrent::Future.new(args: [ execution ], executor: executor) do |thread_execution| + Concurrent::Promises.future_on(executor, execution) do |thread_execution| wrap_in_app_executor do - thread_execution.perform + result = thread_execution.perform + + handle_thread_error(result.reason) if result.rejected? ensure available_threads.increment mutex.synchronize { on_idle.try(:call) if idle? } end end - - future.add_observer do |_, _, error| - handle_thread_error(error) if error - end - - future.execute end def idle_threads diff --git a/lib/solid_queue/processes/registrable.rb b/lib/solid_queue/processes/registrable.rb index 58cabfa8..9c8e33cf 100644 --- a/lib/solid_queue/processes/registrable.rb +++ b/lib/solid_queue/processes/registrable.rb @@ -37,15 +37,9 @@ def registered? end def launch_heartbeat - @heartbeat_task = Concurrent::TimerTask.new(execution_interval: SolidQueue.process_heartbeat_interval) do + @heartbeat_task = SolidQueue::TimerTask.new(execution_interval: SolidQueue.process_heartbeat_interval) do wrap_in_app_executor { heartbeat } end - - @heartbeat_task.add_observer do |_, _, error| - handle_thread_error(error) if error - end - - @heartbeat_task.execute end def stop_heartbeat diff --git a/lib/solid_queue/supervisor/maintenance.rb b/lib/solid_queue/supervisor/maintenance.rb index 1b6b5204..48808d7c 100644 --- a/lib/solid_queue/supervisor/maintenance.rb +++ b/lib/solid_queue/supervisor/maintenance.rb @@ -7,16 +7,11 @@ module Supervisor::Maintenance end private + def launch_maintenance_task - @maintenance_task = Concurrent::TimerTask.new(run_now: true, execution_interval: SolidQueue.process_alive_threshold) do + @maintenance_task = SolidQueue::TimerTask.new(run_now: true, execution_interval: SolidQueue.process_alive_threshold) do prune_dead_processes end - - @maintenance_task.add_observer do |_, _, error| - handle_thread_error(error) if error - end - - @maintenance_task.execute end def stop_maintenance_task diff --git a/lib/solid_queue/timer_task.rb b/lib/solid_queue/timer_task.rb new file mode 100644 index 00000000..9a3fcec4 --- /dev/null +++ b/lib/solid_queue/timer_task.rb @@ -0,0 +1,42 @@ +# frozen_string_literal: true + +module SolidQueue + class TimerTask + include AppExecutor + + def initialize(execution_interval:, run_now: false, &block) + raise ArgumentError, "A block is required" unless block_given? + @shutdown = Concurrent::AtomicBoolean.new + + run(run_now, execution_interval, &block) + end + + def shutdown + @shutdown.make_true + end + + private + + def run(run_now, execution_interval, &block) + execute_task(&block) if run_now + + Concurrent::Promises.future(execution_interval) do |interval| + repeating_task(interval, &block) + end.run + end + + def execute_task(&block) + block.call unless @shutdown.true? + rescue Exception => e + handle_thread_error(e) + end + + def repeating_task(interval, &block) + Concurrent::Promises.schedule(interval) do + execute_task(&block) + end.then do + repeating_task(interval, &block) unless @shutdown.true? + end + end + end +end diff --git a/test/integration/instrumentation_test.rb b/test/integration/instrumentation_test.rb index 59443ccf..ec8913d1 100644 --- a/test/integration/instrumentation_test.rb +++ b/test/integration/instrumentation_test.rb @@ -391,9 +391,10 @@ class InstrumentationTest < ActiveSupport::TestCase test "thread errors emit thread_error events" do previous_thread_report_on_exception, Thread.report_on_exception = Thread.report_on_exception, false - error = ExpectedTestError.new("everything is broken") - SolidQueue::ClaimedExecution::Result.expects(:new).raises(error).at_least_once + + # Allows the job to process normally, but trigger the error path in ClaimedExecution.execute + Concurrent::Maybe.expects(:just).returns(Concurrent::Maybe.nothing(error)) AddToBufferJob.perform_later "hey!" diff --git a/test/models/solid_queue/claimed_execution_test.rb b/test/models/solid_queue/claimed_execution_test.rb index 4e99fd04..226dad77 100644 --- a/test/models/solid_queue/claimed_execution_test.rb +++ b/test/models/solid_queue/claimed_execution_test.rb @@ -22,9 +22,7 @@ class SolidQueue::ClaimedExecutionTest < ActiveSupport::TestCase job = claimed_execution.job assert_difference -> { SolidQueue::ClaimedExecution.count } => -1, -> { SolidQueue::FailedExecution.count } => 1 do - assert_raises RuntimeError do - claimed_execution.perform - end + claimed_execution.perform end assert_not job.reload.finished? @@ -39,12 +37,10 @@ class SolidQueue::ClaimedExecutionTest < ActiveSupport::TestCase test "job failures are reported via Rails error subscriber" do subscriber = ErrorBuffer.new - assert_raises RuntimeError do - with_error_subscriber(subscriber) do - claimed_execution = prepare_and_claim_job RaisingJob.perform_later(RuntimeError, "B") + with_error_subscriber(subscriber) do + claimed_execution = prepare_and_claim_job RaisingJob.perform_later(RuntimeError, "B") - claimed_execution.perform - end + claimed_execution.perform end assert_equal 1, subscriber.errors.count diff --git a/test/unit/timer_task_test.rb b/test/unit/timer_task_test.rb new file mode 100644 index 00000000..b3a9b11b --- /dev/null +++ b/test/unit/timer_task_test.rb @@ -0,0 +1,79 @@ +# frozen_string_literal: true + +require "test_helper" +require "mocha/minitest" + +class TimerTaskTest < ActiveSupport::TestCase + test "initialization requires a block" do + assert_raises(ArgumentError) do + SolidQueue::TimerTask.new(execution_interval: 1) + end + end + + test "task runs immediate when run now true" do + executed = false + + task = SolidQueue::TimerTask.new(run_now: true, execution_interval: 1) do + executed = true + end + + sleep 0.1 + + assert executed, "Task should have executed immediately" + task.shutdown + end + + test "task does not run immediately when run with run_now false" do + executed = false + + task = SolidQueue::TimerTask.new(run_now: false, execution_interval: 1) do + executed = true + end + + sleep 0.1 + + assert_not executed, "Task should have executed immediately" + task.shutdown + end + + test "task repeats" do + executions = 0 + + task = SolidQueue::TimerTask.new(execution_interval: 0.1, run_now: false) do + executions += 1 + end + + sleep(0.5) # Wait to accumulate some executions + + assert executions > 3, "The block should be executed repeatedly" + + task.shutdown + end + + test "task stops on shutdown" do + executions = 0 + + task = SolidQueue::TimerTask.new(execution_interval: 0.1, run_now: false) { executions += 1 } + + sleep(0.3) # Let the task run a few times + + task.shutdown + + current_executions = executions + + sleep(0.5) # Ensure no more executions after shutdown + + assert_equal current_executions, executions, "The task should stop executing after shutdown" + end + + test "calls handle_thread_error if task raises" do + task = SolidQueue::TimerTask.new(execution_interval: 0.1) do + raise ExpectedTestError.new + end + task.expects(:handle_thread_error).with(instance_of(ExpectedTestError)) + + sleep(0.2) # Give some time for the task to run and handle the error + + task.shutdown + end +end diff --git a/test/unit/worker_test.rb b/test/unit/worker_test.rb index d511cf74..f0b8f5fc 100644 --- a/test/unit/worker_test.rb +++ b/test/unit/worker_test.rb @@ -51,7 +51,7 @@ class WorkerTest < ActiveSupport::TestCase subscriber = ErrorBuffer.new Rails.error.subscribe(subscriber) - SolidQueue::ClaimedExecution::Result.expects(:new).raises(ExpectedTestError.new("everything is broken")).at_least_once + Concurrent::Maybe.expects(:just).returns(Concurrent::Maybe.nothing(ExpectedTestError.new("everything is broken"))) AddToBufferJob.perform_later "hey!"