diff --git a/lib/temporal/workflow/context.rb b/lib/temporal/workflow/context.rb index a5d50e89..23e86ad8 100644 --- a/lib/temporal/workflow/context.rb +++ b/lib/temporal/workflow/context.rb @@ -252,20 +252,15 @@ def wait_for_any(*futures) return if futures.empty? || futures.any?(&:finished?) fiber = Fiber.current - blocked = true - futures.each do |future| + handlers = futures.map do |future| dispatcher.register_handler(future.target, Dispatcher::WILDCARD) do - # Because any of the futures can resume the fiber, ignore any callbacks - # from other futures after unblocking has occurred - if blocked && future.finished? - blocked = false - fiber.resume - end + fiber.resume if future.finished? end end Fiber.yield + handlers.each(&:unregister) return end @@ -277,18 +272,13 @@ def wait_until(&unblock_condition) return if unblock_condition.call fiber = Fiber.current - blocked = true - - dispatcher.register_handler(Dispatcher::TARGET_WILDCARD, Dispatcher::WILDCARD) do - # Because this block can run for any dispatch, ensure the fiber is only - # resumed one time by checking if it's already been unblocked. - if blocked && unblock_condition.call - blocked = false - fiber.resume - end + + handler = dispatcher.register_handler(Dispatcher::TARGET_WILDCARD, Dispatcher::WILDCARD) do + fiber.resume if unblock_condition.call end Fiber.yield + handler.unregister return end @@ -316,6 +306,8 @@ def on_signal(signal_name = nil, &block) call_in_fiber(block, signal, input) end end + + return end def on_query(query, &block) diff --git a/lib/temporal/workflow/dispatcher.rb b/lib/temporal/workflow/dispatcher.rb index 58b32f46..d327cbe5 100644 --- a/lib/temporal/workflow/dispatcher.rb +++ b/lib/temporal/workflow/dispatcher.rb @@ -1,3 +1,5 @@ +require 'temporal/errors' + module Temporal class Workflow # This provides a generic event dispatcher mechanism. There are two main entry @@ -13,18 +15,44 @@ class Workflow # the event_name. The order of this dispatch is not guaranteed. # class Dispatcher + # Raised if a duplicate ID is encountered during dispatch handling. + # This likely indicates a bug in temporal-ruby or that unsupported multithreaded + # workflow code is being used. + class DuplicateIDError < InternalError; end + + # Tracks a registered handle so that it can be unregistered later + # The handlers are passed by reference here to be mutated (removed) by the + # unregister call below. + class RegistrationHandle + def initialize(handlers_for_target, id) + @handlers_for_target = handlers_for_target + @id = id + end + + # Unregister the handler from the dispatcher + def unregister + handlers_for_target.delete(id) + end + + private + + attr_reader :handlers_for_target, :id + end + WILDCARD = '*'.freeze TARGET_WILDCARD = '*'.freeze EventStruct = Struct.new(:event_name, :handler) def initialize - @handlers = Hash.new { |hash, key| hash[key] = [] } + @handlers = Hash.new { |hash, key| hash[key] = {} } + @next_id = 0 end def register_handler(target, event_name, &handler) - handlers[target] << EventStruct.new(event_name, handler) - self + @next_id += 1 + handlers[target][@next_id] = EventStruct.new(event_name, handler) + RegistrationHandle.new(handlers[target], @next_id) end def dispatch(target, event_name, args = nil) @@ -39,9 +67,10 @@ def dispatch(target, event_name, args = nil) def handlers_for(target, event_name) handlers[target] - .concat(handlers[TARGET_WILDCARD]) - .select { |event_struct| match?(event_struct, event_name) } - .map(&:handler) + .merge(handlers[TARGET_WILDCARD]) { raise DuplicateIDError.new('Cannot resolve duplicate dispatcher handler IDs') } + .select { |_, event_struct| match?(event_struct, event_name) } + .sort + .map { |_, event_struct| event_struct.handler } end def match?(event_struct, event_name) diff --git a/spec/unit/lib/temporal/workflow/dispatcher_spec.rb b/spec/unit/lib/temporal/workflow/dispatcher_spec.rb index 6e1ae8d6..18aa1a9e 100644 --- a/spec/unit/lib/temporal/workflow/dispatcher_spec.rb +++ b/spec/unit/lib/temporal/workflow/dispatcher_spec.rb @@ -8,7 +8,10 @@ describe '#register_handler' do let(:block) { -> { 'handler body' } } let(:event_name) { 'signaled' } - let(:dispatcher) { subject.register_handler(target, event_name, &block) } + let(:dispatcher) do + subject.register_handler(target, event_name, &block) + subject + end let(:handlers) { dispatcher.send(:handlers) } context 'with default handler_name' do @@ -19,17 +22,17 @@ end it 'stores the target and handler once' do - expect(handlers[target]).to be_kind_of(Array) + expect(handlers[target]).to be_kind_of(Hash) expect(handlers[target].count).to eq 1 end it 'associates the event name with the target' do - event = handlers[target].first + event = handlers[target][1] expect(event.event_name).to eq(event_name) end it 'associates the handler with the target' do - event = handlers[target].first + event = handlers[target][1] expect(event.handler).to eq(block) end end @@ -43,20 +46,44 @@ end it 'stores the target and handler once' do - expect(handlers[target]).to be_kind_of(Array) + expect(handlers[target]).to be_kind_of(Hash) expect(handlers[target].count).to eq 1 end it 'associates the event name and handler name with the target' do - event = handlers[target].first + event = handlers[target][1] expect(event.event_name).to eq(event_name) end it 'associates the handler with the target' do - event = handlers[target].first + event = handlers[target][1] expect(event.handler).to eq(block) end end + + it 'removes a given handler against the target' do + block1 = -> { 'handler body' } + block2 = -> { 'other handler body' } + block3 = -> { 'yet another handler body' } + + handle1 = subject.register_handler(target, 'signaled', &block1) + subject.register_handler(target, 'signaled', &block2) + subject.register_handler(other_target, 'signaled', &block3) + + expect(subject.send(:handlers)[target][1].event_name).to eq('signaled') + expect(subject.send(:handlers)[target][1].handler).to be(block1) + + expect(subject.send(:handlers)[target][2].event_name).to eq('signaled') + expect(subject.send(:handlers)[target][2].handler).to be(block2) + + expect(subject.send(:handlers)[other_target][3].event_name).to eq('signaled') + expect(subject.send(:handlers)[other_target][3].handler).to be(block3) + + handle1.unregister + expect(subject.send(:handlers)[target][1]).to be(nil) + expect(subject.send(:handlers)[target][2]).to_not be(nil) + expect(subject.send(:handlers)[other_target][3]).to_not be(nil) + end end describe '#dispatch' do @@ -114,10 +141,13 @@ context 'with TARGET_WILDCARD target handler' do let(:handler_6) { -> { 'sixth block' } } + let(:handler_7) { -> { 'seventh block' } } before do allow(handler_6).to receive(:call) + allow(handler_7).to receive(:call) subject.register_handler(described_class::TARGET_WILDCARD, described_class::WILDCARD, &handler_6) + subject.register_handler(target, 'completed', &handler_7) end it 'calls the handler' do @@ -127,6 +157,7 @@ expect(handler_1).to have_received(:call).ordered expect(handler_4).to have_received(:call).ordered expect(handler_6).to have_received(:call).ordered + expect(handler_7).to have_received(:call).ordered end it 'TARGET_WILDCARD can be compared to an EventTarget object' do