Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion lib/temporal/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ def initialize(config)
# @option options [Hash] :timeouts check Temporal::Configuration::DEFAULT_TIMEOUTS
# @option options [Hash] :headers
# @option options [Hash] :search_attributes
# @option options [Integer] :start_delay determines the amount of seconds to wait before initiating a Workflow
#
# @return [String] workflow's run ID
def start_workflow(workflow, *input, options: {}, **args)
Expand All @@ -67,6 +68,7 @@ def start_workflow(workflow, *input, options: {}, **args)
headers: config.header_propagator_chain.inject(execution_options.headers),
memo: execution_options.memo,
search_attributes: Workflow::Context::Helpers.process_search_attributes(execution_options.search_attributes),
start_delay: execution_options.start_delay
)
else
raise ArgumentError, 'If signal_input is provided, you must also provide signal_name' if signal_name.nil?
Expand All @@ -85,7 +87,8 @@ def start_workflow(workflow, *input, options: {}, **args)
memo: execution_options.memo,
search_attributes: Workflow::Context::Helpers.process_search_attributes(execution_options.search_attributes),
signal_name: signal_name,
signal_input: signal_input
signal_input: signal_input,
start_delay: execution_options.start_delay
)
end

Expand Down
8 changes: 6 additions & 2 deletions lib/temporal/connection/grpc.rb
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,8 @@ def start_workflow_execution(
headers: nil,
cron_schedule: nil,
memo: nil,
search_attributes: nil
search_attributes: nil,
start_delay: nil
)
request = Temporalio::Api::WorkflowService::V1::StartWorkflowExecutionRequest.new(
identity: identity,
Expand All @@ -137,6 +138,7 @@ def start_workflow_execution(
workflow_execution_timeout: execution_timeout,
workflow_run_timeout: run_timeout,
workflow_task_timeout: task_timeout,
workflow_start_delay: start_delay,
request_id: SecureRandom.uuid,
header: Temporalio::Api::Common::V1::Header.new(
fields: converter.to_payload_map(headers || {})
Expand Down Expand Up @@ -379,7 +381,8 @@ def signal_with_start_workflow_execution(
headers: nil,
cron_schedule: nil,
memo: nil,
search_attributes: nil
search_attributes: nil,
start_delay: nil
)
proto_header_fields = if headers.nil?
converter.to_payload_map({})
Expand All @@ -406,6 +409,7 @@ def signal_with_start_workflow_execution(
workflow_execution_timeout: execution_timeout,
workflow_run_timeout: run_timeout,
workflow_task_timeout: task_timeout,
workflow_start_delay: start_delay,
request_id: SecureRandom.uuid,
header: Temporalio::Api::Common::V1::Header.new(
fields: proto_header_fields
Expand Down
4 changes: 3 additions & 1 deletion lib/temporal/execution_options.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@

module Temporal
class ExecutionOptions
attr_reader :name, :namespace, :task_queue, :retry_policy, :timeouts, :headers, :memo, :search_attributes
attr_reader :name, :namespace, :task_queue, :retry_policy, :timeouts, :headers, :memo, :search_attributes,
:start_delay

def initialize(object, options, defaults = nil)
# Options are treated as overrides and take precedence
Expand All @@ -15,6 +16,7 @@ def initialize(object, options, defaults = nil)
@headers = options[:headers] || {}
@memo = options[:memo] || {}
@search_attributes = options[:search_attributes] || {}
@start_delay = options[:start_delay] || 0

# For Temporal::Workflow and Temporal::Activity use defined values as the next option
if has_executable_concern?(object)
Expand Down
37 changes: 23 additions & 14 deletions spec/unit/lib/temporal/client_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -52,20 +52,21 @@ def inject!(header)
subject.start_workflow(TestStartWorkflow, 42)
expect(connection)
.to have_received(:start_workflow_execution)
.with(
namespace: 'default-test-namespace',
workflow_id: an_instance_of(String),
workflow_name: 'TestStartWorkflow',
task_queue: 'default-test-task-queue',
input: [42],
task_timeout: config.timeouts[:task],
run_timeout: config.timeouts[:run],
execution_timeout: config.timeouts[:execution],
workflow_id_reuse_policy: nil,
headers: { 'test' => 'asdf' },
memo: {},
search_attributes: {},
)
.with(
namespace: 'default-test-namespace',
workflow_id: an_instance_of(String),
workflow_name: 'TestStartWorkflow',
task_queue: 'default-test-task-queue',
input: [42],
task_timeout: config.timeouts[:task],
run_timeout: config.timeouts[:run],
execution_timeout: config.timeouts[:execution],
workflow_id_reuse_policy: nil,
headers: { 'test' => 'asdf' },
memo: {},
search_attributes: {},
start_delay: 0
)
end
end

Expand Down Expand Up @@ -94,6 +95,7 @@ def inject!(header)
headers: {},
memo: {},
search_attributes: {},
start_delay: 0
)
end

Expand All @@ -109,6 +111,7 @@ def inject!(header)
workflow_id_reuse_policy: :reject,
memo: { 'MemoKey1' => 'MemoValue1' },
search_attributes: { 'SearchAttribute1' => 256 },
start_delay: 10
}
)

Expand All @@ -127,6 +130,7 @@ def inject!(header)
headers: { 'Foo' => 'Bar' },
memo: { 'MemoKey1' => 'MemoValue1' },
search_attributes: { 'SearchAttribute1' => 256 },
start_delay: 10
)
end

Expand Down Expand Up @@ -154,6 +158,7 @@ def inject!(header)
headers: {},
memo: {},
search_attributes: {},
start_delay: 0
)
end

Expand All @@ -175,6 +180,7 @@ def inject!(header)
headers: {},
memo: {},
search_attributes: {},
start_delay: 0
)
end

Expand All @@ -198,6 +204,7 @@ def inject!(header)
headers: {},
memo: {},
search_attributes: {},
start_delay: 0
)
end
end
Expand Down Expand Up @@ -225,6 +232,7 @@ def inject!(header)
headers: {},
memo: {},
search_attributes: {},
start_delay: 0
)
end
end
Expand Down Expand Up @@ -255,6 +263,7 @@ def expect_signal_with_start(expected_arguments, expected_signal_argument)
search_attributes: {},
signal_name: 'the question',
signal_input: expected_signal_argument,
start_delay: 0
)
end

Expand Down
10 changes: 6 additions & 4 deletions spec/unit/lib/temporal/execution_options_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -99,10 +99,11 @@ class TestExecutionOptionsWorkflow < Temporal::Workflow
task_queue: 'test-task-queue',
retry_policy: { interval: 1, backoff: 2, max_attempts: 5 },
timeouts: { start_to_close: 10 },
headers: { 'TestHeader' => 'Test' }
headers: { 'TestHeader' => 'Test' },
start_delay: 10
}
end

it 'is initialized with full options' do
expect(subject.name).to eq(options[:name])
expect(subject.namespace).to eq(options[:namespace])
Expand All @@ -113,12 +114,13 @@ class TestExecutionOptionsWorkflow < Temporal::Workflow
expect(subject.retry_policy.max_attempts).to eq(options[:retry_policy][:max_attempts])
expect(subject.timeouts).to eq(options[:timeouts])
expect(subject.headers).to eq(options[:headers])
expect(subject.start_delay).to eq(options[:start_delay])
end
end

context 'when retry policy options are invalid' do
let(:options) { { retry_policy: { max_attempts: 10 } } }

it 'raises' do
expect { subject }.to raise_error(
Temporal::RetryPolicy::InvalidRetryPolicy,
Expand Down
4 changes: 4 additions & 0 deletions spec/unit/lib/temporal/grpc_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
execution_timeout: 1,
run_timeout: 2,
task_timeout: 3,
start_delay: 10,
memo: {},
search_attributes: {
'foo-int-attribute' => 256,
Expand All @@ -90,6 +91,7 @@
expect(request.workflow_execution_timeout.seconds).to eq(1)
expect(request.workflow_run_timeout.seconds).to eq(2)
expect(request.workflow_task_timeout.seconds).to eq(3)
expect(request.workflow_start_delay.seconds).to eq(10)
expect(request.workflow_id_reuse_policy).to eq(:WORKFLOW_ID_REUSE_POLICY_REJECT_DUPLICATE)
expect(request.search_attributes.indexed_fields).to eq({
'foo-int-attribute' => Temporalio::Api::Common::V1::Payload.new(data: '256', metadata: { 'encoding' => 'json/plain' }),
Expand Down Expand Up @@ -138,6 +140,7 @@
execution_timeout: 1,
run_timeout: 2,
task_timeout: 3,
start_delay: 10,
workflow_id_reuse_policy: :allow,
signal_name: 'the question',
signal_input: 'what do you get if you multiply six by nine?'
Expand All @@ -153,6 +156,7 @@
expect(request.workflow_execution_timeout.seconds).to eq(1)
expect(request.workflow_run_timeout.seconds).to eq(2)
expect(request.workflow_task_timeout.seconds).to eq(3)
expect(request.workflow_start_delay.seconds).to eq(10)
expect(request.signal_name).to eq('the question')
expect(request.signal_input.payloads[0].data).to eq('"what do you get if you multiply six by nine?"')
expect(request.workflow_id_reuse_policy).to eq(:WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE)
Expand Down