Skip to content

Commit

Permalink
Fix handling of stop stopping the stopping task.
Browse files Browse the repository at this point in the history
  • Loading branch information
ioquatix committed Jun 7, 2023
1 parent 01e6bae commit 2230a22
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 4 deletions.
31 changes: 27 additions & 4 deletions lib/async/task.rb
Original file line number Diff line number Diff line change
Expand Up @@ -221,15 +221,22 @@ def wait
attr :result

# Stop the task and all of its children.
#
# If `later` is false, it means that `stop` has been invoked directly. When `later` is true, it means that `stop` is invoked by `stop_children` or some other indirect mechanism. In that case, if we encounter the "current" fiber, we can't stop it right away, as it's currently performing `#stop`. Stopping it immediately would interrupt the current stop traversal, so we need to schedule the stop to occur later.
#
# @parameter later [Boolean] Whether to stop the task later, or immediately.
def stop(later = false)
if self.stopped?
# If we already stopped this task... don't try to stop it again:
return
end

# Console.logger.info(self, status:, later:, caller:) {"Stopping task..."}

# If the fiber is alive, we need to stop it:
if @fiber&.alive?
if self.current?
# If the fiber is current, and later is `true`, we need to schedule the fiber to be stopped later, as it's currently invoking `stop`:
if later
# If the fiber is the current fiber and we want to stop it later, schedule it:
Fiber.scheduler.push(Stop::Later.new(self))
Expand All @@ -240,6 +247,7 @@ def stop(later = false)
else
# If the fiber is not curent, we can raise the exception directly:
begin
# There is a chance that this will stop the fiber that originally called stop. If that happens, the exception handling in `#stopped` will rescue the exception and re-raise it later.
Fiber.scheduler.raise(@fiber, Stop)
rescue FiberError
# In some cases, this can cause a FiberError (it might be resumed already), so we schedule it to be stopped later:
Expand All @@ -266,7 +274,7 @@ def self.current?
end

def current?
self.equal?(Thread.current[:async_task])
Fiber.current.equal?(@fiber)
end

private
Expand Down Expand Up @@ -311,11 +319,26 @@ def failed!(exception = false, propagate = true)
end

def stopped!
# Console.logger.info(self, self.annotation) {"Task was stopped with #{@children&.size.inspect} children!"}
# Console.logger.info(self, status:) {"Task #{self} was stopped with #{@children&.size.inspect} children!"}
@status = :stopped

# We are not running, but children might be so we should stop them:
stop_children(true)
stopped = false

begin
# We are bnot running, but children might be so we should stop them:
stop_children(true)
rescue Stop
if stopped
raise
else
stopped = true
retry
end
end

if stopped
raise Stop, "Stopping current task!"
end
end

def stop!
Expand Down
22 changes: 22 additions & 0 deletions test/async/task.rb
Original file line number Diff line number Diff line change
Expand Up @@ -439,6 +439,28 @@ def after
end
end

it "can stop the parent task which stops the stopping task" do
condition = Async::Notification.new

reactor.run do |task|
task.async do
condition.wait
task.stop
end

task.async do
sleep
end

# NOTE: Hangs only if this second task is added
task.async do
sleep
end

condition.signal
end
end

it "should not remove running task" do
top_task = middle_task = bottom_task = nil

Expand Down

0 comments on commit 2230a22

Please sign in to comment.