Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions examples/bin/worker
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ worker.register_workflow(ReleaseWorkflow)
worker.register_workflow(ResultWorkflow)
worker.register_workflow(SerialHelloWorldWorkflow)
worker.register_workflow(SideEffectWorkflow)
worker.register_workflow(SignalWithStartWorkflow)
worker.register_workflow(SimpleTimerWorkflow)
worker.register_workflow(TimeoutWorkflow)
worker.register_workflow(TripBookingWorkflow)
Expand Down
75 changes: 75 additions & 0 deletions examples/spec/integration/signal_with_start_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
require 'workflows/signal_with_start_workflow'

describe 'signal with start' do

it 'signals at workflow start time' do
workflow_id = SecureRandom.uuid
run_id = Temporal.start_workflow(
SignalWithStartWorkflow,
'signal_name',
0.1,
options: {
workflow_id: workflow_id,
signal_name: 'signal_name',
signal_input: 'expected value',
}
)

result = Temporal.await_workflow_result(
SignalWithStartWorkflow,
workflow_id: workflow_id,
run_id: run_id,
)

expect(result).to eq('expected value') # the workflow should return the signal value
end

it 'signals at workflow start time with name only' do
workflow_id = SecureRandom.uuid
run_id = Temporal.start_workflow(
SignalWithStartWorkflow,
'signal_name',
0.1,
options: {
workflow_id: workflow_id,
signal_name: 'signal_name',
}
)

result = Temporal.await_workflow_result(
SignalWithStartWorkflow,
workflow_id: workflow_id,
run_id: run_id,
)

expect(result).to eq(nil) # the workflow should return the signal value
end

it 'does not launch a new workflow when signaling a running workflow through signal_with_start' do
workflow_id = SecureRandom.uuid
run_id = Temporal.start_workflow(
SignalWithStartWorkflow,
'signal_name',
10,
options: {
workflow_id: workflow_id,
signal_name: 'signal_name',
signal_input: 'expected value',
}
)

second_run_id = Temporal.start_workflow(
SignalWithStartWorkflow,
'signal_name',
0.1,
options: {
workflow_id: workflow_id,
signal_name: 'signal_name',
signal_input: 'expected value',
}
)

# If the run ids are the same, then we didn't start a new workflow
expect(second_run_id).to eq(run_id)
end
end
16 changes: 16 additions & 0 deletions examples/workflows/signal_with_start_workflow.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
class SignalWithStartWorkflow < Temporal::Workflow

def execute(expected_signal, sleep_for)
received = 'no signal received'

workflow.on_signal do |signal, input|
if signal == expected_signal
received = input
end
end

# Do something to get descheduled so the signal handler has a chance to run
workflow.sleep(sleep_for)
received
end
end
63 changes: 48 additions & 15 deletions lib/temporal/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,11 @@ def initialize(config)
@config = config
end

# Start a workflow
# Start a workflow with an optional signal
#
# If options[:signal_name] is specified, Temporal will atomically do one of:
# A) start a new workflow and signal it
# B) if workflow_id is specified and the workflow already exists, signal the existing workflow.
#
# @param workflow [Temporal::Workflow, String] workflow class or name. When a workflow class
# is passed, its config (namespace, task_queue, timeouts, etc) will be used
Expand All @@ -25,6 +29,9 @@ def initialize(config)
# @option options [String] :name workflow name
# @option options [String] :namespace
# @option options [String] :task_queue
# @option options [String] :signal_name corresponds to the 'signal' argument to signal_workflow. Required if
# options[:signal_input] is specified.
# @option options [String, Array, nil] :signal_input corresponds to the 'input' argument to signal_workflow
# @option options [Hash] :retry_policy check Temporal::RetryPolicy for available options
# @option options [Hash] :timeouts check Temporal::Configuration::DEFAULT_TIMEOUTS
# @option options [Hash] :headers
Expand All @@ -34,22 +41,44 @@ def start_workflow(workflow, *input, **args)
options = args.delete(:options) || {}
input << args unless args.empty?

signal_name = options.delete(:signal_name)
signal_input = options.delete(:signal_input)

execution_options = ExecutionOptions.new(workflow, options, config.default_execution_options)
workflow_id = options[:workflow_id] || SecureRandom.uuid

response = connection.start_workflow_execution(
namespace: execution_options.namespace,
workflow_id: workflow_id,
workflow_name: execution_options.name,
task_queue: execution_options.task_queue,
input: input,
execution_timeout: execution_options.timeouts[:execution],
# If unspecified, individual runs should have the full time for the execution (which includes retries).
run_timeout: execution_options.timeouts[:run] || execution_options.timeouts[:execution],
task_timeout: execution_options.timeouts[:task],
workflow_id_reuse_policy: options[:workflow_id_reuse_policy],
headers: execution_options.headers
)
if signal_name.nil? && signal_input.nil?
response = connection.start_workflow_execution(
namespace: execution_options.namespace,
workflow_id: workflow_id,
workflow_name: execution_options.name,
task_queue: execution_options.task_queue,
input: input,
execution_timeout: execution_options.timeouts[:execution],
# If unspecified, individual runs should have the full time for the execution (which includes retries).
run_timeout: compute_run_timeout(execution_options),
task_timeout: execution_options.timeouts[:task],
workflow_id_reuse_policy: options[:workflow_id_reuse_policy],
headers: execution_options.headers
)
else
raise ArgumentError, 'If signal_input is provided, you must also provide signal_name' if signal_name.nil?

response = connection.signal_with_start_workflow_execution(
namespace: execution_options.namespace,
workflow_id: workflow_id,
workflow_name: execution_options.name,
task_queue: execution_options.task_queue,
input: input,
execution_timeout: execution_options.timeouts[:execution],
run_timeout: compute_run_timeout(execution_options),
task_timeout: execution_options.timeouts[:task],
workflow_id_reuse_policy: options[:workflow_id_reuse_policy],
headers: execution_options.headers,
signal_name: signal_name,
signal_input: signal_input
)
end

response.run_id
end
Expand Down Expand Up @@ -89,7 +118,7 @@ def schedule_workflow(workflow, cron_schedule, *input, **args)
# Execution timeout is across all scheduled jobs, whereas run is for an individual run.
# This default is here for backward compatibility. Certainly, the run timeout shouldn't be higher
# than the execution timeout.
run_timeout: execution_options.timeouts[:run] || execution_options.timeouts[:execution],
run_timeout: compute_run_timeout(execution_options),
task_timeout: execution_options.timeouts[:task],
workflow_id_reuse_policy: options[:workflow_id_reuse_policy],
headers: execution_options.headers,
Expand Down Expand Up @@ -328,6 +357,10 @@ def connection
@connection ||= Temporal::Connection.generate(config.for_connection)
end

def compute_run_timeout(execution_options)
execution_options.timeouts[:run] || execution_options.timeouts[:execution]
end

def find_workflow_task(namespace, workflow_id, run_id, strategy)
history = get_workflow_history(
namespace: namespace,
Expand Down
48 changes: 46 additions & 2 deletions lib/temporal/connection/grpc.rb
Original file line number Diff line number Diff line change
Expand Up @@ -292,8 +292,52 @@ def signal_workflow_execution(namespace:, workflow_id:, run_id:, signal:, input:
client.signal_workflow_execution(request)
end

def signal_with_start_workflow_execution
raise NotImplementedError
def signal_with_start_workflow_execution(
namespace:,
workflow_id:,
workflow_name:,
task_queue:,
input: nil,
execution_timeout:,
run_timeout:,
task_timeout:,
workflow_id_reuse_policy: nil,
headers: nil,
cron_schedule: nil,
signal_name:,
signal_input:
)
request = Temporal::Api::WorkflowService::V1::SignalWithStartWorkflowExecutionRequest.new(
identity: identity,
namespace: namespace,
workflow_type: Temporal::Api::Common::V1::WorkflowType.new(
name: workflow_name
),
workflow_id: workflow_id,
task_queue: Temporal::Api::TaskQueue::V1::TaskQueue.new(
name: task_queue
),
input: to_payloads(input),
workflow_execution_timeout: execution_timeout,
workflow_run_timeout: run_timeout,
workflow_task_timeout: task_timeout,
request_id: SecureRandom.uuid,
header: Temporal::Api::Common::V1::Header.new(
fields: headers
),
cron_schedule: cron_schedule,
signal_name: signal_name,
signal_input: to_signal_payloads(signal_input)
)

if workflow_id_reuse_policy
policy = WORKFLOW_ID_REUSE_POLICY[workflow_id_reuse_policy]
raise Client::ArgumentError, 'Unknown workflow_id_reuse_policy specified' unless policy

request.workflow_id_reuse_policy = policy
end

client.signal_with_start_workflow_execution(request)
end

def reset_workflow_execution(namespace:, workflow_id:, run_id:, reason:, workflow_task_event_id:)
Expand Down
6 changes: 3 additions & 3 deletions lib/temporal/testing/local_workflow_context.rb
Original file line number Diff line number Diff line change
Expand Up @@ -175,15 +175,15 @@ def now
end

def on_signal(&block)
raise NotImplementedError, 'not yet available for testing'
raise NotImplementedError, 'Signals are not available when Temporal::Testing.local! is on'
end

def cancel_activity(activity_id)
raise NotImplementedError, 'not yet available for testing'
raise NotImplementedError, 'Cancel is not available when Temporal::Testing.local! is on'
end

def cancel(target, cancelation_id)
raise NotImplementedError, 'not yet available for testing'
raise NotImplementedError, 'Cancel is not available when Temporal::Testing.local! is on'
end

private
Expand Down
5 changes: 5 additions & 0 deletions lib/temporal/testing/temporal_override.rb
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,11 @@ def start_locally(workflow, schedule, *input, **args)
options = args.delete(:options) || {}
input << args unless args.empty?

# signals aren't supported at all, so let's prohibit start_workflow calls that try to signal
signal_name = options.delete(:signal_name)
signal_input = options.delete(:signal_input)
raise NotImplementedError, 'Signals are not available when Temporal::Testing.local! is on' if signal_name || signal_input

reuse_policy = options[:workflow_id_reuse_policy] || :allow_failed
workflow_id = options[:workflow_id] || SecureRandom.uuid
run_id = SecureRandom.uuid
Expand Down
77 changes: 77 additions & 0 deletions spec/unit/lib/temporal/client_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,83 @@ class TestStartWorkflow < Temporal::Workflow
end
end

describe '#start_workflow with a signal' do
let(:temporal_response) do
Temporal::Api::WorkflowService::V1::SignalWithStartWorkflowExecutionResponse.new(run_id: 'xxx')
end

before { allow(connection).to receive(:signal_with_start_workflow_execution).and_return(temporal_response) }

def expect_signal_with_start(expected_arguments, expected_signal_argument)
expect(connection)
.to have_received(:signal_with_start_workflow_execution)
.with(
namespace: 'default-test-namespace',
workflow_id: an_instance_of(String),
workflow_name: 'TestStartWorkflow',
task_queue: 'default-test-task-queue',
input: expected_arguments,
task_timeout: Temporal.configuration.timeouts[:task],
run_timeout: Temporal.configuration.timeouts[:run],
execution_timeout: Temporal.configuration.timeouts[:execution],
workflow_id_reuse_policy: nil,
headers: {},
signal_name: 'the question',
signal_input: expected_signal_argument,
)
end

it 'starts a workflow with a signal and no arguments' do
subject.start_workflow(
TestStartWorkflow,
options: { signal_name: 'the question' }
)

expect_signal_with_start([], nil)
end

it 'starts a workflow with a signal and one scalar argument' do
signal_input = 'what do you get if you multiply six by nine?'
subject.start_workflow(
TestStartWorkflow,
42,
options: {
signal_name: 'the question',
signal_input: signal_input,
}
)

expect_signal_with_start([42], signal_input)
end

it 'starts a workflow with a signal and multiple arguments and signal_inputs' do
signal_input = ['what do you get', 'if you multiply six by nine?']
subject.start_workflow(
TestStartWorkflow,
42,
43,
options: {
signal_name: 'the question',
# signals can't have multiple scalar args, but you can pass an array
signal_input: signal_input
}
)

expect_signal_with_start([42, 43], signal_input)
end

it 'raises when signal_input is given but signal_name is not' do
expect do
subject.start_workflow(
TestStartWorkflow,
[42, 54],
[43, 55],
options: { signal_input: 'what do you get if you multiply six by nine?', }
)
end.to raise_error(ArgumentError)
end
end

describe '#schedule_workflow' do
let(:temporal_response) do
Temporal::Api::WorkflowService::V1::StartWorkflowExecutionResponse.new(run_id: 'xxx')
Expand Down
Loading