Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
# Ruby worker for Temporal
# Ruby SDK for Temporal

[![Coverage Status](https://coveralls.io/repos/github/coinbase/temporal-ruby/badge.svg?branch=master)](https://coveralls.io/github/coinbase/temporal-ruby?branch=master)

<img src="./assets/temporal_logo.png" width="250" align="right" alt="Temporal" />

A pure Ruby library for defining and running Temporal workflows and activities.

To find more about Temporal please visit <https://temporal.io/>.
To find more about Temporal itself please visit <https://temporal.io/>.


## Getting Started
Expand Down
116 changes: 109 additions & 7 deletions lib/temporal/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,23 @@ def initialize(config)
@config = config
end

# Start a workflow
#
# @param workflow [Temporal::Workflow, String] workflow class or name. When a workflow class
# is passed, its config (namespace, task_queue, timeouts, etc) will be used
# @param input [any] arguments to be passed to workflow's #execute method
# @param args [Hash] keyword arguments to be passed to workflow's #execute method
# @param options [Hash, nil] optional overrides
# @option options [String] :workflow_id
# @option options [Symbol] :workflow_id_reuse_policy check Temporal::Connection::GRPC::WORKFLOW_ID_REUSE_POLICY
# @option options [String] :name workflow name
# @option options [String] :namespace
# @option options [String] :task_queue
# @option options [Hash] :retry_policy check Temporal::RetryPolicy for available options
# @option options [Hash] :timeouts check Temporal::Configuration::DEFAULT_TIMEOUTS
# @option options [Hash] :headers
#
# @return [String] workflow's run ID
def start_workflow(workflow, *input, **args)
options = args.delete(:options) || {}
input << args unless args.empty?
Expand All @@ -37,6 +54,24 @@ def start_workflow(workflow, *input, **args)
response.run_id
end

# Schedule a workflow for a periodic cron-like execution
#
# @param workflow [Temporal::Workflow, String] workflow class or name. When a workflow class
# is passed, its config (namespace, task_queue, timeouts, etc) will be used
# @param cron_schedule [String] a cron-style schedule string
# @param input [any] arguments to be passed to workflow's #execute method
# @param args [Hash] keyword arguments to be pass to workflow's #execute method
# @param options [Hash, nil] optional overrides
# @option options [String] :workflow_id
# @option options [Symbol] :workflow_id_reuse_policy check Temporal::Connection::GRPC::WORKFLOW_ID_REUSE_POLICY
# @option options [String] :name workflow name
# @option options [String] :namespace
# @option options [String] :task_queue
# @option options [Hash] :retry_policy check Temporal::RetryPolicy for available options
# @option options [Hash] :timeouts check Temporal::Configuration::DEFAULT_TIMEOUTS
# @option options [Hash] :headers
#
# @return [String] workflow's run ID
def schedule_workflow(workflow, cron_schedule, *input, **args)
options = args.delete(:options) || {}
input << args unless args.empty?
Expand Down Expand Up @@ -64,10 +99,23 @@ def schedule_workflow(workflow, cron_schedule, *input, **args)
response.run_id
end

# Register a new Temporal namespace
#
# @param name [String] name of the new namespace
# @param description [String] optional namespace description
def register_namespace(name, description = nil)
connection.register_namespace(name: name, description: description)
end

# Send a signal to a running workflow
#
# @param workflow [Temporal::Workflow, nil] workflow class or nil
# @param signal [String] name of the signal to send
# @param workflow_id [String]
# @param run_id [String]
# @param input [String, Array, nil] optional arguments for the signal
# @param namespace [String, nil] if nil, choose the one declared on the workflow class or the
# global default
def signal_workflow(workflow, signal, workflow_id, run_id, input = nil, namespace: nil)
execution_options = ExecutionOptions.new(workflow, {}, config.default_execution_options)

Expand All @@ -80,15 +128,22 @@ def signal_workflow(workflow, signal, workflow_id, run_id, input = nil, namespac
)
end

# Long polls for a workflow to be completed and returns whatever the execute function
# returned. This function times out after 30 seconds and throws Temporal::TimeoutError,
# Long polls for a workflow to be completed and returns workflow's return value.
#
# @note This function times out after 30 seconds and throws Temporal::TimeoutError,
# not to be confused with Temporal::WorkflowTimedOut which reports that the workflow
# itself timed out.
# run_id of nil: await the entire workflow completion. This can span multiple runs
# in the case where the workflow uses continue-as-new.
# timeout: seconds to wait for the result. This cannot be longer than 30 seconds because
# that is the maximum the server supports.
# namespace: if nil, choose the one declared on the Workflow, or the global default
#
# @param workflow [Temporal::Workflow, nil] workflow class or nil
# @param workflow_id [String]
# @param run_id [String, nil] awaits the entire workflow completion when nil. This can span
# multiple runs in the case where the workflow uses continue-as-new.
# @param timeout [Integer, nil] seconds to wait for the result. This cannot be longer than 30
# seconds because that is the maximum the server supports.
# @param namespace [String, nil] if nil, choose the one declared on the workflow class or the
# global default
#
# @return workflow's return value
def await_workflow_result(workflow, workflow_id:, run_id: nil, timeout: nil, namespace: nil)
options = namespace ? {namespace: namespace} : {}
execution_options = ExecutionOptions.new(workflow, options, config.default_execution_options)
Expand Down Expand Up @@ -135,6 +190,21 @@ def await_workflow_result(workflow, workflow_id:, run_id: nil, timeout: nil, nam
end
end

# Reset a workflow
#
# @note More on resetting a workflow here —
# https://docs.temporal.io/docs/system-tools/tctl/#restart-reset-workflow
#
# @param namespace [String]
# @param workflow_id [String]
# @param run_id [String]
# @param strategy [Symbol, nil] one of the Temporal::ResetStrategy values or `nil` when
# passing a workflow_task_id
# @param workflow_task_id [Integer, nil] A specific event ID to reset to. The event has to
# be of a type WorkflowTaskCompleted, WorkflowTaskFailed or WorkflowTaskTimedOut
# @param reason [String] a reset reason to be recorded in workflow's history for reference
#
# @return [String] run_id of the new workflow execution
def reset_workflow(namespace, workflow_id, run_id, strategy: nil, workflow_task_id: nil, reason: 'manual reset')
# Pick default strategy for backwards-compatibility
strategy ||= :last_workflow_task unless workflow_task_id
Expand All @@ -157,6 +227,14 @@ def reset_workflow(namespace, workflow_id, run_id, strategy: nil, workflow_task_
response.run_id
end

# Terminate a running workflow
#
# @param workflow_id [String]
# @param namespace [String, nil] use a default namespace when `nil`
# @param run_id [String, nil]
# @param reason [String, nil] a termination reason to be recorded in workflow's history
# for reference
# @param details [String, Array, nil] optional details to be stored in history
def terminate_workflow(workflow_id, namespace: nil, run_id: nil, reason: nil, details: nil)
namespace ||= Temporal.configuration.namespace

Expand All @@ -169,6 +247,13 @@ def terminate_workflow(workflow_id, namespace: nil, run_id: nil, reason: nil, de
)
end

# Fetch workflow's execution info
#
# @param namespace [String]
# @param workflow_id [String]
# @param run_id [String]
#
# @return [Temporal::Workflow::ExecutionInfo] an object containing workflow status and other info
def fetch_workflow_execution_info(namespace, workflow_id, run_id)
response = connection.describe_workflow_execution(
namespace: namespace,
Expand All @@ -179,6 +264,11 @@ def fetch_workflow_execution_info(namespace, workflow_id, run_id)
Workflow::ExecutionInfo.generate_from(response.workflow_execution_info)
end

# Manually complete an activity
#
# @param async_token [String] an encoded Temporal::Activity::AsyncToken
# @param result [String, Array, nil] activity's return value to be stored in history and
# passed back to a workflow
def complete_activity(async_token, result = nil)
details = Activity::AsyncToken.decode(async_token)

Expand All @@ -191,6 +281,11 @@ def complete_activity(async_token, result = nil)
)
end

# Manually fail an activity
#
# @param async_token [String] an encoded Temporal::Activity::AsyncToken
# @param exception [Exception] activity's failure exception to be stored in history and
# raised in a workflow
def fail_activity(async_token, exception)
details = Activity::AsyncToken.decode(async_token)

Expand All @@ -203,6 +298,13 @@ def fail_activity(async_token, exception)
)
end

# Fetch workflow's execution history
#
# @param namespace [String]
# @param workflow_id [String]
# @param run_id [String]
#
# @return [Temporal::Workflow::History] workflow's execution history
def get_workflow_history(namespace:, workflow_id:, run_id:)
history_response = connection.get_workflow_execution_history(
namespace: namespace,
Expand Down
1 change: 1 addition & 0 deletions temporal.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,5 @@ Gem::Specification.new do |spec|
spec.add_development_dependency 'rspec'
spec.add_development_dependency 'fabrication'
spec.add_development_dependency 'grpc-tools'
spec.add_development_dependency 'yard'
end