From 43f4de8d63cc889407f4bc10d5be7d8e996bae87 Mon Sep 17 00:00:00 2001 From: "Hal M. Spitz" Date: Sun, 24 Nov 2024 14:35:10 -0800 Subject: [PATCH] Use Concurrent::Promises based TimerTask Promises are the recommended infrastructure, replacing several OG APIs, including TimerTasks. SQ only uses TimerTasks in 3 places (currently) and a very small subset of their overall functionality. SolidQueue::TimerTask is a drop-in replacement. This PR uses AtomicBoolean instead of the recommended Concurrent::Cancellation to avoid a dependency on, and the potential API stability issues of, edge features. This completes the move from the old APIs to Promises and makes all of the new concurrency features (Actors, Channels, etc.) available for future SQ features and enahancements. --- Gemfile.lock | 2 - .../dispatcher/concurrency_maintenance.rb | 8 +- lib/solid_queue/processes/registrable.rb | 8 +- lib/solid_queue/supervisor/maintenance.rb | 9 +-- lib/solid_queue/timer_task.rb | 42 ++++++++++ test/unit/timer_task_test.rb | 79 +++++++++++++++++++ 6 files changed, 125 insertions(+), 23 deletions(-) create mode 100644 lib/solid_queue/timer_task.rb create mode 100644 test/unit/timer_task_test.rb diff --git a/Gemfile.lock b/Gemfile.lock index 79544594..2df57956 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -130,8 +130,6 @@ GEM rake (13.2.1) rdoc (6.8.1) psych (>= 4.0.0) - rdoc (6.6.3.1) - psych (>= 4.0.0) regexp_parser (2.9.2) reline (0.5.12) io-console (~> 0.5) diff --git a/lib/solid_queue/dispatcher/concurrency_maintenance.rb b/lib/solid_queue/dispatcher/concurrency_maintenance.rb index 81cf770c..fd0073d4 100644 --- a/lib/solid_queue/dispatcher/concurrency_maintenance.rb +++ b/lib/solid_queue/dispatcher/concurrency_maintenance.rb @@ -12,16 +12,10 @@ def initialize(interval, batch_size) end def start - @concurrency_maintenance_task = Concurrent::TimerTask.new(run_now: true, execution_interval: interval) do + @concurrency_maintenance_task = SolidQueue::TimerTask.new(run_now: true, execution_interval: interval) do expire_semaphores unblock_blocked_executions end - - @concurrency_maintenance_task.add_observer do |_, _, error| - handle_thread_error(error) if error - end - - @concurrency_maintenance_task.execute end def stop diff --git a/lib/solid_queue/processes/registrable.rb b/lib/solid_queue/processes/registrable.rb index 58cabfa8..9c8e33cf 100644 --- a/lib/solid_queue/processes/registrable.rb +++ b/lib/solid_queue/processes/registrable.rb @@ -37,15 +37,9 @@ def registered? end def launch_heartbeat - @heartbeat_task = Concurrent::TimerTask.new(execution_interval: SolidQueue.process_heartbeat_interval) do + @heartbeat_task = SolidQueue::TimerTask.new(execution_interval: SolidQueue.process_heartbeat_interval) do wrap_in_app_executor { heartbeat } end - - @heartbeat_task.add_observer do |_, _, error| - handle_thread_error(error) if error - end - - @heartbeat_task.execute end def stop_heartbeat diff --git a/lib/solid_queue/supervisor/maintenance.rb b/lib/solid_queue/supervisor/maintenance.rb index 1b6b5204..48808d7c 100644 --- a/lib/solid_queue/supervisor/maintenance.rb +++ b/lib/solid_queue/supervisor/maintenance.rb @@ -7,16 +7,11 @@ module Supervisor::Maintenance end private + def launch_maintenance_task - @maintenance_task = Concurrent::TimerTask.new(run_now: true, execution_interval: SolidQueue.process_alive_threshold) do + @maintenance_task = SolidQueue::TimerTask.new(run_now: true, execution_interval: SolidQueue.process_alive_threshold) do prune_dead_processes end - - @maintenance_task.add_observer do |_, _, error| - handle_thread_error(error) if error - end - - @maintenance_task.execute end def stop_maintenance_task diff --git a/lib/solid_queue/timer_task.rb b/lib/solid_queue/timer_task.rb new file mode 100644 index 00000000..9a3fcec4 --- /dev/null +++ b/lib/solid_queue/timer_task.rb @@ -0,0 +1,42 @@ +# frozen_string_literal: true + +module SolidQueue + class TimerTask + include AppExecutor + + def initialize(execution_interval:, run_now: false, &block) + raise ArgumentError, "A block is required" unless block_given? + @shutdown = Concurrent::AtomicBoolean.new + + run(run_now, execution_interval, &block) + end + + def shutdown + @shutdown.make_true + end + + private + + def run(run_now, execution_interval, &block) + execute_task(&block) if run_now + + Concurrent::Promises.future(execution_interval) do |interval| + repeating_task(interval, &block) + end.run + end + + def execute_task(&block) + block.call unless @shutdown.true? + rescue Exception => e + handle_thread_error(e) + end + + def repeating_task(interval, &block) + Concurrent::Promises.schedule(interval) do + execute_task(&block) + end.then do + repeating_task(interval, &block) unless @shutdown.true? + end + end + end +end diff --git a/test/unit/timer_task_test.rb b/test/unit/timer_task_test.rb new file mode 100644 index 00000000..b3a9b11b --- /dev/null +++ b/test/unit/timer_task_test.rb @@ -0,0 +1,79 @@ +# frozen_string_literal: true + +require "test_helper" +require "mocha/minitest" + +class TimerTaskTest < ActiveSupport::TestCase + test "initialization requires a block" do + assert_raises(ArgumentError) do + SolidQueue::TimerTask.new(execution_interval: 1) + end + end + + test "task runs immediate when run now true" do + executed = false + + task = SolidQueue::TimerTask.new(run_now: true, execution_interval: 1) do + executed = true + end + + sleep 0.1 + + assert executed, "Task should have executed immediately" + task.shutdown + end + + test "task does not run immediately when run with run_now false" do + executed = false + + task = SolidQueue::TimerTask.new(run_now: false, execution_interval: 1) do + executed = true + end + + sleep 0.1 + + assert_not executed, "Task should have executed immediately" + task.shutdown + end + + test "task repeats" do + executions = 0 + + task = SolidQueue::TimerTask.new(execution_interval: 0.1, run_now: false) do + executions += 1 + end + + sleep(0.5) # Wait to accumulate some executions + + assert executions > 3, "The block should be executed repeatedly" + + task.shutdown + end + + test "task stops on shutdown" do + executions = 0 + + task = SolidQueue::TimerTask.new(execution_interval: 0.1, run_now: false) { executions += 1 } + + sleep(0.3) # Let the task run a few times + + task.shutdown + + current_executions = executions + + sleep(0.5) # Ensure no more executions after shutdown + + assert_equal current_executions, executions, "The task should stop executing after shutdown" + end + + test "calls handle_thread_error if task raises" do + task = SolidQueue::TimerTask.new(execution_interval: 0.1) do + raise ExpectedTestError.new + end + task.expects(:handle_thread_error).with(instance_of(ExpectedTestError)) + + sleep(0.2) # Give some time for the task to run and handle the error + + task.shutdown + end +end