Skip to content

Commit 2f6c55d

Browse files
Remove finished dispatcher handlers, order dispatch handlers
1 parent dbf3273 commit 2f6c55d

File tree

3 files changed

+80
-30
lines changed

3 files changed

+80
-30
lines changed

lib/temporal/workflow/context.rb

Lines changed: 9 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -252,20 +252,15 @@ def wait_for_any(*futures)
252252
return if futures.empty? || futures.any?(&:finished?)
253253

254254
fiber = Fiber.current
255-
blocked = true
256255

257-
futures.each do |future|
256+
handlers = futures.map do |future|
258257
dispatcher.register_handler(future.target, Dispatcher::WILDCARD) do
259-
# Because any of the futures can resume the fiber, ignore any callbacks
260-
# from other futures after unblocking has occurred
261-
if blocked && future.finished?
262-
blocked = false
263-
fiber.resume
264-
end
258+
fiber.resume if future.finished?
265259
end
266260
end
267261

268262
Fiber.yield
263+
handlers.each(&:unregister)
269264

270265
return
271266
end
@@ -277,18 +272,13 @@ def wait_until(&unblock_condition)
277272
return if unblock_condition.call
278273

279274
fiber = Fiber.current
280-
blocked = true
281-
282-
dispatcher.register_handler(Dispatcher::TARGET_WILDCARD, Dispatcher::WILDCARD) do
283-
# Because this block can run for any dispatch, ensure the fiber is only
284-
# resumed one time by checking if it's already been unblocked.
285-
if blocked && unblock_condition.call
286-
blocked = false
287-
fiber.resume
288-
end
275+
276+
handler = dispatcher.register_handler(Dispatcher::TARGET_WILDCARD, Dispatcher::WILDCARD) do
277+
fiber.resume if unblock_condition.call
289278
end
290279

291280
Fiber.yield
281+
handler.unregister
292282

293283
return
294284
end
@@ -316,6 +306,8 @@ def on_signal(signal_name = nil, &block)
316306
call_in_fiber(block, signal, input)
317307
end
318308
end
309+
310+
return
319311
end
320312

321313
def on_query(query, &block)

lib/temporal/workflow/dispatcher.rb

Lines changed: 33 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
require 'temporal/errors'
2+
13
module Temporal
24
class Workflow
35
# This provides a generic event dispatcher mechanism. There are two main entry
@@ -13,18 +15,42 @@ class Workflow
1315
# the event_name. The order of this dispatch is not guaranteed.
1416
#
1517
class Dispatcher
18+
# Raised if a duplicate ID is encountered during dispatch handling.
19+
# This likely indicates a bug in temporal-ruby or that unsupported multithreaded
20+
# workflow code is being used.
21+
class DuplicateIDError < InternalError; end
22+
23+
# Tracks a registered handle so that it can be unregistered later
24+
class RegistrationHandle
25+
def initialize(handlers_for_target, id)
26+
@handlers_for_target = handlers_for_target
27+
@id = id
28+
end
29+
30+
# Unregister the handler from the dispatcher
31+
def unregister
32+
handlers_for_target.delete(id)
33+
end
34+
35+
private
36+
37+
attr_reader :handlers_for_target, :id
38+
end
39+
1640
WILDCARD = '*'.freeze
1741
TARGET_WILDCARD = '*'.freeze
1842

1943
EventStruct = Struct.new(:event_name, :handler)
2044

2145
def initialize
22-
@handlers = Hash.new { |hash, key| hash[key] = [] }
46+
@handlers = Hash.new { |hash, key| hash[key] = {} }
47+
@next_id = 0
2348
end
2449

2550
def register_handler(target, event_name, &handler)
26-
handlers[target] << EventStruct.new(event_name, handler)
27-
self
51+
@next_id += 1
52+
handlers[target][@next_id] = EventStruct.new(event_name, handler)
53+
RegistrationHandle.new(handlers[target], @next_id)
2854
end
2955

3056
def dispatch(target, event_name, args = nil)
@@ -39,9 +65,10 @@ def dispatch(target, event_name, args = nil)
3965

4066
def handlers_for(target, event_name)
4167
handlers[target]
42-
.concat(handlers[TARGET_WILDCARD])
43-
.select { |event_struct| match?(event_struct, event_name) }
44-
.map(&:handler)
68+
.merge(handlers[TARGET_WILDCARD]) { raise DuplicateIDError.new('Cannot resolve duplicate dispatcher handler IDs') }
69+
.select { |_, event_struct| match?(event_struct, event_name) }
70+
.sort
71+
.map { |_, event_struct| event_struct.handler }
4572
end
4673

4774
def match?(event_struct, event_name)

spec/unit/lib/temporal/workflow/dispatcher_spec.rb

Lines changed: 38 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,10 @@
88
describe '#register_handler' do
99
let(:block) { -> { 'handler body' } }
1010
let(:event_name) { 'signaled' }
11-
let(:dispatcher) { subject.register_handler(target, event_name, &block) }
11+
let(:dispatcher) do
12+
subject.register_handler(target, event_name, &block)
13+
subject
14+
end
1215
let(:handlers) { dispatcher.send(:handlers) }
1316

1417
context 'with default handler_name' do
@@ -19,17 +22,17 @@
1922
end
2023

2124
it 'stores the target and handler once' do
22-
expect(handlers[target]).to be_kind_of(Array)
25+
expect(handlers[target]).to be_kind_of(Hash)
2326
expect(handlers[target].count).to eq 1
2427
end
2528

2629
it 'associates the event name with the target' do
27-
event = handlers[target].first
30+
event = handlers[target][1]
2831
expect(event.event_name).to eq(event_name)
2932
end
3033

3134
it 'associates the handler with the target' do
32-
event = handlers[target].first
35+
event = handlers[target][1]
3336
expect(event.handler).to eq(block)
3437
end
3538
end
@@ -43,20 +46,44 @@
4346
end
4447

4548
it 'stores the target and handler once' do
46-
expect(handlers[target]).to be_kind_of(Array)
49+
expect(handlers[target]).to be_kind_of(Hash)
4750
expect(handlers[target].count).to eq 1
4851
end
4952

5053
it 'associates the event name and handler name with the target' do
51-
event = handlers[target].first
54+
event = handlers[target][1]
5255
expect(event.event_name).to eq(event_name)
5356
end
5457

5558
it 'associates the handler with the target' do
56-
event = handlers[target].first
59+
event = handlers[target][1]
5760
expect(event.handler).to eq(block)
5861
end
5962
end
63+
64+
it 'removes a given handler against the target' do
65+
block1 = -> { 'handler body' }
66+
block2 = -> { 'other handler body' }
67+
block3 = -> { 'yet another handler body' }
68+
69+
handle1 = subject.register_handler(target, 'signaled', &block1)
70+
subject.register_handler(target, 'signaled', &block2)
71+
subject.register_handler(other_target, 'signaled', &block3)
72+
73+
expect(subject.send(:handlers)[target][1].event_name).to eq('signaled')
74+
expect(subject.send(:handlers)[target][1].handler).to be(block1)
75+
76+
expect(subject.send(:handlers)[target][2].event_name).to eq('signaled')
77+
expect(subject.send(:handlers)[target][2].handler).to be(block2)
78+
79+
expect(subject.send(:handlers)[other_target][3].event_name).to eq('signaled')
80+
expect(subject.send(:handlers)[other_target][3].handler).to be(block3)
81+
82+
handle1.unregister
83+
expect(subject.send(:handlers)[target][1]).to be(nil)
84+
expect(subject.send(:handlers)[target][2]).to_not be(nil)
85+
expect(subject.send(:handlers)[other_target][3]).to_not be(nil)
86+
end
6087
end
6188

6289
describe '#dispatch' do
@@ -114,10 +141,13 @@
114141

115142
context 'with TARGET_WILDCARD target handler' do
116143
let(:handler_6) { -> { 'sixth block' } }
144+
let(:handler_7) { -> { 'seventh block' } }
117145
before do
118146
allow(handler_6).to receive(:call)
147+
allow(handler_7).to receive(:call)
119148

120149
subject.register_handler(described_class::TARGET_WILDCARD, described_class::WILDCARD, &handler_6)
150+
subject.register_handler(target, 'completed', &handler_7)
121151
end
122152

123153
it 'calls the handler' do
@@ -127,6 +157,7 @@
127157
expect(handler_1).to have_received(:call).ordered
128158
expect(handler_4).to have_received(:call).ordered
129159
expect(handler_6).to have_received(:call).ordered
160+
expect(handler_7).to have_received(:call).ordered
130161
end
131162

132163
it 'TARGET_WILDCARD can be compared to an EventTarget object' do

0 commit comments

Comments
 (0)