-
Notifications
You must be signed in to change notification settings - Fork 141
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Use Concurrent::Promises based TimerTask
Promises are the recommended infrastructure, replacing several OG APIs, including TimerTasks. SQ only uses TimerTasks in 3 places (currently) and a very small subset of their overall functionality. SolidQueue::TimerTask is a drop-in replacement. This PR uses AtomicBoolean instead of the recommended Concurrent::Cancellation to avoid a dependency on, and the potential API stability issues of, edge features. This completes the move from the old APIs to Promises and makes all of the new concurrency features (Actors, Channels, etc.) available for future SQ features and enahancements.
- Loading branch information
Showing
6 changed files
with
125 additions
and
23 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,42 @@ | ||
# frozen_string_literal: true | ||
|
||
module SolidQueue | ||
class TimerTask | ||
include AppExecutor | ||
|
||
def initialize(execution_interval:, run_now: false, &block) | ||
raise ArgumentError, "A block is required" unless block_given? | ||
@shutdown = Concurrent::AtomicBoolean.new | ||
|
||
run(run_now, execution_interval, &block) | ||
end | ||
|
||
def shutdown | ||
@shutdown.make_true | ||
end | ||
|
||
private | ||
|
||
def run(run_now, execution_interval, &block) | ||
execute_task(&block) if run_now | ||
|
||
Concurrent::Promises.future(execution_interval) do |interval| | ||
repeating_task(interval, &block) | ||
end.run | ||
end | ||
|
||
def execute_task(&block) | ||
block.call unless @shutdown.true? | ||
rescue Exception => e | ||
handle_thread_error(e) | ||
end | ||
|
||
def repeating_task(interval, &block) | ||
Concurrent::Promises.schedule(interval) do | ||
execute_task(&block) | ||
end.then do | ||
repeating_task(interval, &block) unless @shutdown.true? | ||
end | ||
end | ||
end | ||
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,79 @@ | ||
# frozen_string_literal: true | ||
|
||
require "test_helper" | ||
require "mocha/minitest" | ||
|
||
class TimerTaskTest < ActiveSupport::TestCase | ||
test "initialization requires a block" do | ||
assert_raises(ArgumentError) do | ||
SolidQueue::TimerTask.new(execution_interval: 1) | ||
end | ||
end | ||
|
||
test "task runs immediate when run now true" do | ||
executed = false | ||
|
||
task = SolidQueue::TimerTask.new(run_now: true, execution_interval: 1) do | ||
executed = true | ||
end | ||
|
||
sleep 0.1 | ||
|
||
assert executed, "Task should have executed immediately" | ||
task.shutdown | ||
end | ||
|
||
test "task does not run immediately when run with run_now false" do | ||
executed = false | ||
|
||
task = SolidQueue::TimerTask.new(run_now: false, execution_interval: 1) do | ||
executed = true | ||
end | ||
|
||
sleep 0.1 | ||
|
||
assert_not executed, "Task should have executed immediately" | ||
task.shutdown | ||
end | ||
|
||
test "task repeats" do | ||
executions = 0 | ||
|
||
task = SolidQueue::TimerTask.new(execution_interval: 0.1, run_now: false) do | ||
executions += 1 | ||
end | ||
|
||
sleep(0.5) # Wait to accumulate some executions | ||
|
||
assert executions > 3, "The block should be executed repeatedly" | ||
|
||
task.shutdown | ||
end | ||
|
||
test "task stops on shutdown" do | ||
executions = 0 | ||
|
||
task = SolidQueue::TimerTask.new(execution_interval: 0.1, run_now: false) { executions += 1 } | ||
|
||
sleep(0.3) # Let the task run a few times | ||
|
||
task.shutdown | ||
|
||
current_executions = executions | ||
|
||
sleep(0.5) # Ensure no more executions after shutdown | ||
|
||
assert_equal current_executions, executions, "The task should stop executing after shutdown" | ||
end | ||
|
||
test "calls handle_thread_error if task raises" do | ||
task = SolidQueue::TimerTask.new(execution_interval: 0.1) do | ||
raise ExpectedTestError.new | ||
end | ||
task.expects(:handle_thread_error).with(instance_of(ExpectedTestError)) | ||
|
||
sleep(0.2) # Give some time for the task to run and handle the error | ||
|
||
task.shutdown | ||
end | ||
end |