Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion examples/workflows/wait_for_external_signal_workflow.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ def execute(expected_signal)
signal_counts[signal] += 1
end

workflow.wait_for do
workflow.wait_until do
workflow.logger.info("Awaiting #{expected_signal}, signals received so far: #{signals_received}")
signals_received.key?(expected_signal)
end
Expand Down
24 changes: 5 additions & 19 deletions examples/workflows/wait_for_workflow.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ def execute(total_echos, max_echos_at_once, expected_signal)
signals_received[signal] = input
end

workflow.wait_for do
workflow.wait_until do
workflow.logger.info("Awaiting #{expected_signal}, signals received so far: #{signals_received}")
signals_received.key?(expected_signal)
end
Expand All @@ -21,35 +21,21 @@ def execute(total_echos, max_echos_at_once, expected_signal)
# workflow is completed.
long_running_future = LongRunningActivity.execute(15, 0.1)
timeout_timer = workflow.start_timer(1)
workflow.wait_for(timeout_timer, long_running_future)
workflow.wait_for_any(timeout_timer, long_running_future)

timer_beat_activity = timeout_timer.finished? && !long_running_future.finished?

# This should not wait further. The first future has already finished, and therefore
# the second one should not be awaited upon.
long_timeout_timer = workflow.start_timer(15)
workflow.wait_for(timeout_timer, long_timeout_timer)
raise 'The workflow should not have waited for this timer to complete' if long_timeout_timer.finished?

block_called = false
workflow.wait_for(timeout_timer) do
# This should never be called because the timeout_timer future was already
# finished before the wait was even called.
block_called = true
end
raise 'Block should not have been called' if block_called

workflow.wait_for(long_timeout_timer) do
# This condition will immediately be true and not result in any waiting or dispatching
true
end
workflow.wait_for_any(timeout_timer, long_timeout_timer)
raise 'The workflow should not have waited for this timer to complete' if long_timeout_timer.finished?

activity_futures = {}
echos_completed = 0

total_echos.times do |i|
workflow.wait_for do
workflow.wait_until do
workflow.logger.info("Activities in flight #{activity_futures.length}")
# Pause workflow until the number of active activity futures is less than 2. This
# will throttle new activities from being started, guaranteeing that only two of these
Expand All @@ -66,7 +52,7 @@ def execute(total_echos, max_echos_at_once, expected_signal)
end
end

workflow.wait_for do
workflow.wait_until do
workflow.logger.info("Waiting for queue to drain, size: #{activity_futures.length}")
activity_futures.empty?
end
Expand Down
18 changes: 14 additions & 4 deletions lib/temporal/testing/local_workflow_context.rb
Original file line number Diff line number Diff line change
Expand Up @@ -170,12 +170,22 @@ def wait_for_all(*futures)
return
end

def wait_for(*futures, &unblock_condition)
if futures.empty? && unblock_condition.nil?
raise 'You must pass either a future or an unblock condition block to wait_for'
def wait_for_any(*futures)
return if futures.empty?

while futures.empty? || futures.none?(&:finished?)
Fiber.yield
end

return
end

def wait_until(&unblock_condition)
if unblock_condition.nil?
raise 'You must pass either an unblock condition block to wait_for'
end

while (futures.empty? || futures.none?(&:finished?)) && (!unblock_condition || !unblock_condition.call)
while !unblock_condition || !unblock_condition.call
Fiber.yield
end

Expand Down
72 changes: 35 additions & 37 deletions lib/temporal/workflow/context.rb
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ def execute_workflow(workflow_class, *input, **args)
dispatcher.register_handler(target, 'started') do
child_workflow_started = true
end
wait_for { child_workflow_started }
wait_until { child_workflow_started }

future
end
Expand Down Expand Up @@ -228,60 +228,58 @@ def continue_as_new(*input, **args)
completed!
end

# Block workflow progress until all futures finish
def wait_for_all(*futures)
futures.each(&:wait)

return
end

# Block workflow progress until any future is finished or any unblock_condition
# block evaluates to true.
def wait_for(*futures, &unblock_condition)
if futures.empty? && unblock_condition.nil?
raise 'You must pass either a future or an unblock condition block to wait_for'
end
# Block workflow progress until one of the futures completes. Passing
# in an empty array will immediately unblock.
def wait_for_any(*futures)
return if futures.empty? || futures.any?(&:finished?)

fiber = Fiber.current
should_yield = false
blocked = true

if futures.any?
if futures.any?(&:finished?)
blocked = false
else
should_yield = true
futures.each do |future|
dispatcher.register_handler(future.target, Dispatcher::WILDCARD) do
if blocked && future.finished?
# Because this block can run for any dispatch, ensure the fiber is only
# resumed one time by checking if it's already been unblocked.
blocked = false
fiber.resume
end
end
futures.each do |future|
id = dispatcher.register_handler(future.target, Dispatcher::WILDCARD) do
# Because any of the futures can resume the fiber, ignore any callbacks
# from other futures after unblocking has occurred
if blocked
blocked = false
dispatcher.remove_handler(id, future.target)
fiber.resume
end
end
end

if blocked && unblock_condition
if unblock_condition.call
Fiber.yield

return
end

# Block workflow progress until the specified block evaluates to true.
def wait_until(&unblock_condition)
raise 'You must pass a block to wait_until' if unblock_condition.nil?

return if unblock_condition.call

fiber = Fiber.current
blocked = true

id = dispatcher.register_handler(Dispatcher::TARGET_WILDCARD, Dispatcher::WILDCARD) do
# Because this block can run for any dispatch, ensure the fiber is only
# resumed one time by checking if it's already been unblocked.
if blocked && unblock_condition.call
blocked = false
should_yield = false
else
should_yield = true

dispatcher.register_handler(Dispatcher::TARGET_WILDCARD, Dispatcher::WILDCARD) do
# Because this block can run for any dispatch, ensure the fiber is only
# resumed one time by checking if it's already been unblocked.
if blocked && unblock_condition.call
blocked = false
fiber.resume
end
end
dispatcher.remove_handler(Dispatcher::TARGET_WILDCARD, id)
fiber.resume
end
end

Fiber.yield if should_yield
Fiber.yield

return
end
Expand Down
12 changes: 10 additions & 2 deletions lib/temporal/workflow/dispatcher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,17 @@ class Dispatcher

def initialize
@handlers = Hash.new { |hash, key| hash[key] = [] }
@next_id = 0
end

def register_handler(target, event_name, &handler)
handlers[target] << [event_name, handler]
@next_id += 1
handlers[target] << [@next_id, event_name, handler]
@next_id
end

def remove_handler(target, id)
handlers[target] = handlers[target].reject { |(handler_id, _, _)| handler_id == id }
end

def dispatch(target, event_name, args = nil)
Expand All @@ -25,7 +32,8 @@ def dispatch(target, event_name, args = nil)
def handlers_for(target, event_name)
handlers[target]
.concat(handlers[TARGET_WILDCARD])
.select { |(name, _)| name == event_name || name == WILDCARD }
.select { |(_, name, _)| name == event_name || name == WILDCARD }
.sort_by { |sequence, _, _| sequence }
.map(&:last)
end
end
Expand Down
2 changes: 1 addition & 1 deletion lib/temporal/workflow/future.rb
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ def failed?

def wait
return if finished?
context.wait_for(self)
context.wait_for_any(self)
end

def get
Expand Down
6 changes: 3 additions & 3 deletions spec/unit/lib/temporal/testing/local_workflow_context_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ def execute
can_continue = false
exited = false
fiber = Fiber.new do
workflow_context.wait_for do
workflow_context.wait_until do
can_continue
end

Expand All @@ -167,7 +167,7 @@ def execute
future = workflow_context.execute_activity(TestAsyncActivity)

fiber = Fiber.new do
workflow_context.wait_for(future) do
workflow_context.wait_for_any(future) do
false
end

Expand All @@ -191,7 +191,7 @@ def execute
future.wait

fiber = Fiber.new do
workflow_context.wait_for(future, async_future)
workflow_context.wait_for_any(future, async_future)
exited = true
end

Expand Down
6 changes: 5 additions & 1 deletion spec/unit/lib/temporal/workflow/dispatcher_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

subject.register_handler(target, 'signaled', &block)

expect(subject.send(:handlers)).to include(target => [['signaled', block]])
expect(subject.send(:handlers)).to include(target => [[1, 'signaled', block]])
end
end

Expand Down Expand Up @@ -70,10 +70,13 @@

context 'with TARGET_WILDCARD target handler' do
let(:handler_6) { -> { 'sixth block' } }
let(:handler_7) { -> { 'seventh block' } }
before do
allow(handler_6).to receive(:call)
allow(handler_7).to receive(:call)

subject.register_handler(described_class::TARGET_WILDCARD, described_class::WILDCARD, &handler_6)
subject.register_handler(target, 'completed', &handler_7)
end

it 'calls the handler' do
Expand All @@ -83,6 +86,7 @@
expect(handler_1).to have_received(:call).ordered
expect(handler_4).to have_received(:call).ordered
expect(handler_6).to have_received(:call).ordered
expect(handler_7).to have_received(:call).ordered
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Before this change, the sequence here would have been 1, 4, 7, 6 instead because targeted handlers were invoked before wildcard handlers

end

it 'TARGET_WILDCARD can be compared to an EventTarget object' do
Expand Down
8 changes: 4 additions & 4 deletions spec/unit/lib/temporal/workflow/future_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@
expect(subject.get).to be exception
end

it 'calls context.wait_for if not finished' do
allow(workflow_context).to receive(:wait_for).with(subject)
it 'calls context.wait_for_any if not finished' do
allow(workflow_context).to receive(:wait_for_any).with(subject)
subject.get
end
end
Expand All @@ -58,8 +58,8 @@
subject.wait
end

it 'calls context.wait_for if not already done' do
allow(workflow_context).to receive(:wait_for).with(subject)
it 'calls context.wait_for_any if not already done' do
allow(workflow_context).to receive(:wait_for_any).with(subject)
subject.wait
end
end
Expand Down