From 32b2ed8c2789c9fa4f1d930848fb49b667fea712 Mon Sep 17 00:00:00 2001 From: "Hal M. Spitz" Date: Tue, 3 Dec 2024 14:35:05 -0800 Subject: [PATCH] Specialize Dispatcher and Worker looping The Worker and Dispatcher share the same poll loop logic (Poller#start_loop) while having different functional requirements. The Worker is poll looping despite not being able to execute new jobs if at capacity. The Dispatcher does require polling, but is reliant on shared logic in Poller#start_loop for a Dispatcher specific optimization. Changes: Move the logic controlling the sleep interval per poll from Poller#start_loop into Worker#poll and Dispatcher#poll by requiring #poll to return the `delay` value passed into interruptible_sleep. Poller#start_loop: * Removes the test based on the number of rows processed by #poll. This was Dispatcher specific logic. Worker#poll: * When Worker at full capacity: return a large value (10.minutes) effectively transforming Poller#start_loop from polling to wake-on-event. * When Worker < capacity: return `polling_interval` and maintain the poll timing until ReadyExecutions become available. Dispatcher#poll: * When `due` ScheduledExecutions.zero? return `polling_interval` and maintain the existing poll timing when no ScheduledExecutions are available to process. * When `due` ScheduledExecutions.postive? return 0. This results in interruptible_sleep(0) which returns immediately and without introducing any delays/sleeps between polls. This also allows for the existing behavior of looping through ScheduledExecutions via poll in order to check for shutdown requests between dispatch_next_batch interations. --- lib/solid_queue/dispatcher.rb | 3 ++- lib/solid_queue/processes/interruptible.rb | 14 ++++++++----- lib/solid_queue/processes/poller.rb | 8 ++++---- lib/solid_queue/worker.rb | 3 ++- test/unit/dispatcher_test.rb | 24 ++++++++++++++++++++++ test/unit/worker_test.rb | 20 ++++++++++++++++++ 6 files changed, 61 insertions(+), 11 deletions(-) diff --git a/lib/solid_queue/dispatcher.rb b/lib/solid_queue/dispatcher.rb index a22a82d8..fb988075 100644 --- a/lib/solid_queue/dispatcher.rb +++ b/lib/solid_queue/dispatcher.rb @@ -24,7 +24,8 @@ def metadata private def poll batch = dispatch_next_batch - batch.size + + batch.size.zero? ? polling_interval : 0.seconds end def dispatch_next_batch diff --git a/lib/solid_queue/processes/interruptible.rb b/lib/solid_queue/processes/interruptible.rb index 09c027b6..3bff1dd9 100644 --- a/lib/solid_queue/processes/interruptible.rb +++ b/lib/solid_queue/processes/interruptible.rb @@ -12,13 +12,17 @@ def interrupt queue << true end + # Sleeps for 'time'. Can be interrupted asynchronously and return early via wake_up. + # @param time [Numeric] the time to sleep. 0 returns immediately. + # @return [true, nil] + # * returns `true` if an interrupt was requested via #wake_up between the + # last call to `interruptible_sleep` and now, resulting in an early return. + # * returns `nil` if it slept the full `time` and was not interrupted. def interruptible_sleep(time) - # Invoking from the main thread can result in a 35% slowdown (at least when running the test suite). - # Using some form of Async (Futures) addresses this performance issue. + # Invoking this from the main thread may result in significant slowdown. + # Utilizing asynchronous execution (Futures) addresses this performance issue. Concurrent::Promises.future(time) do |timeout| - if timeout > 0 && queue.pop(timeout:) - queue.clear - end + queue.pop(timeout:).tap { queue.clear } end.value end diff --git a/lib/solid_queue/processes/poller.rb b/lib/solid_queue/processes/poller.rb index bf5a7450..75df6104 100644 --- a/lib/solid_queue/processes/poller.rb +++ b/lib/solid_queue/processes/poller.rb @@ -25,11 +25,11 @@ def start_loop loop do break if shutting_down? - wrap_in_app_executor do - unless poll > 0 - interruptible_sleep(polling_interval) - end + delay = wrap_in_app_executor do + poll end + + interruptible_sleep(delay) end ensure SolidQueue.instrument(:shutdown_process, process: self) do diff --git a/lib/solid_queue/worker.rb b/lib/solid_queue/worker.rb index fc203774..f34a14f0 100644 --- a/lib/solid_queue/worker.rb +++ b/lib/solid_queue/worker.rb @@ -7,6 +7,7 @@ class Worker < Processes::Poller after_boot :run_start_hooks before_shutdown :run_stop_hooks + attr_accessor :queues, :pool def initialize(**options) @@ -29,7 +30,7 @@ def poll pool.post(execution) end - executions.size + pool.idle? ? polling_interval : 10.minutes end end diff --git a/test/unit/dispatcher_test.rb b/test/unit/dispatcher_test.rb index 42d57c92..5bca7743 100644 --- a/test/unit/dispatcher_test.rb +++ b/test/unit/dispatcher_test.rb @@ -92,6 +92,30 @@ class DispatcherTest < ActiveSupport::TestCase another_dispatcher&.stop end + test "sleeps `0.seconds` between polls if there are ready to dispatch jobs" do + dispatcher = SolidQueue::Dispatcher.new(polling_interval: 10, batch_size: 1) + dispatcher.expects(:interruptible_sleep).with(0.seconds).at_least(3) + dispatcher.expects(:interruptible_sleep).with(dispatcher.polling_interval).at_least_once + + 3.times { AddToBufferJob.set(wait: 0.1).perform_later("I'm scheduled") } + assert_equal 3, SolidQueue::ScheduledExecution.count + sleep 0.1 + + dispatcher.start + wait_while_with_timeout(1.second) { SolidQueue::ScheduledExecution.any? } + + assert_equal 0, SolidQueue::ScheduledExecution.count + assert_equal 3, SolidQueue::ReadyExecution.count + end + + test "sleeps `polling_interval` between polls if there are no un-dispatched jobs" do + dispatcher = SolidQueue::Dispatcher.new(polling_interval: 10, batch_size: 1) + dispatcher.expects(:interruptible_sleep).with(0.seconds).never + dispatcher.expects(:interruptible_sleep).with(dispatcher.polling_interval).at_least_once + dispatcher.start + sleep 0.1 + end + private def with_polling(silence:) old_silence_polling, SolidQueue.silence_polling = SolidQueue.silence_polling, silence diff --git a/test/unit/worker_test.rb b/test/unit/worker_test.rb index d511cf74..52b0d8e8 100644 --- a/test/unit/worker_test.rb +++ b/test/unit/worker_test.rb @@ -171,6 +171,26 @@ class WorkerTest < ActiveSupport::TestCase SolidQueue.process_heartbeat_interval = old_heartbeat_interval end + test "sleeps `10.minutes` if at capacity" do + 3.times { |i| StoreResultJob.perform_later(i, pause: 1.second) } + + @worker.expects(:interruptible_sleep).with(10.minutes).at_least_once + @worker.expects(:interruptible_sleep).with(@worker.polling_interval).never + + @worker.start + sleep 1.second + end + + test "sleeps `polling_interval` if worker not at capacity" do + 2.times { |i| StoreResultJob.perform_later(i, pause: 1.second) } + + @worker.expects(:interruptible_sleep).with(@worker.polling_interval).at_least_once + @worker.expects(:interruptible_sleep).with(10.minutes).never + + @worker.start + sleep 1.second + end + private def with_polling(silence:) old_silence_polling, SolidQueue.silence_polling = SolidQueue.silence_polling, silence