Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion examples/workflows/query_workflow.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ def execute
@last_signal_received = signal
end

workflow.wait_for { last_signal_received == "finish" }
workflow.wait_until { last_signal_received == "finish" }
@state = "finished"

{
Expand Down
2 changes: 1 addition & 1 deletion examples/workflows/wait_for_external_signal_workflow.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ def execute(expected_signal)
signal_counts[signal] += 1
end

workflow.wait_for do
workflow.wait_until do
workflow.logger.info("Awaiting #{expected_signal}, signals received so far: #{signals_received}")
signals_received.key?(expected_signal)
end
Expand Down
2 changes: 1 addition & 1 deletion examples/workflows/wait_for_named_signal_workflow.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ def execute(expected_signal)
end

timeout_timer = workflow.start_timer(1)
workflow.wait_for(timeout_timer)
workflow.wait_for_any(timeout_timer)

{ received: signals_received, counts: signal_counts }
end
Expand Down
24 changes: 5 additions & 19 deletions examples/workflows/wait_for_workflow.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ def execute(total_echos, max_echos_at_once, expected_signal)
signals_received[signal] = input
end

workflow.wait_for do
workflow.wait_until do
workflow.logger.info("Awaiting #{expected_signal}, signals received so far: #{signals_received}")
signals_received.key?(expected_signal)
end
Expand All @@ -21,35 +21,21 @@ def execute(total_echos, max_echos_at_once, expected_signal)
# workflow is completed.
long_running_future = LongRunningActivity.execute(15, 0.1)
timeout_timer = workflow.start_timer(1)
workflow.wait_for(timeout_timer, long_running_future)
workflow.wait_for_any(timeout_timer, long_running_future)

timer_beat_activity = timeout_timer.finished? && !long_running_future.finished?

# This should not wait further. The first future has already finished, and therefore
# the second one should not be awaited upon.
long_timeout_timer = workflow.start_timer(15)
workflow.wait_for(timeout_timer, long_timeout_timer)
raise 'The workflow should not have waited for this timer to complete' if long_timeout_timer.finished?

block_called = false
workflow.wait_for(timeout_timer) do
# This should never be called because the timeout_timer future was already
# finished before the wait was even called.
block_called = true
end
raise 'Block should not have been called' if block_called

workflow.wait_for(long_timeout_timer) do
# This condition will immediately be true and not result in any waiting or dispatching
true
end
workflow.wait_for_any(timeout_timer, long_timeout_timer)
raise 'The workflow should not have waited for this timer to complete' if long_timeout_timer.finished?

activity_futures = {}
echos_completed = 0

total_echos.times do |i|
workflow.wait_for do
workflow.wait_until do
workflow.logger.info("Activities in flight #{activity_futures.length}")
# Pause workflow until the number of active activity futures is less than 2. This
# will throttle new activities from being started, guaranteeing that only two of these
Expand All @@ -66,7 +52,7 @@ def execute(total_echos, max_echos_at_once, expected_signal)
end
end

workflow.wait_for do
workflow.wait_until do
workflow.logger.info("Waiting for queue to drain, size: #{activity_futures.length}")
activity_futures.empty?
end
Expand Down
18 changes: 11 additions & 7 deletions lib/temporal/testing/local_workflow_context.rb
Original file line number Diff line number Diff line change
Expand Up @@ -184,14 +184,18 @@ def wait_for_all(*futures)
return
end

def wait_for(*futures, &unblock_condition)
if futures.empty? && unblock_condition.nil?
raise 'You must pass either a future or an unblock condition block to wait_for'
end
def wait_for_any(*futures)
return if futures.empty?

while (futures.empty? || futures.none?(&:finished?)) && (!unblock_condition || !unblock_condition.call)
Fiber.yield
end
Fiber.yield while futures.none?(&:finished?)

return
end

def wait_until(&unblock_condition)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The change makes sense and clarifies a lot of things, thank you @jeffschoner-stripe . One thing I'm not sure — why not use the original wait_for here? My brain thinks that wait_until suggests a time argument, but it might be just me :)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had originally called this wait_for_condition but the until language seemed more similar to the Ruby until keyword.

I didn't want to preserve wait_for because if someone is using that with a block today, removing the block from the signature is not breaking. Their code would just start silently ignoring the block condition. For example, if they were calling wait_for(timeout_timer_future) { some_variable }, removing the block argument from the function would have the effect of just waiting for the timeout and ignoring the condition.

However, if wait_for goes away entirely, it will immediately break their code once upgrading to the new version. I could also do leave the wait_for there but mark it as deprecated or leave it there to only raise an error explaining that the new functions should be used instead. What do you think? Unfortunately, I don't have a clear picture of who may be using these APIs today.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suppose I could also keep calling it wait_for and have it only take the condition block. That would be breaking, and I could add an explanatory comment

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think approach here is clearer. It seems better to have the method completely go away, so that any breakage would be resolved by looking for this PR or by reading comments on workflow context.

raise 'You must pass an unblock condition block to wait_for' if unblock_condition.nil?

Fiber.yield until unblock_condition.call

return
end
Expand Down
70 changes: 33 additions & 37 deletions lib/temporal/workflow/context.rb
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ def execute_workflow(workflow_class, *input, **args)
child_workflow_started = true
end

wait_for { child_workflow_started || future.failed? }
wait_until { child_workflow_started || future.failed? }

future
end
Expand Down Expand Up @@ -233,60 +233,56 @@ def continue_as_new(*input, **args)
completed!
end

# Block workflow progress until all futures finish
def wait_for_all(*futures)
futures.each(&:wait)

return
end

# Block workflow progress until any future is finished or any unblock_condition
# block evaluates to true.
def wait_for(*futures, &unblock_condition)
if futures.empty? && unblock_condition.nil?
raise 'You must pass either a future or an unblock condition block to wait_for'
end
# Block workflow progress until one of the futures completes. Passing
# in an empty array will immediately unblock.
def wait_for_any(*futures)
return if futures.empty? || futures.any?(&:finished?)

fiber = Fiber.current
should_yield = false
blocked = true

if futures.any?
if futures.any?(&:finished?)
blocked = false
else
should_yield = true
futures.each do |future|
dispatcher.register_handler(future.target, Dispatcher::WILDCARD) do
if blocked && future.finished?
# Because this block can run for any dispatch, ensure the fiber is only
# resumed one time by checking if it's already been unblocked.
blocked = false
fiber.resume
end
end
futures.each do |future|
dispatcher.register_handler(future.target, Dispatcher::WILDCARD) do
# Because any of the futures can resume the fiber, ignore any callbacks
# from other futures after unblocking has occurred
if blocked && future.finished?
blocked = false
fiber.resume
end
end
end

if blocked && unblock_condition
if unblock_condition.call
Fiber.yield

return
end

# Block workflow progress until the specified block evaluates to true.
def wait_until(&unblock_condition)
raise 'You must pass a block to wait_until' if unblock_condition.nil?

return if unblock_condition.call

fiber = Fiber.current
blocked = true

dispatcher.register_handler(Dispatcher::TARGET_WILDCARD, Dispatcher::WILDCARD) do
# Because this block can run for any dispatch, ensure the fiber is only
# resumed one time by checking if it's already been unblocked.
if blocked && unblock_condition.call
blocked = false
should_yield = false
else
should_yield = true

dispatcher.register_handler(Dispatcher::TARGET_WILDCARD, Dispatcher::WILDCARD) do
# Because this block can run for any dispatch, ensure the fiber is only
# resumed one time by checking if it's already been unblocked.
if blocked && unblock_condition.call
blocked = false
fiber.resume
end
end
fiber.resume
end
end

Fiber.yield if should_yield
Fiber.yield

return
end
Expand Down
2 changes: 1 addition & 1 deletion lib/temporal/workflow/future.rb
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ def failed?

def wait
return if finished?
context.wait_for(self)
context.wait_for_any(self)
end

def get
Expand Down
6 changes: 3 additions & 3 deletions spec/unit/lib/temporal/testing/local_workflow_context_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ def execute
can_continue = false
exited = false
fiber = Fiber.new do
workflow_context.wait_for do
workflow_context.wait_until do
can_continue
end

Expand All @@ -188,7 +188,7 @@ def execute
future = workflow_context.execute_activity(TestAsyncActivity)

fiber = Fiber.new do
workflow_context.wait_for(future) do
workflow_context.wait_for_any(future) do
false
end

Expand All @@ -212,7 +212,7 @@ def execute
future.wait

fiber = Fiber.new do
workflow_context.wait_for(future, async_future)
workflow_context.wait_for_any(future, async_future)
exited = true
end

Expand Down
Loading