Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
fb5e2dc
initial work to support SignalExternalWorkflow
chuckremes2 Feb 4, 2022
45fc693
define the serializer and hook it up
chuckremes2 Feb 4, 2022
df7f9be
stub in what I think is the correct work for each event type
chuckremes2 Feb 4, 2022
3b85c5c
some fixes per antstorm advice
chuckremes2 Feb 7, 2022
3ec41f7
initial attempt at integration test
chuckremes2 Feb 7, 2022
2b7cf11
docs on testing and an improvement to existing test
chuckremes2 Feb 9, 2022
4e7e8a3
encode the signal payload using correct helper
chuckremes2 Feb 9, 2022
9971c20
return a Future and fulfill it correctly upon completion
chuckremes2 Feb 9, 2022
82240ac
get the \*event_id from the right field in the command structure
chuckremes2 Feb 9, 2022
c274b68
modify test to verify the signal is only received once
chuckremes2 Feb 9, 2022
f65ce32
test for failure to deliver a signal to external workflow
chuckremes2 Feb 9, 2022
6f16c73
do not discard the failure command otherwise non-deterministic
chuckremes2 Feb 9, 2022
fe3aa4d
simplify test workflow by eliminating unnecessary timer
chuckremes2 Feb 9, 2022
820dfcc
oops, had double call to #schedule_command so signals were sent twice
chuckremes2 Feb 10, 2022
fd85b4b
edit description of example
chuckremes2 Feb 10, 2022
f53b947
split to separate files and improve test coverage
chuckremes2 Feb 11, 2022
e43066e
change method signature for consistency and a few other cleanups
chuckremes2 Feb 11, 2022
b888fe3
oops, fix EventType name to match correct constant
chuckremes2 Feb 11, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 17 additions & 3 deletions examples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,33 @@ To try these out you need to have a running Temporal service ([setup instruction
Install all the gem dependencies by running:

```sh
> bundle install
bundle install
```

Modify the `init.rb` file to point to your Temporal cluster.

Start a worker process:

```sh
> bin/worker
bin/worker
```

Use this command to trigger one of the example workflows from the `workflows` directory:

```sh
> bin/trigger NAME_OF_THE_WORKFLOW [argument_1, argument_2, ...]
bin/trigger NAME_OF_THE_WORKFLOW [argument_1, argument_2, ...]
```
## Testing

To run tests, make sure the temporal server and the worker process are already running:
```shell
docker-compose up
bin/worker
```
To execute the tests, run:
```shell
bundle exec rspec
```
To add a new test that uses a new workflow or new activity, make sure to register those new
workflows and activities by modifying the `bin/worker` file and adding them there. After any
changes to that file, restart the worker process to pick up the new registrations.
2 changes: 2 additions & 0 deletions examples/bin/worker
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ worker.register_workflow(SimpleTimerWorkflow)
worker.register_workflow(TimeoutWorkflow)
worker.register_workflow(TripBookingWorkflow)
worker.register_workflow(WaitForWorkflow)
worker.register_workflow(WaitForExternalSignalWorkflow)
worker.register_workflow(SendSignalToExternalWorkflow)

worker.register_activity(AsyncActivity)
worker.register_activity(EchoActivity)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
require 'workflows/wait_for_external_signal_workflow'
require 'workflows/send_signal_to_external_workflow'

describe WaitForExternalSignalWorkflow do
let(:signal_name) { "signal_name" }
let(:receiver_workflow_id) { SecureRandom.uuid }
let(:sender_workflow_id) { SecureRandom.uuid }

context 'when the workflows succeed then' do
it 'receives signal from an external workflow only once' do
run_id = Temporal.start_workflow(
WaitForExternalSignalWorkflow,
signal_name,
options: {workflow_id: receiver_workflow_id}
)

Temporal.start_workflow(
SendSignalToExternalWorkflow,
signal_name,
receiver_workflow_id
)

result = Temporal.await_workflow_result(
WaitForExternalSignalWorkflow,
workflow_id: receiver_workflow_id,
run_id: run_id,
)

expect(result).to eq(
{
received: {
signal_name => ["arg1", "arg2"]
},
counts: {
signal_name => 1
}
}
)
end

it 'returns :success to the sending workflow' do
Temporal.start_workflow(
WaitForExternalSignalWorkflow,
signal_name,
options: {workflow_id: receiver_workflow_id}
)

run_id = Temporal.start_workflow(
SendSignalToExternalWorkflow,
signal_name,
receiver_workflow_id,
options: {workflow_id: sender_workflow_id}
)

result = Temporal.await_workflow_result(
SendSignalToExternalWorkflow,
workflow_id: sender_workflow_id,
run_id: run_id,
)

expect(result).to eq(:success)
end
end

context 'when the workflows fail' do
it 'correctly handles failure to deliver' do
run_id = Temporal.start_workflow(
SendSignalToExternalWorkflow,
signal_name,
receiver_workflow_id,
options: {workflow_id: sender_workflow_id})

result = Temporal.await_workflow_result(
SendSignalToExternalWorkflow,
workflow_id: sender_workflow_id,
run_id: run_id,
)

expect(result).to eq(:failed)
end
end
end
17 changes: 17 additions & 0 deletions examples/workflows/send_signal_to_external_workflow.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# Sends +signal_name+ to the +target_workflow+ from within a workflow.
# This is different than using the Client#send_signal method which is
# for signaling a workflow *from outside* any workflow.
#
# Returns :success or :failed
#
class SendSignalToExternalWorkflow < Temporal::Workflow
def execute(signal_name, target_workflow)
logger.info("Send a signal to an external workflow")
future = workflow.signal_external_workflow(WaitForExternalSignalWorkflow, signal_name, target_workflow, nil, ["arg1", "arg2"])
@status = nil
future.done { @status = :success }
future.failed { @status = :failed }
future.get
@status
end
end
22 changes: 22 additions & 0 deletions examples/workflows/wait_for_external_signal_workflow.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# One workflow sends a signal to another workflow. Can be used to implement
# the synchronous-proxy pattern (see Go samples)
#
class WaitForExternalSignalWorkflow < Temporal::Workflow
def execute(expected_signal)
signals_received = {}
signal_counts = Hash.new { |h,k| h[k] = 0 }

workflow.on_signal do |signal, input|
workflow.logger.info("Received signal name #{signal}, with input #{input.inspect}")
signals_received[signal] = input
signal_counts[signal] += 1
end

workflow.wait_for do
workflow.logger.info("Awaiting #{expected_signal}, signals received so far: #{signals_received}")
signals_received.key?(expected_signal)
end

{ received: signals_received, counts: signal_counts }
end
end
4 changes: 3 additions & 1 deletion lib/temporal/connection/serializer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
require 'temporal/connection/serializer/complete_workflow'
require 'temporal/connection/serializer/continue_as_new'
require 'temporal/connection/serializer/fail_workflow'
require 'temporal/connection/serializer/signal_external_workflow'

module Temporal
module Connection
Expand All @@ -21,7 +22,8 @@ module Serializer
Workflow::Command::CancelTimer => Serializer::CancelTimer,
Workflow::Command::CompleteWorkflow => Serializer::CompleteWorkflow,
Workflow::Command::ContinueAsNew => Serializer::ContinueAsNew,
Workflow::Command::FailWorkflow => Serializer::FailWorkflow
Workflow::Command::FailWorkflow => Serializer::FailWorkflow,
Workflow::Command::SignalExternalWorkflow => Serializer::SignalExternalWorkflow
}.freeze

def self.serialize(object)
Expand Down
33 changes: 33 additions & 0 deletions lib/temporal/connection/serializer/signal_external_workflow.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
require 'temporal/connection/serializer/base'
require 'temporal/concerns/payloads'

module Temporal
module Connection
module Serializer
class SignalExternalWorkflow < Base
include Concerns::Payloads

def to_proto
Temporal::Api::Command::V1::Command.new(
command_type: Temporal::Api::Enums::V1::CommandType::COMMAND_TYPE_SIGNAL_EXTERNAL_WORKFLOW_EXECUTION,
signal_external_workflow_execution_command_attributes:
Temporal::Api::Command::V1::SignalExternalWorkflowExecutionCommandAttributes.new(
namespace: object.namespace,
execution: serialize_execution(object.execution),
signal_name: object.signal_name,
input: to_signal_payloads(object.input),
control: "", # deprecated
child_workflow_only: object.child_workflow_only
)
)
end

private

def serialize_execution(execution)
Temporal::Api::Common::V1::WorkflowExecution.new(workflow_id: execution[:workflow_id], run_id: execution[:run_id])
end
end
end
end
end
5 changes: 4 additions & 1 deletion lib/temporal/workflow/command.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ module Command
CancelTimer = Struct.new(:timer_id, keyword_init: true)
CompleteWorkflow = Struct.new(:result, keyword_init: true)
FailWorkflow = Struct.new(:exception, keyword_init: true)
SignalExternalWorkflow = Struct.new(:namespace, :execution, :signal_name, :input, :child_workflow_only, keyword_init: true)

# only these commands are supported right now
SCHEDULE_ACTIVITY_TYPE = :schedule_activity
Expand All @@ -21,6 +22,7 @@ module Command
CANCEL_TIMER_TYPE = :cancel_timer
COMPLETE_WORKFLOW_TYPE = :complete_workflow
FAIL_WORKFLOW_TYPE = :fail_workflow
SIGNAL_EXTERNAL_WORKFLOW_TYPE = :signal_external_workflow

COMMAND_CLASS_MAP = {
SCHEDULE_ACTIVITY_TYPE => ScheduleActivity,
Expand All @@ -30,7 +32,8 @@ module Command
START_TIMER_TYPE => StartTimer,
CANCEL_TIMER_TYPE => CancelTimer,
COMPLETE_WORKFLOW_TYPE => CompleteWorkflow,
FAIL_WORKFLOW_TYPE => FailWorkflow
FAIL_WORKFLOW_TYPE => FailWorkflow,
SIGNAL_EXTERNAL_WORKFLOW_TYPE => SignalExternalWorkflow
}.freeze

def self.generate(type, **args)
Expand Down
46 changes: 46 additions & 0 deletions lib/temporal/workflow/context.rb
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,52 @@ def cancel(target, cancelation_id)
end
end

# Send a signal from inside a workflow to another workflow. Not to be confused with
# Client#signal_workflow which sends a signal from outside a workflow to a workflow.
#
# @param workflow [Temporal::Workflow, nil] workflow class or nil
# @param signal [String] name of the signal to send
# @param workflow_id [String]
# @param run_id [String]
# @param input [String, Array, nil] optional arguments for the signal
# @param namespace [String, nil] if nil, choose the one declared on the workflow class or the
# global default
# @param child_workflow_only [Boolean] indicates whether the signal should only be delivered to a
# child workflow; defaults to false
#
# @return [Future] future
def signal_external_workflow(workflow, signal, workflow_id, run_id = nil, input = nil, namespace: nil, child_workflow_only: false)
options ||= {}

execution_options = ExecutionOptions.new(workflow, {}, config.default_execution_options)

command = Command::SignalExternalWorkflow.new(
namespace: namespace || execution_options.namespace,
execution: {
workflow_id: workflow_id,
run_id: run_id
},
signal_name: signal,
input: input,
child_workflow_only: child_workflow_only
)

target, cancelation_id = schedule_command(command)
future = Future.new(target, self, cancelation_id: cancelation_id)

dispatcher.register_handler(target, 'completed') do |result|
future.set(result)
future.success_callbacks.each { |callback| call_in_fiber(callback, result) }
end

dispatcher.register_handler(target, 'failed') do |exception|
future.fail(exception)
future.failure_callbacks.each { |callback| call_in_fiber(callback, exception) }
end

future
end

private

attr_reader :state_manager, :dispatcher, :workflow_class
Expand Down
4 changes: 1 addition & 3 deletions lib/temporal/workflow/history/event.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,7 @@ class Event
ACTIVITY_TASK_CANCELED
TIMER_FIRED
REQUEST_CANCEL_EXTERNAL_WORKFLOW_EXECUTION_FAILED
SIGNAL_EXTERNAL_WORKFLOW_EXECUTION_FAILED
EXTERNAL_WORKFLOW_EXECUTION_CANCEL_REQUESTED
EXTERNAL_WORKFLOW_EXECUTION_SIGNALED
UPSERT_WORKFLOW_SEARCH_ATTRIBUTES
].freeze

Expand Down Expand Up @@ -48,7 +46,7 @@ def originating_event_id
1 # fixed id for everything related to current workflow
when *EVENT_TYPES
attributes.scheduled_event_id
when *CHILD_WORKFLOW_EVENTS
when *CHILD_WORKFLOW_EVENTS, 'EXTERNAL_WORKFLOW_EXECUTION_SIGNALED', 'SIGNAL_EXTERNAL_WORKFLOW_EXECUTION_FAILED'
attributes.initiated_event_id
else
id
Expand Down
19 changes: 16 additions & 3 deletions lib/temporal/workflow/state_manager.rb
Original file line number Diff line number Diff line change
Expand Up @@ -241,13 +241,24 @@ def apply_event(event)
# todo

when 'SIGNAL_EXTERNAL_WORKFLOW_EXECUTION_INITIATED'
# todo
# Temporal Server will try to Signal the targeted Workflow
# Contains the Signal name, as well as a Signal payload
# The workflow that sends the signal creates this event in its log; the
# receiving workflow records WORKFLOW_EXECUTION_SIGNALED on reception
state_machine.start
discard_command(target)

when 'SIGNAL_EXTERNAL_WORKFLOW_EXECUTION_FAILED'
# todo
# Temporal Server cannot Signal the targeted Workflow
# Usually because the Workflow could not be found
state_machine.fail
dispatch(target, 'failed', 'StandardError', event.attributes.cause)

when 'EXTERNAL_WORKFLOW_EXECUTION_SIGNALED'
# todo
# Temporal Server has successfully Signaled the targeted Workflow
# Return the result to the Future waiting on this
state_machine.complete
dispatch(target, 'completed')

when 'UPSERT_WORKFLOW_SEARCH_ATTRIBUTES'
# todo
Expand All @@ -274,6 +285,8 @@ def event_target_from(command_id, command)
History::EventTarget::WORKFLOW_TYPE
when Command::StartChildWorkflow
History::EventTarget::CHILD_WORKFLOW_TYPE
when Command::SignalExternalWorkflow
History::EventTarget::EXTERNAL_WORKFLOW_TYPE
end

History::EventTarget.new(command_id, target_type)
Expand Down