Skip to content

Commit 14229a6

Browse files
committed
Use linked list for barrier implementation.
1 parent 127f8d0 commit 14229a6

File tree

9 files changed

+128
-106
lines changed

9 files changed

+128
-106
lines changed

lib/async/barrier.rb

Lines changed: 22 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
# Released under the MIT License.
44
# Copyright, 2019-2022, by Samuel Williams.
55

6+
require_relative 'list'
67
require_relative 'task'
78

89
module Async
@@ -13,25 +14,33 @@ class Barrier
1314
# @parameter parent [Task | Semaphore | Nil] The parent for holding any children tasks.
1415
# @public Since `stable-v1`.
1516
def initialize(parent: nil)
16-
@tasks = []
17+
@tasks = List.new
1718

1819
@parent = parent
1920
end
2021

21-
# All tasks which have been invoked into the barrier.
22-
attr :tasks
22+
class Waiting < List::Node
23+
def initialize(task)
24+
@task = task
25+
end
26+
27+
attr :task
28+
end
2329

24-
# The number of tasks currently held by the barrier.
30+
# Number of tasks being held by the barrier.
2531
def size
2632
@tasks.size
2733
end
2834

35+
# All tasks which have been invoked into the barrier.
36+
attr :tasks
37+
2938
# Execute a child task and add it to the barrier.
3039
# @asynchronous Executes the given block concurrently.
3140
def async(*arguments, parent: (@parent or Task.current), **options, &block)
3241
task = parent.async(*arguments, **options, &block)
3342

34-
@tasks << task
43+
@tasks.append(Waiting.new(task))
3544

3645
return task
3746
end
@@ -42,31 +51,22 @@ def empty?
4251
@tasks.empty?
4352
end
4453

45-
# Wait for all tasks.
54+
# Wait for all tasks to complete. You will still want to wait for individual tasks to complete if you want to handle errors.
4655
# @asynchronous Will wait for tasks to finish executing.
4756
def wait
48-
# TODO: This would be better with linked list.
49-
while @tasks.any?
50-
task = @tasks.first
51-
52-
begin
53-
task.wait
54-
ensure
55-
# We don't know for sure that the exception was due to the task completion.
56-
unless task.running?
57-
# Remove the task from the waiting list if it's finished:
58-
@tasks.shift if @tasks.first == task
59-
end
60-
end
57+
while waiting = @tasks.first
58+
task = waiting.task
59+
task.join
60+
@tasks.remove?(waiting)
6161
end
6262
end
6363

6464
# Stop all tasks held by the barrier.
6565
# @asynchronous May wait for tasks to finish executing.
6666
def stop
67-
# We have to be careful to avoid enumerating tasks while adding/removing to it:
68-
tasks = @tasks.dup
69-
tasks.each(&:stop)
67+
@tasks.each do |waiting|
68+
waiting.task.stop
69+
end
7070
end
7171
end
7272
end

lib/async/condition.rb

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -34,12 +34,9 @@ def alive?
3434
# Queue up the current fiber and wait on yielding the task.
3535
# @returns [Object]
3636
def wait
37-
waiter = Waiter.new(Fiber.current)
38-
@waiting.append(waiter)
39-
40-
Fiber.scheduler.transfer
41-
ensure
42-
waiter.delete!
37+
@waiting.stack(Waiter.new(Fiber.current)) do
38+
Fiber.scheduler.transfer
39+
end
4340
end
4441

4542
# Is any fiber waiting on this notification?

lib/async/list.rb

Lines changed: 44 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ class List
88
def initialize
99
@head = self
1010
@tail = self
11+
@size = 0
1112
end
1213

1314
# @private
@@ -16,6 +17,13 @@ def initialize
1617
# @private
1718
attr_accessor :tail
1819

20+
attr :size
21+
22+
def added(node)
23+
@size += 1
24+
return node
25+
end
26+
1927
# Append a node to the end of the list.
2028
def append(node)
2129
if node.head
@@ -27,7 +35,7 @@ def append(node)
2735
node.head = @head
2836
@head = node
2937

30-
return node
38+
return added(node)
3139
end
3240

3341
def prepend(node)
@@ -40,24 +48,48 @@ def prepend(node)
4048
node.tail = @tail
4149
@tail = node
4250

51+
return added(node)
52+
end
53+
54+
# Add the node, yield, and the remove the node.
55+
def stack(node, &block)
56+
append(node)
57+
yield
58+
ensure
59+
remove!(node)
60+
end
61+
62+
def removed(node)
63+
@size -= 1
4364
return node
4465
end
4566

46-
def delete(node)
47-
# One downside of this interface is we don't actually check if the node is part of the list defined by `self`. This means that there is a potential for a node to be deleted from a different list using this method, which in can throw off book-keeping when lists track size, etc.
48-
67+
# Remove the node if it is in the list.
68+
def remove?(node)
69+
if node.head
70+
remove!(node)
71+
end
72+
end
73+
74+
# Remove the node.
75+
def remove(node)
76+
# One downside of this interface is we don't actually check if the node is part of the list defined by `self`. This means that there is a potential for a node to be removed from a different list using this method, which in can throw off book-keeping when lists track size, etc.
4977
unless node.head
5078
raise ArgumentError, "Node is not in a list!"
5179
end
5280

81+
remove!(node)
82+
end
83+
84+
private def remove!(node)
5385
node.head.tail = node.tail
5486
node.tail.head = node.head
5587

56-
# This marks the node as being deleted, and causes delete to fail if called a 2nd time.
88+
# This marks the node as being removed, and causes remove to fail if called a 2nd time.
5789
node.head = nil
5890
# node.tail = nil
5991

60-
return node
92+
return removed(node)
6193
end
6294

6395
def empty?
@@ -92,28 +124,20 @@ def include?(needle)
92124
end
93125

94126
def first
95-
@tail
127+
unless @tail.equal?(self)
128+
@tail
129+
end
96130
end
97131

98132
def last
99-
@head
133+
unless @head.equal?(self)
134+
@head
135+
end
100136
end
101137
end
102138

103139
class List::Node
104140
attr_accessor :head
105141
attr_accessor :tail
106-
107-
# Delete the node from the list.
108-
def delete!
109-
@head.tail = @tail
110-
@tail.head = @head
111-
@head = nil
112-
113-
# See above deletion implementation for more details:
114-
# @tail = nil
115-
116-
return self
117-
end
118142
end
119143
end

lib/async/node.rb

Lines changed: 8 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -12,28 +12,14 @@ module Async
1212
class Children < List
1313
def initialize
1414
super
15-
16-
@size = 0
1715
@transient_count = 0
1816
end
1917

20-
attr :size
21-
2218
# Does this node have (direct) transient children?
2319
def transients?
2420
@transient_count > 0
2521
end
2622

27-
def append(node)
28-
added(super)
29-
end
30-
31-
undef prepend
32-
33-
def delete(node)
34-
removed(super)
35-
end
36-
3723
def finished?
3824
@size == @transient_count
3925
end
@@ -49,15 +35,15 @@ def added(node)
4935
@transient_count += 1
5036
end
5137

52-
@size += 1
38+
return super
5339
end
5440

5541
def removed(node)
5642
if node.transient?
5743
@transient_count -= 1
5844
end
5945

60-
@size -= 1
46+
return super
6147
end
6248
end
6349

@@ -152,7 +138,7 @@ def parent=(parent)
152138
return if @parent.equal?(parent)
153139

154140
if @parent
155-
@parent.delete_child(self)
141+
@parent.remove_child(self)
156142
@parent = nil
157143
end
158144

@@ -173,8 +159,8 @@ def parent=(parent)
173159
child.set_parent(self)
174160
end
175161

176-
protected def delete_child(child)
177-
@children.delete(child)
162+
protected def remove_child(child)
163+
@children.remove(child)
178164
child.set_parent(nil)
179165
end
180166

@@ -189,15 +175,15 @@ def finished?
189175
# the parent.
190176
def consume
191177
if parent = @parent and finished?
192-
parent.delete_child(self)
178+
parent.remove_child(self)
193179

194180
if @children
195181
@children.each do |child|
196182
if child.finished?
197-
delete_child(child)
183+
remove_child(child)
198184
else
199185
# In theory we don't need to do this... because we are throwing away the list. However, if you don't correctly update the list when moving the child to the parent, it foobars the enumeration, and subsequent nodes will be skipped, or in the worst case you might start enumerating the parents nodes.
200-
delete_child(child)
186+
remove_child(child)
201187
parent.add_child(child)
202188
end
203189
end

lib/async/task.rb

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -119,22 +119,26 @@ def async(*arguments, **options, &block)
119119
return task
120120
end
121121

122-
# Retrieve the current result of the task. Will cause the caller to wait until result is available. If the result was an exception, raise that exception.
123-
# @raises [RuntimeError] If the task's fiber is the current fiber.
124-
# @returns [Object] The final expression/result of the task's block.
125-
def wait
122+
def join
126123
raise "Cannot wait on own fiber" if Fiber.current.equal?(@fiber)
127124

128125
if running?
129126
@finished ||= Condition.new
130127
@finished.wait
131128
end
132129

133-
case @result
130+
return @result
131+
end
132+
133+
# Retrieve the current result of the task. Will cause the caller to wait until result is available. If the result was an exception, raise that exception.
134+
# @raises [RuntimeError] If the task's fiber is the current fiber.
135+
# @returns [Object] The final expression/result of the task's block.
136+
def wait
137+
case result = self.join
134138
when Exception
135-
raise @result
139+
raise result
136140
else
137-
return @result
141+
return result
138142
end
139143
end
140144

test/async/barrier.rb

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,9 +58,9 @@
5858
expect(task1).to be(:failed?)
5959
expect(task2).to be(:finished?)
6060

61-
expect{barrier.wait}.to raise_exception(RuntimeError, message: be =~ /Boom/)
61+
barrier.wait
6262

63-
barrier.wait until barrier.empty?
63+
expect{task1.wait}.to raise_exception(RuntimeError, message: be =~ /Boom/)
6464

6565
expect(barrier).to be(:empty?)
6666
end

test/async/children.rb

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,19 +25,19 @@
2525
expect(children).not.to be(:empty?)
2626
end
2727

28-
it "can't delete a child that hasn't been inserted" do
28+
it "can't remove a child that hasn't been inserted" do
2929
child = Async::Node.new
3030

31-
expect{children.delete(child)}.to raise_exception(ArgumentError, message: be =~ /not in a list/)
31+
expect{children.remove(child)}.to raise_exception(ArgumentError, message: be =~ /not in a list/)
3232
end
3333

34-
it "can't delete the child twice" do
34+
it "can't remove the child twice" do
3535
child = Async::Node.new
3636
children.append(child)
3737

38-
children.delete(child)
38+
children.remove(child)
3939

40-
expect{children.delete(child)}.to raise_exception(ArgumentError, message: be =~ /not in a list/)
40+
expect{children.remove(child)}.to raise_exception(ArgumentError, message: be =~ /not in a list/)
4141
end
4242
end
4343
end

0 commit comments

Comments
 (0)