Permanent singleton reactor instance to handle background tasks #268
bbascarevic asked this question in Q&A
This is what I came up with. It seems to work well with limited testing, but needs more tests.

```ruby
# Thread-safe singleton class that maintains a single `Async::Reactor` per process, powered by a singleton thread.
# Designed to work in multi-threaded environments like Puma with threading.
#
# The reactor is lazily created the first time work is scheduled via `AsyncExecutor.async`. This ensures that no
# unnecessary reactor thread is created in e.g. the Puma root process, which only manages worker processes.
#
# Scheduled work is fire-and-forget; if you need awaitable tasks, use regular `Async { ... }.wait` or `Sync { ... }`.
class AsyncExecutor
  # Mutex to synchronize the creation of the reactor and its thread. May not be needed in the presence of the GVL,
  # but one can never know...
  @mutex = Mutex.new

  # Per-process singleton thread that powers the reactor.
  @reactor_thread = nil

  # PID of the process that created the reactor thread. Used to detect forking, as every forked process needs its
  # own reactor thread.
  @reactor_thread_owner_pid = nil

  # Whether scheduling work for the first time in a process starts the reactor. Exposed as the `#auto_start`
  # attribute. Set to false in tests so that `#run_until_empty` can be called.
  @auto_start = true

  # Per-work-item error handler (callable). The thread will die on an unhandled work item error unless this is set.
  @error_handler = nil

  # The queue used to transport work items from arbitrary threads (e.g. Puma worker threads) to the reactor
  # thread. The blocking nature of `Queue#deq` also lets the reactor thread suspend while no work is pending.
  @blocking_queue = Queue.new

  class << self
    attr_accessor :auto_start
    attr_reader :error_handler

    # Schedules a work item for execution by the reactor, passing arguments and keyword arguments to the block.
    # Returns self.
    # Use argument passing instead of closures to prevent race conditions when captured variables change *after*
    # the work was scheduled but *before* it was actually executed by the reactor; arguments effectively contain
    # copies of the values.
    def async(*args, **kwargs, &block)
      callable = args.empty? && kwargs.empty? ? block : proc { block.call(*args, **kwargs) }
      @blocking_queue.enq(callable)
      maybe_start if @auto_start
      self
    end

    # Stops the reactor and waits for completion of pending continuations. Will not accept new work items
    # afterwards, as `blocking_queue` is closed. AsyncExecutor cannot be restarted once explicitly stopped.
    def stop
      reactor_thread = @reactor_thread
      @blocking_queue.close
      reactor_thread&.join if thread_created?
    ensure
      @reactor_thread = nil
      @reactor_thread_owner_pid = nil
    end

    # Runs scheduled work items synchronously on the current thread. A utility method for use in tests along with
    # `auto_start = false`. Returns an array of work item results for convenience, even though results are ignored
    # with regular async execution.
    def run_until_empty
      raise AssertionError, :reactor_already_running if thread_created?
      raise AssertionError, :auto_start_is_set if @auto_start

      [].tap do |results|
        # `Sync` creates a transient reactor and blocks until it's finished, unless already in a Task, in which
        # case it just yields.
        Sync do
          loop do
            break unless (work_items = get_work_items(false))
            results.push(*work_items.map(&:call))
          end
        end
      end
    end

    def error_handler=(callable)
      raise AssertionError, :error_handler_does_not_respond_to_call if callable && !callable.respond_to?(:call)

      @error_handler = callable
    end

    # Executes the provided block synchronously so the fiber scheduler is suspended for the duration of the block.
    # Returns the result of the block. Passes any arguments to the block (for signature compatibility with #async,
    # even though this runs synchronously and immediately).
    #
    # The block is executed on a separate transient thread, and current fiber locals are copied to that thread.
    #
    # Use this in fibers to wrap 3rd-party code that may not be compatible with fibers.
    def sync(*args, **kwargs)
      fiber_locals = Thread.current.keys.index_with { |k| Thread.current[k] }
      Fiber.blocking do
        # Overkill, but the only safe thing to do, given that gems could even define new instance vars on the
        # thread object... e.g. activesupport-7.0.4.2/lib/active_support/isolated_execution_state.rb#L9
        Thread.new(fiber_locals) do |locals|
          Thread.current.name = "#{name}TransientSyncThread"
          locals.each { |k, v| Thread.current[k] = v }
          Rails.application.executor.wrap { yield(*args, **kwargs) }
        end.join.value
      end
    end

    # Useful in tests to assert that work was en/dequeued.
    def pending_count
      @blocking_queue.size
    end

    # Use in code that requires/expects an async environment, to guard against being invoked with no fiber
    # scheduler.
    def assert_async!
      raise AssertionError, :sync_is_prohibited_here unless Fiber.scheduler
    end

    # Use in code that deals with legacy code (which uses true thread locals) to guard against being executed on
    # the reactor thread directly.
    def assert_sync!
      raise AssertionError, :async_is_prohibited_here if Fiber.scheduler
    end

    private

    def thread_created?
      # If the PID of the process has changed since the thread was created, this process was forked, so
      # effectively there's no reactor thread running in this process.
      @reactor_thread && (Process.pid == @reactor_thread_owner_pid)
    end

    def maybe_start
      # This method is called every time a work item is enqueued, so first perform a cheap exit-early check.
      return if thread_created?

      # This method is called from multiple threads (e.g. Puma worker threads), so we synchronize access to
      # ensure that there's only one reactor thread per process.
      @mutex.synchronize do
        # After acquiring the mutex, check whether another thread beat us to it.
        return if thread_created?

        @reactor_thread_owner_pid = Process.pid
        @reactor_thread = Thread.new do
          Thread.current.name = "#{name}Thread"
          loop do # outer loop
            reactor = ::Async::Reactor.new
            begin
              loop do # reactor loop
                # Run the reactor once. This may produce additional Tasks in the reactor (continuations).
                # `reactor.run_once` returns `false` if there are no more tasks for the reactor.
                reactor_finished = !reactor.run_once(0.5)
                if @blocking_queue.closed? # `#stop` was called?
                  # Continue with `reactor.run_once` until the reactor is finished with Tasks.
                  break if reactor_finished
                else
                  # Get more work from the work queue:
                  # - If the reactor is finished, block the thread until some work is enqueued.
                  # - If the reactor is not finished, don't block the thread and only retrieve work items
                  #   queued up so far.
                  get_work_items(reactor_finished)&.each do |work_item|
                    ::Async::Task.new(reactor) { execute(work_item) }.run
                  end
                end
              end # reactor loop
            rescue StandardError => e
              raise e unless @error_handler

              @error_handler.call(e)
            ensure
              @reactor_thread = nil
              @reactor_thread_owner_pid = nil
              Fiber.set_scheduler(nil)
            end
            break if @blocking_queue.closed?
          end # outer loop
        end
      end
    end

    def get_work_items(blocking)
      # `#stop` closes the queue and signals that the reactor should stop and the thread should terminate.
      return nil if @blocking_queue.closed?

      # In blocking mode, simply return the next item from the queue. This blocks if the queue is empty.
      if blocking
        work_item = @blocking_queue.deq
        return work_item && [work_item] # `work_item` can be nil if the queue was closed during a blocking wait.
      end

      # In non-blocking mode, return nil if there's no work queued up so far.
      return nil if @blocking_queue.empty?

      # In non-blocking mode with work queued up, return all pending work items as an array. This does not
      # block, as it only dequeues `blocking_queue#size` items.
      Array.new(@blocking_queue.size) { @blocking_queue.deq }
    end

    def execute(work_item)
      work_item.call
    rescue StandardError => e
      raise e unless @error_handler

      @error_handler.call(e)
    end
  end
end
```
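The argument-passing rationale behind `#async` can be demonstrated in isolation. This is a minimal sketch; the `schedule` helper below is a hypothetical stand-in for `AsyncExecutor.async` that only builds the callable without enqueuing it:

```ruby
# Hypothetical stand-in for AsyncExecutor.async: wrap the block and its
# arguments into a callable that can be executed later.
def schedule(*args, &block)
  args.empty? ? block : proc { block.call(*args) }
end

log = []
value = 1

closure  = schedule { log << value }        # captures the variable itself
snapshot = schedule(value) { |v| log << v } # copies the current value

value = 2 # mutate *after* scheduling, *before* execution

closure.call  # the closure sees the mutated value: 2
snapshot.call # the argument copy still holds the original value: 1

log # => [2, 1]
```

This is why scheduling with explicit arguments is safer than relying on closures when the work runs at some arbitrary later point on the reactor thread.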
Original question:
I have a Ruby 3 & Rails application running on Puma that's configured to use both processes and threads for processing requests. I need in-process, non-persistent, fire-and-forget background task execution using Fibers.
The idea is to create a dedicated thread for the Async::Reactor when the Rails app starts, living for the whole duration of the process. The thread would sleep while there are no tasks in the reactor and wake up when work is scheduled. Once work is done, if the execution didn't spawn additional tasks, the thread hosting the reactor goes back to sleep. Puma web server threads can schedule async tasks to run on the reactor without waiting on their scheduling or completion.
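The sleep/wake mechanism described above doesn't need anything async-specific; a plain `Queue` already provides it, because `Queue#deq` parks the consumer thread while the queue is empty. A minimal stdlib-only sketch of the pattern (names are illustrative, no error handling, no reactor involved):

```ruby
queue   = Queue.new
results = Queue.new # thread-safe sink, just so we can observe the output

worker = Thread.new do
  # `deq` blocks (the thread sleeps) while the queue is empty, and
  # returns nil once the queue is closed and drained, ending the loop.
  while (work = queue.deq)
    results << work.call
  end
end

# Any thread (e.g. a Puma worker thread) can schedule fire-and-forget work:
queue << proc { 1 + 1 }
queue << proc { "done" }

queue.close # no more work; lets the worker drain and exit
worker.join
```

The `AsyncExecutor` in the reply above layers a per-process singleton, fork detection, and an `Async::Reactor` on top of exactly this queue-driven loop.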
Can this be done? If so, are there any caveats or footguns to watch out for?
Also, it's my understanding that when using the Async v2 fiber scheduler on Ruby 3, all methods that block on I/O become non-blocking within a Fiber? If my understanding is correct, can this be suppressed somehow? These background tasks may need to interact with Rails or with gems that use thread locals directly. Could
Fiber.blocking { blocking code here }
be used to suppress async execution and provide compatibility with such legacy code?
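For what it's worth, `Fiber.blocking { ... }` is part of core Ruby 3.x and marks the current fiber as blocking for the duration of the block, so a fiber scheduler is not consulted for blocking operations inside it. The flag can be observed without the async gem at all; this sketch hand-creates a non-blocking fiber (the kind a scheduler would create for you) just to show the transition:

```ruby
# A non-blocking fiber is what a fiber scheduler creates for scheduled
# tasks; here we make one by hand just to observe the blocking flag.
fiber = Fiber.new(blocking: false) do
  outside = Fiber.blocking?                    # false: scheduler hooks would apply here
  inside  = Fiber.blocking { Fiber.blocking? } # truthy: operations block the thread normally
  [outside, inside]
end

outside, inside = fiber.resume
```

Note that `Fiber.blocking` only stops the scheduler from intercepting blocking operations; it does not move execution to another thread, so true thread locals are still those of the reactor thread. That is why the `sync` method in the reply above additionally hops to a transient thread.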