diff --git a/README.md b/README.md index cc8b0441..4d59d255 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -# Ruby worker for Temporal +# Ruby SDK for Temporal [![Coverage Status](https://coveralls.io/repos/github/coinbase/temporal-ruby/badge.svg?branch=master)](https://coveralls.io/github/coinbase/temporal-ruby?branch=master) @@ -6,7 +6,7 @@ A pure Ruby library for defining and running Temporal workflows and activities. -To find more about Temporal please visit . +To find more about Temporal itself please visit . ## Getting Started diff --git a/lib/temporal/client.rb b/lib/temporal/client.rb index d2380855..b4d6d0b2 100644 --- a/lib/temporal/client.rb +++ b/lib/temporal/client.rb @@ -13,6 +13,23 @@ def initialize(config) @config = config end + # Start a workflow + # + # @param workflow [Temporal::Workflow, String] workflow class or name. When a workflow class + # is passed, its config (namespace, task_queue, timeouts, etc) will be used + # @param input [any] arguments to be passed to workflow's #execute method + # @param args [Hash] keyword arguments to be passed to workflow's #execute method + # @param options [Hash, nil] optional overrides + # @option options [String] :workflow_id + # @option options [Symbol] :workflow_id_reuse_policy check Temporal::Connection::GRPC::WORKFLOW_ID_REUSE_POLICY + # @option options [String] :name workflow name + # @option options [String] :namespace + # @option options [String] :task_queue + # @option options [Hash] :retry_policy check Temporal::RetryPolicy for available options + # @option options [Hash] :timeouts check Temporal::Configuration::DEFAULT_TIMEOUTS + # @option options [Hash] :headers + # + # @return [String] workflow's run ID def start_workflow(workflow, *input, **args) options = args.delete(:options) || {} input << args unless args.empty? @@ -37,6 +54,24 @@ def start_workflow(workflow, *input, **args) response.run_id end + # Schedule a workflow for a periodic cron-like execution + # + # @param workflow [Temporal::Workflow, String] workflow class or name. When a workflow class + # is passed, its config (namespace, task_queue, timeouts, etc) will be used + # @param cron_schedule [String] a cron-style schedule string + # @param input [any] arguments to be passed to workflow's #execute method + # @param args [Hash] keyword arguments to be pass to workflow's #execute method + # @param options [Hash, nil] optional overrides + # @option options [String] :workflow_id + # @option options [Symbol] :workflow_id_reuse_policy check Temporal::Connection::GRPC::WORKFLOW_ID_REUSE_POLICY + # @option options [String] :name workflow name + # @option options [String] :namespace + # @option options [String] :task_queue + # @option options [Hash] :retry_policy check Temporal::RetryPolicy for available options + # @option options [Hash] :timeouts check Temporal::Configuration::DEFAULT_TIMEOUTS + # @option options [Hash] :headers + # + # @return [String] workflow's run ID def schedule_workflow(workflow, cron_schedule, *input, **args) options = args.delete(:options) || {} input << args unless args.empty? @@ -64,10 +99,23 @@ def schedule_workflow(workflow, cron_schedule, *input, **args) response.run_id end + # Register a new Temporal namespace + # + # @param name [String] name of the new namespace + # @param description [String] optional namespace description def register_namespace(name, description = nil) connection.register_namespace(name: name, description: description) end + # Send a signal to a running workflow + # + # @param workflow [Temporal::Workflow, nil] workflow class or nil + # @param signal [String] name of the signal to send + # @param workflow_id [String] + # @param run_id [String] + # @param input [String, Array, nil] optional arguments for the signal + # @param namespace [String, nil] if nil, choose the one declared on the workflow class or the + # global default def signal_workflow(workflow, signal, workflow_id, run_id, input = nil, namespace: nil) execution_options = ExecutionOptions.new(workflow, {}, config.default_execution_options) @@ -80,15 +128,22 @@ def signal_workflow(workflow, signal, workflow_id, run_id, input = nil, namespac ) end - # Long polls for a workflow to be completed and returns whatever the execute function - # returned. This function times out after 30 seconds and throws Temporal::TimeoutError, + # Long polls for a workflow to be completed and returns workflow's return value. + # + # @note This function times out after 30 seconds and throws Temporal::TimeoutError, # not to be confused with Temporal::WorkflowTimedOut which reports that the workflow # itself timed out. - # run_id of nil: await the entire workflow completion. This can span multiple runs - # in the case where the workflow uses continue-as-new. - # timeout: seconds to wait for the result. This cannot be longer than 30 seconds because - # that is the maximum the server supports. - # namespace: if nil, choose the one declared on the Workflow, or the global default + # + # @param workflow [Temporal::Workflow, nil] workflow class or nil + # @param workflow_id [String] + # @param run_id [String, nil] awaits the entire workflow completion when nil. This can span + # multiple runs in the case where the workflow uses continue-as-new. + # @param timeout [Integer, nil] seconds to wait for the result. This cannot be longer than 30 + # seconds because that is the maximum the server supports. + # @param namespace [String, nil] if nil, choose the one declared on the workflow class or the + # global default + # + # @return workflow's return value def await_workflow_result(workflow, workflow_id:, run_id: nil, timeout: nil, namespace: nil) options = namespace ? {namespace: namespace} : {} execution_options = ExecutionOptions.new(workflow, options, config.default_execution_options) @@ -135,6 +190,21 @@ def await_workflow_result(workflow, workflow_id:, run_id: nil, timeout: nil, nam end end + # Reset a workflow + # + # @note More on resetting a workflow here — + # https://docs.temporal.io/docs/system-tools/tctl/#restart-reset-workflow + # + # @param namespace [String] + # @param workflow_id [String] + # @param run_id [String] + # @param strategy [Symbol, nil] one of the Temporal::ResetStrategy values or `nil` when + # passing a workflow_task_id + # @param workflow_task_id [Integer, nil] A specific event ID to reset to. The event has to + # be of a type WorkflowTaskCompleted, WorkflowTaskFailed or WorkflowTaskTimedOut + # @param reason [String] a reset reason to be recorded in workflow's history for reference + # + # @return [String] run_id of the new workflow execution def reset_workflow(namespace, workflow_id, run_id, strategy: nil, workflow_task_id: nil, reason: 'manual reset') # Pick default strategy for backwards-compatibility strategy ||= :last_workflow_task unless workflow_task_id @@ -157,6 +227,14 @@ def reset_workflow(namespace, workflow_id, run_id, strategy: nil, workflow_task_ response.run_id end + # Terminate a running workflow + # + # @param workflow_id [String] + # @param namespace [String, nil] use a default namespace when `nil` + # @param run_id [String, nil] + # @param reason [String, nil] a termination reason to be recorded in workflow's history + # for reference + # @param details [String, Array, nil] optional details to be stored in history def terminate_workflow(workflow_id, namespace: nil, run_id: nil, reason: nil, details: nil) namespace ||= Temporal.configuration.namespace @@ -169,6 +247,13 @@ def terminate_workflow(workflow_id, namespace: nil, run_id: nil, reason: nil, de ) end + # Fetch workflow's execution info + # + # @param namespace [String] + # @param workflow_id [String] + # @param run_id [String] + # + # @return [Temporal::Workflow::ExecutionInfo] an object containing workflow status and other info def fetch_workflow_execution_info(namespace, workflow_id, run_id) response = connection.describe_workflow_execution( namespace: namespace, @@ -179,6 +264,11 @@ def fetch_workflow_execution_info(namespace, workflow_id, run_id) Workflow::ExecutionInfo.generate_from(response.workflow_execution_info) end + # Manually complete an activity + # + # @param async_token [String] an encoded Temporal::Activity::AsyncToken + # @param result [String, Array, nil] activity's return value to be stored in history and + # passed back to a workflow def complete_activity(async_token, result = nil) details = Activity::AsyncToken.decode(async_token) @@ -191,6 +281,11 @@ def complete_activity(async_token, result = nil) ) end + # Manually fail an activity + # + # @param async_token [String] an encoded Temporal::Activity::AsyncToken + # @param exception [Exception] activity's failure exception to be stored in history and + # raised in a workflow def fail_activity(async_token, exception) details = Activity::AsyncToken.decode(async_token) @@ -203,6 +298,13 @@ def fail_activity(async_token, exception) ) end + # Fetch workflow's execution history + # + # @param namespace [String] + # @param workflow_id [String] + # @param run_id [String] + # + # @return [Temporal::Workflow::History] workflow's execution history def get_workflow_history(namespace:, workflow_id:, run_id:) history_response = connection.get_workflow_execution_history( namespace: namespace, diff --git a/temporal.gemspec b/temporal.gemspec index 59ef1192..c8589c9e 100644 --- a/temporal.gemspec +++ b/temporal.gemspec @@ -21,4 +21,5 @@ Gem::Specification.new do |spec| spec.add_development_dependency 'rspec' spec.add_development_dependency 'fabrication' spec.add_development_dependency 'grpc-tools' + spec.add_development_dependency 'yard' end