From 69f290e28802587a045c5aa409f259f473d9de32 Mon Sep 17 00:00:00 2001 From: Samuel Williams Date: Wed, 7 Jun 2023 16:19:42 +0900 Subject: [PATCH] Fix handling of stop stopping the stopping task. --- lib/async/task.rb | 26 ++++++++++++++++++++++---- test/async/task.rb | 22 ++++++++++++++++++++++ 2 files changed, 44 insertions(+), 4 deletions(-) diff --git a/lib/async/task.rb b/lib/async/task.rb index a7dbf9cd..973869a5 100644 --- a/lib/async/task.rb +++ b/lib/async/task.rb @@ -221,6 +221,10 @@ def wait attr :result # Stop the task and all of its children. + # + # If `later` is false, it means that `stop` has been invoked directly. When `later` is true, it means that `stop` is invoked by `stop_children` or some other indirect mechanism. In that case, if we encounter the "current" fiber, we can't stop it right away, as it's currently performing `#stop`. Stopping it immediately would interrupt the current stop traversal, so we need to schedule the stop to occur later. + # + # @parameter later [Boolean] Whether to stop the task later, or immediately. def stop(later = false) if self.stopped? # If we already stopped this task... don't try to stop it again: @@ -230,6 +234,7 @@ def stop(later = false) # If the fiber is alive, we need to stop it: if @fiber&.alive? if self.current? + # If the fiber is current, and later is `true`, we need to schedule the fiber to be stopped later, as it's currently invoking `stop`: if later # If the fiber is the current fiber and we want to stop it later, schedule it: Fiber.scheduler.push(Stop::Later.new(self)) @@ -240,6 +245,7 @@ def stop(later = false) else # If the fiber is not curent, we can raise the exception directly: begin + # There is a chance that this will stop the fiber that originally called stop. If that happens, the exception handling in `#stopped` will rescue the exception and re-raise it later. Fiber.scheduler.raise(@fiber, Stop) rescue FiberError # In some cases, this can cause a FiberError (it might be resumed already), so we schedule it to be stopped later: @@ -266,7 +272,7 @@ def self.current? end def current? - self.equal?(Thread.current[:async_task]) + Fiber.current.equal?(@fiber) end private @@ -311,11 +317,23 @@ def failed!(exception = false, propagate = true) end def stopped! - # Console.logger.info(self, self.annotation) {"Task was stopped with #{@children&.size.inspect} children!"} + # Console.logger.info(self, status:) {"Task #{self} was stopped with #{@children&.size.inspect} children!"} @status = :stopped - # We are not running, but children might be so we should stop them: - stop_children(true) + stopped = false + + begin + # We are bnot running, but children might be so we should stop them: + stop_children(true) + rescue Stop + stopped = true + # If we are stopping children, and one of them tries to stop the current task, we should ignore it. We will be stopped later. + retry + end + + if stopped + raise Stop, "Stopping current task!" + end end def stop! diff --git a/test/async/task.rb b/test/async/task.rb index 730845f8..1f381903 100644 --- a/test/async/task.rb +++ b/test/async/task.rb @@ -439,6 +439,28 @@ def after end end + it "can stop the parent task which stops the stopping task" do + condition = Async::Notification.new + + reactor.run do |task| + task.async do + condition.wait + task.stop + end + + task.async do + sleep + end + + # NOTE: Hangs only if this second task is added + task.async do + sleep + end + + condition.signal + end + end + it "should not remove running task" do top_task = middle_task = bottom_task = nil