diff --git a/examples/workflows/query_workflow.rb b/examples/workflows/query_workflow.rb index 47650ca4..4ecc0f9f 100644 --- a/examples/workflows/query_workflow.rb +++ b/examples/workflows/query_workflow.rb @@ -14,7 +14,7 @@ def execute @last_signal_received = signal end - workflow.wait_for { last_signal_received == "finish" } + workflow.wait_until { last_signal_received == "finish" } @state = "finished" { diff --git a/examples/workflows/wait_for_external_signal_workflow.rb b/examples/workflows/wait_for_external_signal_workflow.rb index 03986309..69bd8eea 100644 --- a/examples/workflows/wait_for_external_signal_workflow.rb +++ b/examples/workflows/wait_for_external_signal_workflow.rb @@ -12,7 +12,7 @@ def execute(expected_signal) signal_counts[signal] += 1 end - workflow.wait_for do + workflow.wait_until do workflow.logger.info("Awaiting #{expected_signal}, signals received so far: #{signals_received}") signals_received.key?(expected_signal) end diff --git a/examples/workflows/wait_for_named_signal_workflow.rb b/examples/workflows/wait_for_named_signal_workflow.rb index 9f715a2a..96f96ece 100644 --- a/examples/workflows/wait_for_named_signal_workflow.rb +++ b/examples/workflows/wait_for_named_signal_workflow.rb @@ -20,7 +20,7 @@ def execute(expected_signal) end timeout_timer = workflow.start_timer(1) - workflow.wait_for(timeout_timer) + workflow.wait_for_any(timeout_timer) { received: signals_received, counts: signal_counts } end diff --git a/examples/workflows/wait_for_workflow.rb b/examples/workflows/wait_for_workflow.rb index 226ac9c4..6b26c28d 100644 --- a/examples/workflows/wait_for_workflow.rb +++ b/examples/workflows/wait_for_workflow.rb @@ -11,7 +11,7 @@ def execute(total_echos, max_echos_at_once, expected_signal) signals_received[signal] = input end - workflow.wait_for do + workflow.wait_until do workflow.logger.info("Awaiting #{expected_signal}, signals received so far: #{signals_received}") signals_received.key?(expected_signal) end @@ -21,35 +21,21 @@ def execute(total_echos, max_echos_at_once, expected_signal) # workflow is completed. long_running_future = LongRunningActivity.execute(15, 0.1) timeout_timer = workflow.start_timer(1) - workflow.wait_for(timeout_timer, long_running_future) + workflow.wait_for_any(timeout_timer, long_running_future) timer_beat_activity = timeout_timer.finished? && !long_running_future.finished? # This should not wait further. The first future has already finished, and therefore # the second one should not be awaited upon. long_timeout_timer = workflow.start_timer(15) - workflow.wait_for(timeout_timer, long_timeout_timer) - raise 'The workflow should not have waited for this timer to complete' if long_timeout_timer.finished? - - block_called = false - workflow.wait_for(timeout_timer) do - # This should never be called because the timeout_timer future was already - # finished before the wait was even called. - block_called = true - end - raise 'Block should not have been called' if block_called - - workflow.wait_for(long_timeout_timer) do - # This condition will immediately be true and not result in any waiting or dispatching - true - end + workflow.wait_for_any(timeout_timer, long_timeout_timer) raise 'The workflow should not have waited for this timer to complete' if long_timeout_timer.finished? activity_futures = {} echos_completed = 0 total_echos.times do |i| - workflow.wait_for do + workflow.wait_until do workflow.logger.info("Activities in flight #{activity_futures.length}") # Pause workflow until the number of active activity futures is less than 2. This # will throttle new activities from being started, guaranteeing that only two of these @@ -66,7 +52,7 @@ def execute(total_echos, max_echos_at_once, expected_signal) end end - workflow.wait_for do + workflow.wait_until do workflow.logger.info("Waiting for queue to drain, size: #{activity_futures.length}") activity_futures.empty? end diff --git a/lib/temporal/testing/local_workflow_context.rb b/lib/temporal/testing/local_workflow_context.rb index 31aa4ef0..c6251e53 100644 --- a/lib/temporal/testing/local_workflow_context.rb +++ b/lib/temporal/testing/local_workflow_context.rb @@ -184,14 +184,18 @@ def wait_for_all(*futures) return end - def wait_for(*futures, &unblock_condition) - if futures.empty? && unblock_condition.nil? - raise 'You must pass either a future or an unblock condition block to wait_for' - end + def wait_for_any(*futures) + return if futures.empty? - while (futures.empty? || futures.none?(&:finished?)) && (!unblock_condition || !unblock_condition.call) - Fiber.yield - end + Fiber.yield while futures.none?(&:finished?) + + return + end + + def wait_until(&unblock_condition) + raise 'You must pass an unblock condition block to wait_for' if unblock_condition.nil? + + Fiber.yield until unblock_condition.call return end diff --git a/lib/temporal/workflow/context.rb b/lib/temporal/workflow/context.rb index 7aaa932a..b5d4ffdb 100644 --- a/lib/temporal/workflow/context.rb +++ b/lib/temporal/workflow/context.rb @@ -141,7 +141,7 @@ def execute_workflow(workflow_class, *input, **args) child_workflow_started = true end - wait_for { child_workflow_started || future.failed? } + wait_until { child_workflow_started || future.failed? } future end @@ -233,60 +233,56 @@ def continue_as_new(*input, **args) completed! end + # Block workflow progress until all futures finish def wait_for_all(*futures) futures.each(&:wait) return end - # Block workflow progress until any future is finished or any unblock_condition - # block evaluates to true. - def wait_for(*futures, &unblock_condition) - if futures.empty? && unblock_condition.nil? - raise 'You must pass either a future or an unblock condition block to wait_for' - end + # Block workflow progress until one of the futures completes. Passing + # in an empty array will immediately unblock. + def wait_for_any(*futures) + return if futures.empty? || futures.any?(&:finished?) fiber = Fiber.current - should_yield = false blocked = true - if futures.any? - if futures.any?(&:finished?) - blocked = false - else - should_yield = true - futures.each do |future| - dispatcher.register_handler(future.target, Dispatcher::WILDCARD) do - if blocked && future.finished? - # Because this block can run for any dispatch, ensure the fiber is only - # resumed one time by checking if it's already been unblocked. - blocked = false - fiber.resume - end - end + futures.each do |future| + dispatcher.register_handler(future.target, Dispatcher::WILDCARD) do + # Because any of the futures can resume the fiber, ignore any callbacks + # from other futures after unblocking has occurred + if blocked && future.finished? + blocked = false + fiber.resume end end end - if blocked && unblock_condition - if unblock_condition.call + Fiber.yield + + return + end + + # Block workflow progress until the specified block evaluates to true. + def wait_until(&unblock_condition) + raise 'You must pass a block to wait_until' if unblock_condition.nil? + + return if unblock_condition.call + + fiber = Fiber.current + blocked = true + + dispatcher.register_handler(Dispatcher::TARGET_WILDCARD, Dispatcher::WILDCARD) do + # Because this block can run for any dispatch, ensure the fiber is only + # resumed one time by checking if it's already been unblocked. + if blocked && unblock_condition.call blocked = false - should_yield = false - else - should_yield = true - - dispatcher.register_handler(Dispatcher::TARGET_WILDCARD, Dispatcher::WILDCARD) do - # Because this block can run for any dispatch, ensure the fiber is only - # resumed one time by checking if it's already been unblocked. - if blocked && unblock_condition.call - blocked = false - fiber.resume - end - end + fiber.resume end end - Fiber.yield if should_yield + Fiber.yield return end diff --git a/lib/temporal/workflow/future.rb b/lib/temporal/workflow/future.rb index 550a038b..26929e86 100644 --- a/lib/temporal/workflow/future.rb +++ b/lib/temporal/workflow/future.rb @@ -31,7 +31,7 @@ def failed? def wait return if finished? - context.wait_for(self) + context.wait_for_any(self) end def get diff --git a/spec/unit/lib/temporal/testing/local_workflow_context_spec.rb b/spec/unit/lib/temporal/testing/local_workflow_context_spec.rb index 6242be7d..66c68769 100644 --- a/spec/unit/lib/temporal/testing/local_workflow_context_spec.rb +++ b/spec/unit/lib/temporal/testing/local_workflow_context_spec.rb @@ -167,7 +167,7 @@ def execute can_continue = false exited = false fiber = Fiber.new do - workflow_context.wait_for do + workflow_context.wait_until do can_continue end @@ -188,7 +188,7 @@ def execute future = workflow_context.execute_activity(TestAsyncActivity) fiber = Fiber.new do - workflow_context.wait_for(future) do + workflow_context.wait_for_any(future) do false end @@ -212,7 +212,7 @@ def execute future.wait fiber = Fiber.new do - workflow_context.wait_for(future, async_future) + workflow_context.wait_for_any(future, async_future) exited = true end diff --git a/spec/unit/lib/temporal/workflow/context_spec.rb b/spec/unit/lib/temporal/workflow/context_spec.rb index a6dd2921..75731d4c 100644 --- a/spec/unit/lib/temporal/workflow/context_spec.rb +++ b/spec/unit/lib/temporal/workflow/context_spec.rb @@ -1,12 +1,14 @@ require 'temporal/workflow' require 'temporal/workflow/context' +require 'temporal/workflow/dispatcher' +require 'temporal/workflow/future' require 'time' class MyTestWorkflow < Temporal::Workflow; end describe Temporal::Workflow::Context do let(:state_manager) { instance_double('Temporal::Workflow::StateManager') } - let(:dispatcher) { instance_double('Temporal::Workflow::Dispatcher') } + let(:dispatcher) { Temporal::Workflow::Dispatcher.new } let(:query_registry) { instance_double('Temporal::Workflow::QueryRegistry') } let(:metadata) { instance_double('Temporal::Metadata::Workflow') } let(:workflow_context) do @@ -69,4 +71,147 @@ class MyTestWorkflow < Temporal::Workflow; end ).to eq({ 'CustomDatetimeField' => time.utc.iso8601 }) end end + + describe '#wait_for_all' do + let(:target_1) { 'target1' } + let(:future_1) { Temporal::Workflow::Future.new(target_1, workflow_context) } + let(:target_2) { 'target2' } + let(:future_2) { Temporal::Workflow::Future.new(target_2, workflow_context) } + + def wait_for_all + unblocked = false + + Fiber.new do + workflow_context.wait_for_all(future_1, future_2) + unblocked = true + end.resume + + proc { unblocked } + end + + it 'no futures returns immediately' do + workflow_context.wait_for_all + end + + it 'futures already finished' do + future_1.set('done') + future_2.set('also done') + check_unblocked = wait_for_all + + expect(check_unblocked.call).to be(true) + end + + it 'futures finished' do + check_unblocked = wait_for_all + + future_1.set('done') + dispatcher.dispatch(target_1, 'foo') + expect(check_unblocked.call).to be(false) + + future_2.set('also done') + dispatcher.dispatch(target_2, 'foo') + expect(check_unblocked.call).to be(true) + end + end + + describe '#wait_for_any' do + let(:target_1) { 'target1' } + let(:future_1) { Temporal::Workflow::Future.new(target_1, workflow_context) } + let(:target_2) { 'target2' } + let(:future_2) { Temporal::Workflow::Future.new(target_2, workflow_context) } + + def wait_for_any + unblocked = false + + Fiber.new do + workflow_context.wait_for_any(future_1, future_2) + unblocked = true + end.resume + + proc { unblocked } + end + + it 'no futures returns immediately' do + workflow_context.wait_for_any + end + + it 'one future already finished' do + future_1.set("it's done") + check_unblocked = wait_for_any + + expect(check_unblocked.call).to be(true) + end + + it 'one future becomes finished' do + check_unblocked = wait_for_any + future_1.set("it's done") + dispatcher.dispatch(target_1, 'foo') + + expect(check_unblocked.call).to be(true) + + # Dispatch a second time. This should not attempt to + # resume the fiber which by now should already be dead. + dispatcher.dispatch(target_1, 'foo') + end + + it 'both futures becomes finished' do + check_unblocked = wait_for_any + future_1.set("it's done") + future_2.set("it's done") + dispatcher.dispatch(target_1, 'foo') + dispatcher.dispatch(target_2, 'foo') + + expect(check_unblocked.call).to be(true) + end + + it 'one future dispatched but not finished' do + check_unblocked = wait_for_any + dispatcher.dispatch(target_1, 'foo') + + expect(check_unblocked.call).to be(false) + end + end + + describe '#wait_until' do + def wait_until(&blk) + unblocked = false + + Fiber.new do + workflow_context.wait_until(&blk) + unblocked = true + end.resume + + proc { unblocked } + end + + it 'block already true' do + check_unblocked = wait_until { true } + + expect(check_unblocked.call).to be(true) + end + + it 'block is always false' do + check_unblocked = wait_until { false } + + dispatcher.dispatch('target', 'foo') + expect(check_unblocked.call).to be(false) + end + + it 'block becomes true' do + value = false + check_unblocked = wait_until { value } + + expect(check_unblocked.call).to be(false) + + dispatcher.dispatch('target', 'foo') + expect(check_unblocked.call).to be(false) + + value = true + dispatcher.dispatch('target', 'foo') + expect(check_unblocked.call).to be(true) + + # Can dispatch again safely without resuming dead fiber + dispatcher.dispatch('target', 'foo') + end + end end diff --git a/spec/unit/lib/temporal/workflow/future_spec.rb b/spec/unit/lib/temporal/workflow/future_spec.rb index 4fbc5b37..293a7d84 100644 --- a/spec/unit/lib/temporal/workflow/future_spec.rb +++ b/spec/unit/lib/temporal/workflow/future_spec.rb @@ -46,8 +46,8 @@ expect(subject.get).to be exception end - it 'calls context.wait_for if not finished' do - allow(workflow_context).to receive(:wait_for).with(subject) + it 'calls context.wait_for_any if not finished' do + allow(workflow_context).to receive(:wait_for_any).with(subject) subject.get end end @@ -58,8 +58,8 @@ subject.wait end - it 'calls context.wait_for if not already done' do - allow(workflow_context).to receive(:wait_for).with(subject) + it 'calls context.wait_for_any if not already done' do + allow(workflow_context).to receive(:wait_for_any).with(subject) subject.wait end end