diff --git a/examples/bin/worker b/examples/bin/worker index 65828dfa..b0a7a4ea 100755 --- a/examples/bin/worker +++ b/examples/bin/worker @@ -42,6 +42,7 @@ worker.register_workflow(SideEffectWorkflow) worker.register_workflow(SimpleTimerWorkflow) worker.register_workflow(TimeoutWorkflow) worker.register_workflow(TripBookingWorkflow) +worker.register_workflow(WaitForWorkflow) worker.register_activity(AsyncActivity) worker.register_activity(EchoActivity) diff --git a/examples/spec/integration/wait_for_workflow_spec.rb b/examples/spec/integration/wait_for_workflow_spec.rb new file mode 100644 index 00000000..d5feeee6 --- /dev/null +++ b/examples/spec/integration/wait_for_workflow_spec.rb @@ -0,0 +1,28 @@ +require 'workflows/wait_for_workflow' + +describe WaitForWorkflow do + + it 'signals at workflow start time' do + workflow_id = SecureRandom.uuid + run_id = Temporal.start_workflow( + WaitForWorkflow, + 10, # number of echo activities to run + 2, # max activity parallelism + 'signal_name', + options: { workflow_id: workflow_id } + ) + + Temporal.signal_workflow(WaitForWorkflow, 'signal_name', workflow_id, run_id) + + result = Temporal.await_workflow_result( + WaitForWorkflow, + workflow_id: workflow_id, + run_id: run_id, + ) + + expect(result.length).to eq(3) + expect(result[:signal]).to eq(true) + expect(result[:timer]).to eq(true) + expect(result[:activity]).to eq(true) + end +end \ No newline at end of file diff --git a/examples/workflows/wait_for_workflow.rb b/examples/workflows/wait_for_workflow.rb new file mode 100644 index 00000000..226ac9c4 --- /dev/null +++ b/examples/workflows/wait_for_workflow.rb @@ -0,0 +1,80 @@ +require 'activities/echo_activity' +require 'activities/long_running_activity' + +# This example workflow exercises all three conditions that can change state that is being +# awaited upon: activity completion, sleep completion, signal receieved. +class WaitForWorkflow < Temporal::Workflow + def execute(total_echos, max_echos_at_once, expected_signal) + signals_received = {} + + workflow.on_signal do |signal, input| + signals_received[signal] = input + end + + workflow.wait_for do + workflow.logger.info("Awaiting #{expected_signal}, signals received so far: #{signals_received}") + signals_received.key?(expected_signal) + end + + # Run an activity but with a max time limit by starting a timer. This activity + # will not complete before the timer, which may result in a failed activity task after the + # workflow is completed. + long_running_future = LongRunningActivity.execute(15, 0.1) + timeout_timer = workflow.start_timer(1) + workflow.wait_for(timeout_timer, long_running_future) + + timer_beat_activity = timeout_timer.finished? && !long_running_future.finished? + + # This should not wait further. The first future has already finished, and therefore + # the second one should not be awaited upon. + long_timeout_timer = workflow.start_timer(15) + workflow.wait_for(timeout_timer, long_timeout_timer) + raise 'The workflow should not have waited for this timer to complete' if long_timeout_timer.finished? + + block_called = false + workflow.wait_for(timeout_timer) do + # This should never be called because the timeout_timer future was already + # finished before the wait was even called. + block_called = true + end + raise 'Block should not have been called' if block_called + + workflow.wait_for(long_timeout_timer) do + # This condition will immediately be true and not result in any waiting or dispatching + true + end + raise 'The workflow should not have waited for this timer to complete' if long_timeout_timer.finished? + + activity_futures = {} + echos_completed = 0 + + total_echos.times do |i| + workflow.wait_for do + workflow.logger.info("Activities in flight #{activity_futures.length}") + # Pause workflow until the number of active activity futures is less than 2. This + # will throttle new activities from being started, guaranteeing that only two of these + # activities are running at once. + activity_futures.length < max_echos_at_once + end + + future = EchoActivity.execute("hi #{i}") + activity_futures[i] = future + + future.done do + activity_futures.delete(i) + echos_completed += 1 + end + end + + workflow.wait_for do + workflow.logger.info("Waiting for queue to drain, size: #{activity_futures.length}") + activity_futures.empty? + end + + { + signal: signals_received.key?(expected_signal), + timer: timer_beat_activity, + activity: echos_completed == total_echos + } + end +end diff --git a/lib/temporal/testing/local_workflow_context.rb b/lib/temporal/testing/local_workflow_context.rb index f9422af8..94e9bdd1 100644 --- a/lib/temporal/testing/local_workflow_context.rb +++ b/lib/temporal/testing/local_workflow_context.rb @@ -165,9 +165,16 @@ def wait_for_all(*futures) return end - def wait_for(future) - # Point of communication - Fiber.yield while !future.finished? + def wait_for(*futures, &unblock_condition) + if futures.empty? && unblock_condition.nil? + raise 'You must pass either a future or an unblock condition block to wait_for' + end + + while (futures.empty? || futures.none?(&:finished?)) && (!unblock_condition || !unblock_condition.call) + Fiber.yield + end + + return end def now diff --git a/lib/temporal/workflow/context.rb b/lib/temporal/workflow/context.rb index 709211c8..4b8e8cc6 100644 --- a/lib/temporal/workflow/context.rb +++ b/lib/temporal/workflow/context.rb @@ -215,14 +215,54 @@ def wait_for_all(*futures) return end - def wait_for(future) + # Block workflow progress until any future is finished or any unblock_condition + # block evaluates to true. + def wait_for(*futures, &unblock_condition) + if futures.empty? && unblock_condition.nil? + raise 'You must pass either a future or an unblock condition block to wait_for' + end + fiber = Fiber.current + should_yield = false + blocked = true + + if futures.any? + if futures.any?(&:finished?) + blocked = false + else + should_yield = true + futures.each do |future| + dispatcher.register_handler(future.target, Dispatcher::WILDCARD) do + if blocked && future.finished? + # Because this block can run for any dispatch, ensure the fiber is only + # resumed one time by checking if it's already been unblocked. + blocked = false + fiber.resume + end + end + end + end + end - dispatcher.register_handler(future.target, Dispatcher::WILDCARD) do - fiber.resume if future.finished? + if blocked && unblock_condition + if unblock_condition.call + blocked = false + should_yield = false + else + should_yield = true + + dispatcher.register_handler(Dispatcher::WILDCARD, Dispatcher::WILDCARD) do + # Because this block can run for any dispatch, ensure the fiber is only + # resumed one time by checking if it's already been unblocked. + if blocked && unblock_condition.call + blocked = false + fiber.resume + end + end + end end - Fiber.yield + Fiber.yield if should_yield return end diff --git a/lib/temporal/workflow/dispatcher.rb b/lib/temporal/workflow/dispatcher.rb index 55c581fb..03f11864 100644 --- a/lib/temporal/workflow/dispatcher.rb +++ b/lib/temporal/workflow/dispatcher.rb @@ -23,6 +23,7 @@ def dispatch(target, event_name, args = nil) def handlers_for(target, event_name) handlers[target] + .concat(handlers[WILDCARD]) .select { |(name, _)| name == event_name || name == WILDCARD } .map(&:last) end diff --git a/spec/unit/lib/temporal/testing/local_workflow_context_spec.rb b/spec/unit/lib/temporal/testing/local_workflow_context_spec.rb index 29b4b2a3..44666cc4 100644 --- a/spec/unit/lib/temporal/testing/local_workflow_context_spec.rb +++ b/spec/unit/lib/temporal/testing/local_workflow_context_spec.rb @@ -120,10 +120,69 @@ def execute result = workflow_context.execute_activity!(TestActivity) expect(result).to eq('ok') end + + it 'can heartbeat' do + # Heartbeat doesn't do anything in local mode, but at least it can be called. + workflow_context.execute_activity!(TestHeartbeatingActivity) + end end - it 'can heartbeat' do - # Heartbeat doesn't do anything in local mode, but at least it can be called. - workflow_context.execute_activity!(TestHeartbeatingActivity) + describe '#wait_for' do + it 'await unblocks once condition changes' do + can_continue = false + exited = false + fiber = Fiber.new do + workflow_context.wait_for do + can_continue + end + + exited = true + end + + fiber.resume # start running + expect(exited).to eq(false) + + can_continue = true # change condition + fiber.resume # resume running after the Fiber.yield done in context.await + expect(exited).to eq(true) + end + + it 'condition or future unblocks' do + exited = false + + future = workflow_context.execute_activity(TestAsyncActivity) + + fiber = Fiber.new do + workflow_context.wait_for(future) do + false + end + + exited = true + end + + fiber.resume # start running + expect(exited).to eq(false) + + execution.complete_activity(async_token, 'async_ok') + + fiber.resume # resume running after the Fiber.yield done in context.await + expect(exited).to eq(true) + end + + it 'any future unblocks' do + exited = false + + async_future = workflow_context.execute_activity(TestAsyncActivity) + future = workflow_context.execute_activity(TestActivity) + future.wait + + fiber = Fiber.new do + workflow_context.wait_for(future, async_future) + exited = true + end + + fiber.resume # start running + expect(exited).to eq(true) + end end end diff --git a/spec/unit/lib/temporal/workflow/dispatcher_spec.rb b/spec/unit/lib/temporal/workflow/dispatcher_spec.rb index d5e008f8..c6a5f493 100644 --- a/spec/unit/lib/temporal/workflow/dispatcher_spec.rb +++ b/spec/unit/lib/temporal/workflow/dispatcher_spec.rb @@ -62,7 +62,24 @@ expect(handler_5).to have_received(:call) end + end + + context 'with WILDCARD target handler' do + let(:handler_6) { -> { 'sixth block' } } + before do + allow(handler_6).to receive(:call) + + subject.register_handler(described_class::WILDCARD, described_class::WILDCARD, &handler_6) + end + it 'calls the handler' do + subject.dispatch('target', 'completed') + + # Target handlers still invoked + expect(handler_1).to have_received(:call).ordered + expect(handler_4).to have_received(:call).ordered + expect(handler_6).to have_received(:call).ordered + end end end end