diff --git a/examples/workflows/wait_for_external_signal_workflow.rb b/examples/workflows/wait_for_external_signal_workflow.rb index 03986309..69bd8eea 100644 --- a/examples/workflows/wait_for_external_signal_workflow.rb +++ b/examples/workflows/wait_for_external_signal_workflow.rb @@ -12,7 +12,7 @@ def execute(expected_signal) signal_counts[signal] += 1 end - workflow.wait_for do + workflow.wait_until do workflow.logger.info("Awaiting #{expected_signal}, signals received so far: #{signals_received}") signals_received.key?(expected_signal) end diff --git a/examples/workflows/wait_for_workflow.rb b/examples/workflows/wait_for_workflow.rb index 226ac9c4..6b26c28d 100644 --- a/examples/workflows/wait_for_workflow.rb +++ b/examples/workflows/wait_for_workflow.rb @@ -11,7 +11,7 @@ def execute(total_echos, max_echos_at_once, expected_signal) signals_received[signal] = input end - workflow.wait_for do + workflow.wait_until do workflow.logger.info("Awaiting #{expected_signal}, signals received so far: #{signals_received}") signals_received.key?(expected_signal) end @@ -21,35 +21,21 @@ def execute(total_echos, max_echos_at_once, expected_signal) # workflow is completed. long_running_future = LongRunningActivity.execute(15, 0.1) timeout_timer = workflow.start_timer(1) - workflow.wait_for(timeout_timer, long_running_future) + workflow.wait_for_any(timeout_timer, long_running_future) timer_beat_activity = timeout_timer.finished? && !long_running_future.finished? # This should not wait further. The first future has already finished, and therefore # the second one should not be awaited upon. long_timeout_timer = workflow.start_timer(15) - workflow.wait_for(timeout_timer, long_timeout_timer) - raise 'The workflow should not have waited for this timer to complete' if long_timeout_timer.finished? - - block_called = false - workflow.wait_for(timeout_timer) do - # This should never be called because the timeout_timer future was already - # finished before the wait was even called. - block_called = true - end - raise 'Block should not have been called' if block_called - - workflow.wait_for(long_timeout_timer) do - # This condition will immediately be true and not result in any waiting or dispatching - true - end + workflow.wait_for_any(timeout_timer, long_timeout_timer) raise 'The workflow should not have waited for this timer to complete' if long_timeout_timer.finished? activity_futures = {} echos_completed = 0 total_echos.times do |i| - workflow.wait_for do + workflow.wait_until do workflow.logger.info("Activities in flight #{activity_futures.length}") # Pause workflow until the number of active activity futures is less than 2. This # will throttle new activities from being started, guaranteeing that only two of these @@ -66,7 +52,7 @@ def execute(total_echos, max_echos_at_once, expected_signal) end end - workflow.wait_for do + workflow.wait_until do workflow.logger.info("Waiting for queue to drain, size: #{activity_futures.length}") activity_futures.empty? end diff --git a/lib/temporal/testing/local_workflow_context.rb b/lib/temporal/testing/local_workflow_context.rb index a115eb26..95108fd8 100644 --- a/lib/temporal/testing/local_workflow_context.rb +++ b/lib/temporal/testing/local_workflow_context.rb @@ -170,12 +170,22 @@ def wait_for_all(*futures) return end - def wait_for(*futures, &unblock_condition) - if futures.empty? && unblock_condition.nil? - raise 'You must pass either a future or an unblock condition block to wait_for' + def wait_for_any(*futures) + return if futures.empty? + + while futures.empty? || futures.none?(&:finished?) + Fiber.yield + end + + return + end + + def wait_until(&unblock_condition) + if unblock_condition.nil? + raise 'You must pass either an unblock condition block to wait_for' end - while (futures.empty? || futures.none?(&:finished?)) && (!unblock_condition || !unblock_condition.call) + while !unblock_condition || !unblock_condition.call Fiber.yield end diff --git a/lib/temporal/workflow/context.rb b/lib/temporal/workflow/context.rb index ac4454ce..a8088692 100644 --- a/lib/temporal/workflow/context.rb +++ b/lib/temporal/workflow/context.rb @@ -136,7 +136,7 @@ def execute_workflow(workflow_class, *input, **args) dispatcher.register_handler(target, 'started') do child_workflow_started = true end - wait_for { child_workflow_started } + wait_until { child_workflow_started } future end @@ -228,60 +228,58 @@ def continue_as_new(*input, **args) completed! end + # Block workflow progress until all futures finish def wait_for_all(*futures) futures.each(&:wait) return end - # Block workflow progress until any future is finished or any unblock_condition - # block evaluates to true. - def wait_for(*futures, &unblock_condition) - if futures.empty? && unblock_condition.nil? - raise 'You must pass either a future or an unblock condition block to wait_for' - end + # Block workflow progress until one of the futures completes. Passing + # in an empty array will immediately unblock. + def wait_for_any(*futures) + return if futures.empty? || futures.any?(&:finished?) fiber = Fiber.current - should_yield = false blocked = true - if futures.any? - if futures.any?(&:finished?) - blocked = false - else - should_yield = true - futures.each do |future| - dispatcher.register_handler(future.target, Dispatcher::WILDCARD) do - if blocked && future.finished? - # Because this block can run for any dispatch, ensure the fiber is only - # resumed one time by checking if it's already been unblocked. - blocked = false - fiber.resume - end - end + futures.each do |future| + id = dispatcher.register_handler(future.target, Dispatcher::WILDCARD) do + # Because any of the futures can resume the fiber, ignore any callbacks + # from other futures after unblocking has occurred + if blocked + blocked = false + dispatcher.remove_handler(id, future.target) + fiber.resume end end end - if blocked && unblock_condition - if unblock_condition.call + Fiber.yield + + return + end + + # Block workflow progress until the specified block evaluates to true. + def wait_until(&unblock_condition) + raise 'You must pass a block to wait_until' if unblock_condition.nil? + + return if unblock_condition.call + + fiber = Fiber.current + blocked = true + + id = dispatcher.register_handler(Dispatcher::TARGET_WILDCARD, Dispatcher::WILDCARD) do + # Because this block can run for any dispatch, ensure the fiber is only + # resumed one time by checking if it's already been unblocked. + if blocked && unblock_condition.call blocked = false - should_yield = false - else - should_yield = true - - dispatcher.register_handler(Dispatcher::TARGET_WILDCARD, Dispatcher::WILDCARD) do - # Because this block can run for any dispatch, ensure the fiber is only - # resumed one time by checking if it's already been unblocked. - if blocked && unblock_condition.call - blocked = false - fiber.resume - end - end + dispatcher.remove_handler(Dispatcher::TARGET_WILDCARD, id) + fiber.resume end end - Fiber.yield if should_yield + Fiber.yield return end diff --git a/lib/temporal/workflow/dispatcher.rb b/lib/temporal/workflow/dispatcher.rb index 2a768e54..9ead7dcf 100644 --- a/lib/temporal/workflow/dispatcher.rb +++ b/lib/temporal/workflow/dispatcher.rb @@ -6,10 +6,17 @@ class Dispatcher def initialize @handlers = Hash.new { |hash, key| hash[key] = [] } + @next_id = 0 end def register_handler(target, event_name, &handler) - handlers[target] << [event_name, handler] + @next_id += 1 + handlers[target] << [@next_id, event_name, handler] + @next_id + end + + def remove_handler(target, id) + handlers[target] = handlers[target].reject { |(handler_id, _, _)| handler_id == id } end def dispatch(target, event_name, args = nil) @@ -25,7 +32,8 @@ def dispatch(target, event_name, args = nil) def handlers_for(target, event_name) handlers[target] .concat(handlers[TARGET_WILDCARD]) - .select { |(name, _)| name == event_name || name == WILDCARD } + .select { |(_, name, _)| name == event_name || name == WILDCARD } + .sort_by { |sequence, _, _| sequence } .map(&:last) end end diff --git a/lib/temporal/workflow/future.rb b/lib/temporal/workflow/future.rb index 550a038b..26929e86 100644 --- a/lib/temporal/workflow/future.rb +++ b/lib/temporal/workflow/future.rb @@ -31,7 +31,7 @@ def failed? def wait return if finished? - context.wait_for(self) + context.wait_for_any(self) end def get diff --git a/spec/unit/lib/temporal/testing/local_workflow_context_spec.rb b/spec/unit/lib/temporal/testing/local_workflow_context_spec.rb index ae8eca8c..49614d35 100644 --- a/spec/unit/lib/temporal/testing/local_workflow_context_spec.rb +++ b/spec/unit/lib/temporal/testing/local_workflow_context_spec.rb @@ -146,7 +146,7 @@ def execute can_continue = false exited = false fiber = Fiber.new do - workflow_context.wait_for do + workflow_context.wait_until do can_continue end @@ -167,7 +167,7 @@ def execute future = workflow_context.execute_activity(TestAsyncActivity) fiber = Fiber.new do - workflow_context.wait_for(future) do + workflow_context.wait_for_any(future) do false end @@ -191,7 +191,7 @@ def execute future.wait fiber = Fiber.new do - workflow_context.wait_for(future, async_future) + workflow_context.wait_for_any(future, async_future) exited = true end diff --git a/spec/unit/lib/temporal/workflow/dispatcher_spec.rb b/spec/unit/lib/temporal/workflow/dispatcher_spec.rb index 43ccc8fc..92819551 100644 --- a/spec/unit/lib/temporal/workflow/dispatcher_spec.rb +++ b/spec/unit/lib/temporal/workflow/dispatcher_spec.rb @@ -11,7 +11,7 @@ subject.register_handler(target, 'signaled', &block) - expect(subject.send(:handlers)).to include(target => [['signaled', block]]) + expect(subject.send(:handlers)).to include(target => [[1, 'signaled', block]]) end end @@ -70,10 +70,13 @@ context 'with TARGET_WILDCARD target handler' do let(:handler_6) { -> { 'sixth block' } } + let(:handler_7) { -> { 'seventh block' } } before do allow(handler_6).to receive(:call) + allow(handler_7).to receive(:call) subject.register_handler(described_class::TARGET_WILDCARD, described_class::WILDCARD, &handler_6) + subject.register_handler(target, 'completed', &handler_7) end it 'calls the handler' do @@ -83,6 +86,7 @@ expect(handler_1).to have_received(:call).ordered expect(handler_4).to have_received(:call).ordered expect(handler_6).to have_received(:call).ordered + expect(handler_7).to have_received(:call).ordered end it 'TARGET_WILDCARD can be compared to an EventTarget object' do diff --git a/spec/unit/lib/temporal/workflow/future_spec.rb b/spec/unit/lib/temporal/workflow/future_spec.rb index 4fbc5b37..293a7d84 100644 --- a/spec/unit/lib/temporal/workflow/future_spec.rb +++ b/spec/unit/lib/temporal/workflow/future_spec.rb @@ -46,8 +46,8 @@ expect(subject.get).to be exception end - it 'calls context.wait_for if not finished' do - allow(workflow_context).to receive(:wait_for).with(subject) + it 'calls context.wait_for_any if not finished' do + allow(workflow_context).to receive(:wait_for_any).with(subject) subject.get end end @@ -58,8 +58,8 @@ subject.wait end - it 'calls context.wait_for if not already done' do - allow(workflow_context).to receive(:wait_for).with(subject) + it 'calls context.wait_for_any if not already done' do + allow(workflow_context).to receive(:wait_for_any).with(subject) subject.wait end end