Skip to content

Commit

Permalink
Fix handling of stop stopping the stopping task.
Browse files Browse the repository at this point in the history
  • Loading branch information
ioquatix committed Jun 7, 2023
1 parent 01e6bae commit 69f290e
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 4 deletions.
26 changes: 22 additions & 4 deletions lib/async/task.rb
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,10 @@ def wait
attr :result

# Stop the task and all of its children.
#
# If `later` is false, it means that `stop` has been invoked directly. When `later` is true, it means that `stop` is invoked by `stop_children` or some other indirect mechanism. In that case, if we encounter the "current" fiber, we can't stop it right away, as it's currently performing `#stop`. Stopping it immediately would interrupt the current stop traversal, so we need to schedule the stop to occur later.
#
# @parameter later [Boolean] Whether to stop the task later, or immediately.
def stop(later = false)
if self.stopped?
# If we already stopped this task... don't try to stop it again:
Expand All @@ -230,6 +234,7 @@ def stop(later = false)
# If the fiber is alive, we need to stop it:
if @fiber&.alive?
if self.current?
# If the fiber is current, and later is `true`, we need to schedule the fiber to be stopped later, as it's currently invoking `stop`:
if later
# If the fiber is the current fiber and we want to stop it later, schedule it:
Fiber.scheduler.push(Stop::Later.new(self))
Expand All @@ -240,6 +245,7 @@ def stop(later = false)
else
# If the fiber is not curent, we can raise the exception directly:
begin
# There is a chance that this will stop the fiber that originally called stop. If that happens, the exception handling in `#stopped` will rescue the exception and re-raise it later.
Fiber.scheduler.raise(@fiber, Stop)
rescue FiberError
# In some cases, this can cause a FiberError (it might be resumed already), so we schedule it to be stopped later:
Expand All @@ -266,7 +272,7 @@ def self.current?
end

def current?
self.equal?(Thread.current[:async_task])
Fiber.current.equal?(@fiber)
end

private
Expand Down Expand Up @@ -311,11 +317,23 @@ def failed!(exception = false, propagate = true)
end

def stopped!
# Console.logger.info(self, self.annotation) {"Task was stopped with #{@children&.size.inspect} children!"}
# Console.logger.info(self, status:) {"Task #{self} was stopped with #{@children&.size.inspect} children!"}
@status = :stopped

# We are not running, but children might be so we should stop them:
stop_children(true)
stopped = false

begin
# We are bnot running, but children might be so we should stop them:
stop_children(true)
rescue Stop
stopped = true
# If we are stopping children, and one of them tries to stop the current task, we should ignore it. We will be stopped later.
retry
end

if stopped
raise Stop, "Stopping current task!"
end
end

def stop!
Expand Down
22 changes: 22 additions & 0 deletions test/async/task.rb
Original file line number Diff line number Diff line change
Expand Up @@ -439,6 +439,28 @@ def after
end
end

it "can stop the parent task which stops the stopping task" do
condition = Async::Notification.new

reactor.run do |task|
task.async do
condition.wait
task.stop
end

task.async do
sleep
end

# NOTE: Hangs only if this second task is added
task.async do
sleep
end

condition.signal
end
end

it "should not remove running task" do
top_task = middle_task = bottom_task = nil

Expand Down

0 comments on commit 69f290e

Please sign in to comment.