Skip to content

Commit

Permalink
Add missing documentation and improve documentation consistency.
Browse files Browse the repository at this point in the history
  • Loading branch information
ioquatix committed Nov 8, 2024
1 parent 0b4930d commit ffc19a4
Show file tree
Hide file tree
Showing 12 changed files with 124 additions and 37 deletions.
4 changes: 2 additions & 2 deletions lib/async/barrier.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@
module Async
# A general purpose synchronisation primitive, which allows one task to wait for a number of other tasks to complete. It can be used in conjunction with {Semaphore}.
#
# @public Since `stable-v1`.
# @public Since *Async v1*.
class Barrier
# Initialize the barrier.
# @parameter parent [Task | Semaphore | Nil] The parent for holding any children tasks.
# @public Since `stable-v1`.
# @public Since *Async v1*.
def initialize(parent: nil)
@tasks = List.new

Expand Down
2 changes: 1 addition & 1 deletion lib/async/clock.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

module Async
# A convenient wrapper around the internal monotonic clock.
# @public Since `stable-v1`.
# @public Since *Async v1*.
class Clock
# Get the current elapsed monotonic time.
def self.now
Expand Down
2 changes: 1 addition & 1 deletion lib/async/condition.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

module Async
# A synchronization primitive, which allows fibers to wait until a particular condition is (edge) triggered.
# @public Since `stable-v1`.
# @public Since *Async v1*.
class Condition
# Create a new condition.
def initialize
Expand Down
5 changes: 5 additions & 0 deletions lib/async/console.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,15 @@ module Async
#
# This is an experimental feature.
module Console
# Log a message at the debug level. The shim is silent.
def self.debug(...)
end

# Log a message at the info level. The shim is silent.
def self.info(...)
end

# Log a message at the warn level. The shim redirects to `Kernel#warn`.
def self.warn(*arguments, exception: nil, **options)
if exception
super(*arguments, exception.full_message, **options)
Expand All @@ -26,10 +29,12 @@ def self.warn(*arguments, exception: nil, **options)
end
end

# Log a message at the error level. The shim redirects to `Kernel#warn`.
def self.error(...)
self.warn(...)
end

# Log a message at the fatal level. The shim redirects to `Kernel#warn`.
def self.fatal(...)
self.warn(...)
end
Expand Down
3 changes: 2 additions & 1 deletion lib/async/idler.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ module Async
# A load balancing mechanism that can be used process work when the system is idle.
class Idler
# Create a new idler.
# @public Since `stable-v2`.
#
# @public Since *Async v2*.
#
# @parameter maximum_load [Numeric] The maximum load before we start shedding work.
# @parameter backoff [Numeric] The initial backoff time, used for delaying work.
Expand Down
2 changes: 1 addition & 1 deletion lib/async/notification.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

module Async
# A synchronization primitive, which allows fibers to wait until a notification is received. Does not block the task which signals the notification. Waiting tasks are resumed on next iteration of the reactor.
# @public Since `stable-v1`.
# @public Since *Async v1*.
class Notification < Condition
# Signal to a given task that it should resume operations.
def signal(value = nil, task: Task.current)
Expand Down
4 changes: 2 additions & 2 deletions lib/async/queue.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ module Async
#
# It has a compatible interface with {Notification} and {Condition}, except that it's multi-value.
#
# @public Since `stable-v1`.
# @public Since *Async v1*.
class Queue
# Create a new queue.
#
Expand Down Expand Up @@ -99,7 +99,7 @@ def wait
end

# A queue which limits the number of items that can be enqueued.
# @public Since `stable-v1`.
# @public Since *Async v1*.
class LimitedQueue < Queue
# Create a new limited queue.
#
Expand Down
116 changes: 93 additions & 23 deletions lib/async/scheduler.rb
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,14 @@ def initialize(message = "Scheduler is closed!")
end

# Whether the fiber scheduler is supported.
# @public Since `stable-v1`.
# @public Since *Async v1*.
def self.supported?
true
end

# Create a new scheduler.
#
# @public Since `stable-v1`.
# @public Since *Async v1*.
# @parameter parent [Node | Nil] The parent node to use for task hierarchy.
# @parameter selector [IO::Event::Selector] The selector to use for event handling.
def initialize(parent = nil, selector: nil)
Expand All @@ -52,6 +52,7 @@ def initialize(parent = nil, selector: nil)
end

# Compute the scheduler load according to the busy and idle times that are updated by the run loop.
#
# @returns [Float] The load of the scheduler. 0.0 means no load, 1.0 means fully loaded or over-loaded.
def load
total_time = @busy_time + @idle_time
Expand Down Expand Up @@ -95,7 +96,7 @@ def terminate
end

# Terminate all child tasks and close the scheduler.
# @public Since `stable-v1`.
# @public Since *Async v1*.
def close
self.run_loop do
until self.terminate
Expand All @@ -115,7 +116,7 @@ def close
end

# @returns [Boolean] Whether the scheduler has been closed.
# @public Since `stable-v1`.
# @public Since *Async v1*.
def closed?
@selector.nil?
end
Expand Down Expand Up @@ -167,6 +168,8 @@ def resume(fiber, *arguments)
end

# Invoked when a fiber tries to perform a blocking operation which cannot continue. A corresponding call {unblock} must be performed to allow this fiber to continue.
#

# @asynchronous May only be called on same thread as fiber scheduler.
def block(blocker, timeout)
# $stderr.puts "block(#{blocker}, #{Fiber.current}, #{timeout})"
Expand All @@ -190,7 +193,13 @@ def block(blocker, timeout)
timer&.cancel!
end

# Unblock a fiber that was previously blocked.
#
# @public Since *Async v2* and *Ruby v3.1*.
# @asynchronous May be called from any thread.
#
# @parameter blocker [Object] The object that was blocking the fiber.
# @parameter fiber [Fiber] The fiber to unblock.
def unblock(blocker, fiber)
# $stderr.puts "unblock(#{blocker}, #{fiber})"

Expand All @@ -201,7 +210,12 @@ def unblock(blocker, fiber)
end
end

# @asynchronous May be non-blocking..
# Sleep for the specified duration.
#
# @public Since *Async v2* and *Ruby v3.1*.
# @asynchronous May be non-blocking.
#
# @parameter duration [Numeric | Nil] The time in seconds to sleep, or if nil, indefinitely.
def kernel_sleep(duration = nil)
if duration
self.block(nil, duration)
Expand All @@ -210,15 +224,19 @@ def kernel_sleep(duration = nil)
end
end

# @asynchronous May be non-blocking..
# Resolve the address of the given hostname.
#
# @public Since *Async v2*.
# @asynchronous May be non-blocking.
#
# @parameter hostname [String] The hostname to resolve.
def address_resolve(hostname)
# On some platforms, hostnames may contain a device-specific suffix (e.g. %en0). We need to strip this before resolving.
# See <https://github.com/socketry/async/issues/180> for more details.
hostname = hostname.split("%", 2).first
::Resolv.getaddresses(hostname)
end


if IO.method_defined?(:timeout)
private def get_timeout(io)
io.timeout
Expand All @@ -229,7 +247,14 @@ def address_resolve(hostname)
end
end

# @asynchronous May be non-blocking..
# Wait for the specified IO to become ready for the specified events.
#
# @public Since *Async v2*.
# @asynchronous May be non-blocking.
#
# @parameter io [IO] The IO object to wait on.
# @parameter events [Integer] The events to wait for, e.g. `IO::READABLE`, `IO::WRITABLE`, etc.
# @parameter timeout [Float | Nil] The maximum time to wait, or if nil, indefinitely.
def io_wait(io, events, timeout = nil)
fiber = Fiber.current

Expand All @@ -251,6 +276,15 @@ def io_wait(io, events, timeout = nil)
end

if ::IO::Event::Support.buffer?
# Read from the specified IO into the buffer.
#
# @public Since *Async v2* and Ruby with `IO::Buffer` support.
# @asynchronous May be non-blocking.
#
# @parameter io [IO] The IO object to read from.
# @parameter buffer [IO::Buffer] The buffer to read into.
# @parameter length [Integer] The minimum number of bytes to read.
# @parameter offset [Integer] The offset within the buffer to read into.
def io_read(io, buffer, length, offset = 0)
fiber = Fiber.current

Expand All @@ -266,6 +300,15 @@ def io_read(io, buffer, length, offset = 0)
end

if RUBY_ENGINE != "ruby" || RUBY_VERSION >= "3.3.1"
# Write the specified buffer to the IO.
#
# @public Since *Async v2* and *Ruby v3.3.1* with `IO::Buffer` support.
# @asynchronous May be non-blocking.
#
# @parameter io [IO] The IO object to write to.
# @parameter buffer [IO::Buffer] The buffer to write from.
# @parameter length [Integer] The minimum number of bytes to write.
# @parameter offset [Integer] The offset within the buffer to write from.
def io_write(io, buffer, length, offset = 0)
fiber = Fiber.current

Expand All @@ -283,6 +326,10 @@ def io_write(io, buffer, length, offset = 0)
end

# Wait for the specified process ID to exit.
#
# @public Since *Async v2*.
# @asynchronous May be non-blocking.
#
# @parameter pid [Integer] The process ID to wait for.
# @parameter flags [Integer] A bit-mask of flags suitable for `Process::Status.wait`.
# @returns [Process::Status] A process status instance.
Expand Down Expand Up @@ -335,7 +382,10 @@ def process_wait(pid, flags)
end

# Run one iteration of the event loop.
# Does not handle interrupts.
#
# @public Since *Async v1*.
# @asynchronous Must be invoked from blocking (root) fiber.
#
# @parameter timeout [Float | Nil] The maximum timeout, or if nil, indefinite.
# @returns [Boolean] Whether there is more work to do.
def run_once(timeout = nil)
Expand All @@ -354,6 +404,7 @@ def run_once(timeout = nil)
end

# Checks and clears the interrupted state of the scheduler.
#
# @returns [Boolean] Whether the reactor has been interrupted.
private def interrupted?
if @interrupted
Expand All @@ -368,7 +419,9 @@ def run_once(timeout = nil)
return false
end

# Stop all children, including transient children, ignoring any signals.
# Stop all children, including transient children.
#
# @public Since *Async v1*.
def stop
@children&.each do |child|
child.stop
Expand All @@ -387,6 +440,7 @@ def stop
end
end
rescue Interrupt => interrupt
# If an interrupt did occur during an iteration of the event loop, we need to handle it. More specifically, `self.stop` is not safe to interrupt without potentially corrupting the task tree.
Thread.handle_interrupt(::SignalException => :never) do
Console.debug(self) do |buffer|
buffer.puts "Scheduler interrupted: #{interrupt.inspect}"
Expand All @@ -406,6 +460,13 @@ def stop
end

# Run the reactor until all tasks are finished. Proxies arguments to {#async} immediately before entering the loop, if a block is provided.
#
# Forwards all parameters to {#async} if a block is given.
#
# @public Since *Async v1*.
#
# @yields {|task| ...} The top level task, if a block is given.
# @returns [Task] The initial task that was scheduled into the reactor.
def run(...)
Kernel.raise ClosedError if @selector.nil?

Expand All @@ -418,30 +479,23 @@ def run(...)
return initial_task
end

# Start an asynchronous task within the specified reactor. The task will be
# executed until the first blocking call, at which point it will yield and
# and this method will return.
# Start an asynchronous task within the specified reactor. The task will be executed until the first blocking call, at which point it will yield and and this method will return.
#
# This is the main entry point for scheduling asynchronus tasks.
# @public Since *Async v1*.
# @asynchronous May context switch immediately to new task.
# @deprecated Use {#run} or {Task#async} instead.
#
# @yields {|task| ...} Executed within the task.
# @returns [Task] The task that was scheduled into the reactor.
# @deprecated With no replacement.
def async(*arguments, **options, &block)
# warn "Async::Scheduler#async is deprecated. Use `run` or `Task#async` instead.", uplevel: 1, category: :deprecated

Kernel.raise ClosedError if @selector.nil?

task = Task.new(Task.current? || self, **options, &block)

# I want to take a moment to explain the logic of this.
# When calling an async block, we deterministically execute it until the
# first blocking operation. We don't *have* to do this - we could schedule
# it for later execution, but it's useful to:
# - Fail at the point of the method call where possible.
# - Execute determinstically where possible.
# - Avoid scheduler overhead if no blocking operation is performed.
task.run(*arguments)

# Console.debug "Initial execution of task #{fiber} complete (#{result} -> #{fiber.alive?})..."
return task
end

Expand All @@ -450,7 +504,14 @@ def fiber(...)
end

# Invoke the block, but after the specified timeout, raise {TimeoutError} in any currenly blocking operation. If the block runs to completion before the timeout occurs or there are no non-blocking operations after the timeout expires, the code will complete without any exception.
#
# @public Since *Async v1*.
# @asynchronous May raise an exception at any interruption point (e.g. blocking operations).
#
# @parameter duration [Numeric] The time in seconds, in which the task should complete.
# @parameter exception [Class] The exception class to raise.
# @parameter message [String] The message to pass to the exception.
# @yields {|duration| ...} The block to execute with a timeout.
def with_timeout(duration, exception = TimeoutError, message = "execution expired", &block)
fiber = Fiber.current

Expand All @@ -465,6 +526,15 @@ def with_timeout(duration, exception = TimeoutError, message = "execution expire
timer&.cancel!
end

# Invoke the block, but after the specified timeout, raise the specified exception with the given message. If the block runs to completion before the timeout occurs or there are no non-blocking operations after the timeout expires, the code will complete without any exception.
#
# @public Since *Async v1* and *Ruby v3.1*. May be invoked from `Timeout.timeout`.
# @asynchronous May raise an exception at any interruption point (e.g. blocking operations).
#
# @parameter duration [Numeric] The time in seconds, in which the task should complete.
# @parameter exception [Class] The exception class to raise.
# @parameter message [String] The message to pass to the exception.
# @yields {|duration| ...} The block to execute with a timeout.
def timeout_after(duration, exception, message, &block)
with_timeout(duration, exception, message) do |timer|
yield duration
Expand Down
2 changes: 1 addition & 1 deletion lib/async/semaphore.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

module Async
# A synchronization primitive, which limits access to a given resource.
# @public Since `stable-v1`.
# @public Since *Async v1*.
class Semaphore
# @parameter limit [Integer] The maximum number of times the semaphore can be acquired before it blocks.
# @parameter parent [Task | Semaphore | Nil] The parent for holding any children tasks.
Expand Down
Loading

0 comments on commit ffc19a4

Please sign in to comment.