diff --git a/lib/async/barrier.rb b/lib/async/barrier.rb index 3aa3dc5d..fb3a4c0a 100644 --- a/lib/async/barrier.rb +++ b/lib/async/barrier.rb @@ -3,6 +3,7 @@ # Released under the MIT License. # Copyright, 2019-2022, by Samuel Williams. +require_relative 'list' require_relative 'task' module Async @@ -13,25 +14,33 @@ class Barrier # @parameter parent [Task | Semaphore | Nil] The parent for holding any children tasks. # @public Since `stable-v1`. def initialize(parent: nil) - @tasks = [] + @tasks = List.new @parent = parent end - # All tasks which have been invoked into the barrier. - attr :tasks + class Waiting < List::Node + def initialize(task) + @task = task + end + + attr :task + end - # The number of tasks currently held by the barrier. + # Number of tasks being held by the barrier. def size @tasks.size end + # All tasks which have been invoked into the barrier. + attr :tasks + # Execute a child task and add it to the barrier. # @asynchronous Executes the given block concurrently. def async(*arguments, parent: (@parent or Task.current), **options, &block) task = parent.async(*arguments, **options, &block) - @tasks << task + @tasks.append(Waiting.new(task)) return task end @@ -42,31 +51,22 @@ def empty? @tasks.empty? end - # Wait for all tasks. + # Wait for all tasks to complete. You will still want to wait for individual tasks to complete if you want to handle errors. # @asynchronous Will wait for tasks to finish executing. def wait - # TODO: This would be better with linked list. - while @tasks.any? - task = @tasks.first - - begin - task.wait - ensure - # We don't know for sure that the exception was due to the task completion. - unless task.running? - # Remove the task from the waiting list if it's finished: - @tasks.shift if @tasks.first == task - end - end + while waiting = @tasks.first + task = waiting.task + task.join + @tasks.remove?(waiting) end end # Stop all tasks held by the barrier. # @asynchronous May wait for tasks to finish executing. def stop - # We have to be careful to avoid enumerating tasks while adding/removing to it: - tasks = @tasks.dup - tasks.each(&:stop) + @tasks.each do |waiting| + waiting.task.stop + end end end end diff --git a/lib/async/condition.rb b/lib/async/condition.rb index 0e915c87..7bb32b23 100644 --- a/lib/async/condition.rb +++ b/lib/async/condition.rb @@ -34,12 +34,9 @@ def alive? # Queue up the current fiber and wait on yielding the task. # @returns [Object] def wait - waiter = Waiter.new(Fiber.current) - @waiting.append(waiter) - - Fiber.scheduler.transfer - ensure - waiter.delete! + @waiting.stack(Waiter.new(Fiber.current)) do + Fiber.scheduler.transfer + end end # Is any fiber waiting on this notification? diff --git a/lib/async/list.rb b/lib/async/list.rb index 3bf647c7..a950b6ea 100644 --- a/lib/async/list.rb +++ b/lib/async/list.rb @@ -8,6 +8,7 @@ class List def initialize @head = self @tail = self + @size = 0 end # @private @@ -16,6 +17,13 @@ def initialize # @private attr_accessor :tail + attr :size + + def added(node) + @size += 1 + return node + end + # Append a node to the end of the list. def append(node) if node.head @@ -27,7 +35,7 @@ def append(node) node.head = @head @head = node - return node + return added(node) end def prepend(node) @@ -40,24 +48,48 @@ def prepend(node) node.tail = @tail @tail = node + return added(node) + end + + # Add the node, yield, and the remove the node. + def stack(node, &block) + append(node) + yield + ensure + remove!(node) + end + + def removed(node) + @size -= 1 return node end - def delete(node) - # One downside of this interface is we don't actually check if the node is part of the list defined by `self`. This means that there is a potential for a node to be deleted from a different list using this method, which in can throw off book-keeping when lists track size, etc. - + # Remove the node if it is in the list. + def remove?(node) + if node.head + remove!(node) + end + end + + # Remove the node. + def remove(node) + # One downside of this interface is we don't actually check if the node is part of the list defined by `self`. This means that there is a potential for a node to be removed from a different list using this method, which in can throw off book-keeping when lists track size, etc. unless node.head raise ArgumentError, "Node is not in a list!" end + remove!(node) + end + + private def remove!(node) node.head.tail = node.tail node.tail.head = node.head - # This marks the node as being deleted, and causes delete to fail if called a 2nd time. + # This marks the node as being removed, and causes remove to fail if called a 2nd time. node.head = nil # node.tail = nil - return node + return removed(node) end def empty? @@ -92,28 +124,20 @@ def include?(needle) end def first - @tail + unless @tail.equal?(self) + @tail + end end def last - @head + unless @head.equal?(self) + @head + end end end class List::Node attr_accessor :head attr_accessor :tail - - # Delete the node from the list. - def delete! - @head.tail = @tail - @tail.head = @head - @head = nil - - # See above deletion implementation for more details: - # @tail = nil - - return self - end end end diff --git a/lib/async/node.rb b/lib/async/node.rb index cfbe6930..a11182e1 100644 --- a/lib/async/node.rb +++ b/lib/async/node.rb @@ -12,28 +12,14 @@ module Async class Children < List def initialize super - - @size = 0 @transient_count = 0 end - attr :size - # Does this node have (direct) transient children? def transients? @transient_count > 0 end - def append(node) - added(super) - end - - undef prepend - - def delete(node) - removed(super) - end - def finished? @size == @transient_count end @@ -49,7 +35,7 @@ def added(node) @transient_count += 1 end - @size += 1 + return super end def removed(node) @@ -57,7 +43,7 @@ def removed(node) @transient_count -= 1 end - @size -= 1 + return super end end @@ -152,7 +138,7 @@ def parent=(parent) return if @parent.equal?(parent) if @parent - @parent.delete_child(self) + @parent.remove_child(self) @parent = nil end @@ -173,8 +159,8 @@ def parent=(parent) child.set_parent(self) end - protected def delete_child(child) - @children.delete(child) + protected def remove_child(child) + @children.remove(child) child.set_parent(nil) end @@ -189,15 +175,15 @@ def finished? # the parent. def consume if parent = @parent and finished? - parent.delete_child(self) + parent.remove_child(self) if @children @children.each do |child| if child.finished? - delete_child(child) + remove_child(child) else # In theory we don't need to do this... because we are throwing away the list. However, if you don't correctly update the list when moving the child to the parent, it foobars the enumeration, and subsequent nodes will be skipped, or in the worst case you might start enumerating the parents nodes. - delete_child(child) + remove_child(child) parent.add_child(child) end end @@ -209,14 +195,14 @@ def consume end end + # Traverse the task tree. + # @yields {|node, level| ...} The node and the level relative to the given root. def traverse(&block) return enum_for(:traverse) unless block_given? self.traverse_recurse(&block) end - # Traverse the tree. - # @yields {|node, level| ...} The node and the level relative to the given root. protected def traverse_recurse(level = 0, &block) yield self, level diff --git a/lib/async/task.rb b/lib/async/task.rb index 118d972a..1b75fcba 100644 --- a/lib/async/task.rb +++ b/lib/async/task.rb @@ -119,10 +119,7 @@ def async(*arguments, **options, &block) return task end - # Retrieve the current result of the task. Will cause the caller to wait until result is available. If the result was an exception, raise that exception. - # @raises [RuntimeError] If the task's fiber is the current fiber. - # @returns [Object] The final expression/result of the task's block. - def wait + def join raise "Cannot wait on own fiber" if Fiber.current.equal?(@fiber) if running? @@ -130,11 +127,18 @@ def wait @finished.wait end - case @result + return @result + end + + # Retrieve the current result of the task. Will cause the caller to wait until result is available. If the result was an exception, raise that exception. + # @raises [RuntimeError] If the task's fiber is the current fiber. + # @returns [Object] The final expression/result of the task's block. + def wait + case result = self.join when Exception - raise @result + raise result else - return @result + return result end end diff --git a/test/async/barrier.rb b/test/async/barrier.rb index 2e88d04f..992c8b32 100644 --- a/test/async/barrier.rb +++ b/test/async/barrier.rb @@ -58,9 +58,9 @@ expect(task1).to be(:failed?) expect(task2).to be(:finished?) - expect{barrier.wait}.to raise_exception(RuntimeError, message: be =~ /Boom/) + barrier.wait - barrier.wait until barrier.empty? + expect{task1.wait}.to raise_exception(RuntimeError, message: be =~ /Boom/) expect(barrier).to be(:empty?) end diff --git a/test/async/children.rb b/test/async/children.rb index 0b3907ab..2e1186c8 100644 --- a/test/async/children.rb +++ b/test/async/children.rb @@ -25,19 +25,19 @@ expect(children).not.to be(:empty?) end - it "can't delete a child that hasn't been inserted" do + it "can't remove a child that hasn't been inserted" do child = Async::Node.new - expect{children.delete(child)}.to raise_exception(ArgumentError, message: be =~ /not in a list/) + expect{children.remove(child)}.to raise_exception(ArgumentError, message: be =~ /not in a list/) end - it "can't delete the child twice" do + it "can't remove the child twice" do child = Async::Node.new children.append(child) - children.delete(child) + children.remove(child) - expect{children.delete(child)}.to raise_exception(ArgumentError, message: be =~ /not in a list/) + expect{children.remove(child)}.to raise_exception(ArgumentError, message: be =~ /not in a list/) end end end diff --git a/test/async/list.rb b/test/async/list.rb index 28a2b97b..801e8ca5 100644 --- a/test/async/list.rb +++ b/test/async/list.rb @@ -56,12 +56,12 @@ def initialize(value) end end - with '#delete' do - it "can delete items" do + with '#remove' do + it "can remove items" do item = Item.new(1) list.append(item) - list.delete(item) + list.remove(item) expect(list.each.map(&:value)).to be(:empty?) end @@ -70,39 +70,26 @@ def initialize(value) item = Item.new(1) list.append(item) - list.delete(item) + list.remove(item) expect do - list.delete(item) + list.remove(item) end.to raise_exception(ArgumentError, message: be =~ /not in a list/) end - it "can delete item from the middle" do + it "can remove an item from the middle" do item = Item.new(1) list.append(Item.new(2)) list.append(item) list.append(Item.new(3)) - list.delete(item) + list.remove(item) expect(list.each.map(&:value)).to be == [2, 3] end end - with 'Node#delete!' do - it "can't remove an item twice" do - item = Item.new(1) - - list.append(item) - item.delete! - - expect do - item.delete! - end.to raise_exception(NoMethodError) - end - end - with '#each' do it "can iterate over nodes while deleting them" do nodes = [Item.new(1), Item.new(2), Item.new(3)] @@ -119,7 +106,7 @@ def initialize(value) # This tests that enumeration is tolerant of deletion: if index == 1 # When we are indexing child 1, it means the current node is child 0 - deleting it shouldn't break enumeration: - list.delete(nodes.first) + list.remove(nodes.first) end index += 1 @@ -128,4 +115,28 @@ def initialize(value) expect(enumerated).to be == nodes end end + + with '#first' do + it "can return the first item" do + item = Item.new(1) + + list.append(item) + list.append(Item.new(2)) + list.append(Item.new(3)) + + expect(list.first).to be == item + end + end + + with '#last' do + it "can return the last item" do + item = Item.new(1) + + list.append(Item.new(2)) + list.append(Item.new(3)) + list.append(item) + + expect(list.last).to be == item + end + end end diff --git a/test/async/node.rb b/test/async/node.rb index 3b06edae..b0008908 100644 --- a/test/async/node.rb +++ b/test/async/node.rb @@ -162,7 +162,7 @@ expect(node.children.each.to_a).to be == bottom end - it "deletes children that are also finished" do + it "removes children that are also finished" do middle = Async::Node.new(node) bottom = Async::Node.new(middle)