Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions examples/bin/worker
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ worker.register_workflow(SideEffectWorkflow)
worker.register_workflow(SimpleTimerWorkflow)
worker.register_workflow(TimeoutWorkflow)
worker.register_workflow(TripBookingWorkflow)
worker.register_workflow(WaitForWorkflow)

worker.register_activity(AsyncActivity)
worker.register_activity(EchoActivity)
Expand Down
28 changes: 28 additions & 0 deletions examples/spec/integration/wait_for_workflow_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
require 'workflows/wait_for_workflow'

describe WaitForWorkflow do

it 'signals at workflow start time' do
workflow_id = SecureRandom.uuid
run_id = Temporal.start_workflow(
WaitForWorkflow,
10, # number of echo activities to run
2, # max activity parallelism
'signal_name',
options: { workflow_id: workflow_id }
)

Temporal.signal_workflow(WaitForWorkflow, 'signal_name', workflow_id, run_id)

result = Temporal.await_workflow_result(
WaitForWorkflow,
workflow_id: workflow_id,
run_id: run_id,
)

expect(result.length).to eq(3)
expect(result[:signal]).to eq(true)
expect(result[:timer]).to eq(true)
expect(result[:activity]).to eq(true)
end
end
80 changes: 80 additions & 0 deletions examples/workflows/wait_for_workflow.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
require 'activities/echo_activity'
require 'activities/long_running_activity'

# This example workflow exercises all three conditions that can change state that is being
# awaited upon: activity completion, sleep completion, signal receieved.
class WaitForWorkflow < Temporal::Workflow
def execute(total_echos, max_echos_at_once, expected_signal)
signals_received = {}

workflow.on_signal do |signal, input|
signals_received[signal] = input
end

workflow.wait_for do
workflow.logger.info("Awaiting #{expected_signal}, signals received so far: #{signals_received}")
signals_received.key?(expected_signal)
end

# Run an activity but with a max time limit by starting a timer. This activity
# will not complete before the timer, which may result in a failed activity task after the
# workflow is completed.
long_running_future = LongRunningActivity.execute(15, 0.1)
timeout_timer = workflow.start_timer(1)
workflow.wait_for(timeout_timer, long_running_future)

timer_beat_activity = timeout_timer.finished? && !long_running_future.finished?

# This should not wait further. The first future has already finished, and therefore
# the second one should not be awaited upon.
long_timeout_timer = workflow.start_timer(15)
workflow.wait_for(timeout_timer, long_timeout_timer)
raise 'The workflow should not have waited for this timer to complete' if long_timeout_timer.finished?

block_called = false
workflow.wait_for(timeout_timer) do
# This should never be called because the timeout_timer future was already
# finished before the wait was even called.
block_called = true
end
raise 'Block should not have been called' if block_called

workflow.wait_for(long_timeout_timer) do
# This condition will immediately be true and not result in any waiting or dispatching
true
end
raise 'The workflow should not have waited for this timer to complete' if long_timeout_timer.finished?

activity_futures = {}
echos_completed = 0

total_echos.times do |i|
workflow.wait_for do
workflow.logger.info("Activities in flight #{activity_futures.length}")
# Pause workflow until the number of active activity futures is less than 2. This
# will throttle new activities from being started, guaranteeing that only two of these
# activities are running at once.
activity_futures.length < max_echos_at_once
end

future = EchoActivity.execute("hi #{i}")
activity_futures[i] = future

future.done do
activity_futures.delete(i)
echos_completed += 1
end
end

workflow.wait_for do
workflow.logger.info("Waiting for queue to drain, size: #{activity_futures.length}")
activity_futures.empty?
end

{
signal: signals_received.key?(expected_signal),
timer: timer_beat_activity,
activity: echos_completed == total_echos
}
end
end
13 changes: 10 additions & 3 deletions lib/temporal/testing/local_workflow_context.rb
Original file line number Diff line number Diff line change
Expand Up @@ -165,9 +165,16 @@ def wait_for_all(*futures)
return
end

def wait_for(future)
# Point of communication
Fiber.yield while !future.finished?
def wait_for(*futures, &unblock_condition)
if futures.empty? && unblock_condition.nil?
raise 'You must pass either a future or an unblock condition block to wait_for'
end

while (futures.empty? || futures.none?(&:finished?)) && (!unblock_condition || !unblock_condition.call)
Fiber.yield
end

return
end

def now
Expand Down
48 changes: 44 additions & 4 deletions lib/temporal/workflow/context.rb
Original file line number Diff line number Diff line change
Expand Up @@ -215,14 +215,54 @@ def wait_for_all(*futures)
return
end

def wait_for(future)
# Block workflow progress until any future is finished or any unblock_condition
# block evaluates to true.
def wait_for(*futures, &unblock_condition)
if futures.empty? && unblock_condition.nil?
raise 'You must pass either a future or an unblock condition block to wait_for'
end

fiber = Fiber.current
should_yield = false
blocked = true

if futures.any?
if futures.any?(&:finished?)
blocked = false
else
should_yield = true
futures.each do |future|
dispatcher.register_handler(future.target, Dispatcher::WILDCARD) do
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we actually need to separate between the futures and unblock condition cases here? There are probably arguments to both approaches:

  1. Only using a single wildcard handler for both cases means that it might get called more than needed, but it simplifies the code (only need to register one handler)
  2. Using future-specific and a wildcard handlers is a bit of an optimisation for when only futures are passed, but is actually less optimal when both are passed

I wonder what do you think about this one?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe this code path is used when invoking activities synchronously or sleeps, which are almost certainly the most common cases. I could register separate handlers here, rather than handling the separate cases inside the dispatcher. I'll give this alternative a try to see how it looks once implemented, especially with the unregistration in place.

Copy link
Contributor

@antstorm antstorm Nov 9, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good 👍 But also happy to rearrange things in later PRs to keep this one contained and ship the feature. As long as public-facing APIs are stable, which I believe they are

if blocked && future.finished?
# Because this block can run for any dispatch, ensure the fiber is only
# resumed one time by checking if it's already been unblocked.
blocked = false
fiber.resume
end
end
end
end
end

dispatcher.register_handler(future.target, Dispatcher::WILDCARD) do
fiber.resume if future.finished?
if blocked && unblock_condition
if unblock_condition.call
blocked = false
should_yield = false
else
should_yield = true

dispatcher.register_handler(Dispatcher::WILDCARD, Dispatcher::WILDCARD) do
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's a slight problem here — we're not de-registering the handler afterwards, which means that it will be called for every event until the end of a workflow. It won't be doing anything because of that blocked variable, but something we might want to address. It wasn't an issue previously because each target was expected to fire each event only once, but still keeps these references in memory.

This can be done in a follow-up, I might have a look at it

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suppose once it's unblocked, it can be unregistered. The interplay on this is messy because it needs a callback. I'll see if I can do this cleanly or if it's best to revise later.

# Because this block can run for any dispatch, ensure the fiber is only
# resumed one time by checking if it's already been unblocked.
if blocked && unblock_condition.call
blocked = false
fiber.resume
end
end
end
end

Fiber.yield
Fiber.yield if should_yield

return
end
Expand Down
1 change: 1 addition & 0 deletions lib/temporal/workflow/dispatcher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ def dispatch(target, event_name, args = nil)

def handlers_for(target, event_name)
handlers[target]
.concat(handlers[WILDCARD])
.select { |(name, _)| name == event_name || name == WILDCARD }
.map(&:last)
end
Expand Down
65 changes: 62 additions & 3 deletions spec/unit/lib/temporal/testing/local_workflow_context_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -120,10 +120,69 @@ def execute
result = workflow_context.execute_activity!(TestActivity)
expect(result).to eq('ok')
end

it 'can heartbeat' do
# Heartbeat doesn't do anything in local mode, but at least it can be called.
workflow_context.execute_activity!(TestHeartbeatingActivity)
end
end

it 'can heartbeat' do
# Heartbeat doesn't do anything in local mode, but at least it can be called.
workflow_context.execute_activity!(TestHeartbeatingActivity)
describe '#wait_for' do
it 'await unblocks once condition changes' do
can_continue = false
exited = false
fiber = Fiber.new do
workflow_context.wait_for do
can_continue
end

exited = true
end

fiber.resume # start running
expect(exited).to eq(false)

can_continue = true # change condition
fiber.resume # resume running after the Fiber.yield done in context.await
expect(exited).to eq(true)
end

it 'condition or future unblocks' do
exited = false

future = workflow_context.execute_activity(TestAsyncActivity)

fiber = Fiber.new do
workflow_context.wait_for(future) do
false
end

exited = true
end

fiber.resume # start running
expect(exited).to eq(false)

execution.complete_activity(async_token, 'async_ok')

fiber.resume # resume running after the Fiber.yield done in context.await
expect(exited).to eq(true)
end

it 'any future unblocks' do
exited = false

async_future = workflow_context.execute_activity(TestAsyncActivity)
future = workflow_context.execute_activity(TestActivity)
future.wait

fiber = Fiber.new do
workflow_context.wait_for(future, async_future)
exited = true
end

fiber.resume # start running
expect(exited).to eq(true)
end
end
end
17 changes: 17 additions & 0 deletions spec/unit/lib/temporal/workflow/dispatcher_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,24 @@

expect(handler_5).to have_received(:call)
end
end

context 'with WILDCARD target handler' do
let(:handler_6) { -> { 'sixth block' } }
before do
allow(handler_6).to receive(:call)

subject.register_handler(described_class::WILDCARD, described_class::WILDCARD, &handler_6)
end

it 'calls the handler' do
subject.dispatch('target', 'completed')

# Target handlers still invoked
expect(handler_1).to have_received(:call).ordered
expect(handler_4).to have_received(:call).ordered
expect(handler_6).to have_received(:call).ordered
end
end
end
end