Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion examples/lib/cryptconverter.rb
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
require 'openssl'

module Temporal
class CryptConverter < Temporal::Client::Converter::Base
class CryptConverter < Temporal::Connection::Converter::Base
CIPHER = 'aes-256-gcm'.freeze
GCM_NONCE_SIZE = 12
GCM_TAG_SIZE = 16
Expand Down
4 changes: 2 additions & 2 deletions examples/spec/helpers.rb
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ def wait_for_workflow_completion(workflow_id, run_id)
end

def fetch_history(workflow_id, run_id, options = {})
client = Temporal.send(:client)
connection = Temporal.send(:connection)

result = client.get_workflow_execution_history(
result = connection.get_workflow_execution_history(
{
namespace: Temporal.configuration.namespace,
workflow_id: workflow_id,
Expand Down
51 changes: 28 additions & 23 deletions lib/temporal.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
require 'securerandom'
require 'temporal/configuration'
require 'temporal/execution_options'
require 'temporal/client'
require 'temporal/connection'
require 'temporal/activity'
require 'temporal/activity/async_token'
require 'temporal/workflow'
Expand All @@ -21,10 +21,10 @@ def start_workflow(workflow, *input, **args)
options = args.delete(:options) || {}
input << args unless args.empty?

execution_options = ExecutionOptions.new(workflow, options)
execution_options = ExecutionOptions.new(workflow, options, config.default_execution_options)
workflow_id = options[:workflow_id] || SecureRandom.uuid

response = client.start_workflow_execution(
response = connection.start_workflow_execution(
namespace: execution_options.namespace,
workflow_id: workflow_id,
workflow_name: execution_options.name,
Expand All @@ -45,10 +45,10 @@ def schedule_workflow(workflow, cron_schedule, *input, **args)
options = args.delete(:options) || {}
input << args unless args.empty?

execution_options = ExecutionOptions.new(workflow, options)
execution_options = ExecutionOptions.new(workflow, options, config.default_execution_options)
workflow_id = options[:workflow_id] || SecureRandom.uuid

response = client.start_workflow_execution(
response = connection.start_workflow_execution(
namespace: execution_options.namespace,
workflow_id: workflow_id,
workflow_name: execution_options.name,
Expand All @@ -69,13 +69,13 @@ def schedule_workflow(workflow, cron_schedule, *input, **args)
end

def register_namespace(name, description = nil)
client.register_namespace(name: name, description: description)
connection.register_namespace(name: name, description: description)
end

def signal_workflow(workflow, signal, workflow_id, run_id, input = nil)
execution_options = ExecutionOptions.new(workflow)
execution_options = ExecutionOptions.new(workflow, {}, config.default_execution_options)

client.signal_workflow_execution(
connection.signal_workflow_execution(
namespace: execution_options.namespace, # TODO: allow passing namespace instead
workflow_id: workflow_id,
run_id: run_id,
Expand All @@ -95,11 +95,11 @@ def signal_workflow(workflow, signal, workflow_id, run_id, input = nil)
# namespace: if nil, choose the one declared on the Workflow, or the global default
def await_workflow_result(workflow, workflow_id:, run_id: nil, timeout: nil, namespace: nil)
options = namespace ? {namespace: namespace} : {}
execution_options = ExecutionOptions.new(workflow, options)
max_timeout = Temporal::Client::GRPCClient::SERVER_MAX_GET_WORKFLOW_EXECUTION_HISTORY_POLL
execution_options = ExecutionOptions.new(workflow, options, config.default_execution_options)
max_timeout = Temporal::Connection::GRPC::SERVER_MAX_GET_WORKFLOW_EXECUTION_HISTORY_POLL
history_response = nil
begin
history_response = client.get_workflow_execution_history(
history_response = connection.get_workflow_execution_history(
namespace: execution_options.namespace,
workflow_id: workflow_id,
run_id: run_id,
Expand Down Expand Up @@ -143,7 +143,7 @@ def reset_workflow(namespace, workflow_id, run_id, workflow_task_id: nil, reason
workflow_task_id ||= get_last_completed_workflow_task_id(namespace, workflow_id, run_id)
raise Error, 'Could not find a completed workflow task event' unless workflow_task_id

response = client.reset_workflow_execution(
response = connection.reset_workflow_execution(
namespace: namespace,
workflow_id: workflow_id,
run_id: run_id,
Expand All @@ -157,7 +157,7 @@ def reset_workflow(namespace, workflow_id, run_id, workflow_task_id: nil, reason
def terminate_workflow(workflow_id, namespace: nil, run_id: nil, reason: nil, details: nil)
namespace ||= Temporal.configuration.namespace

client.terminate_workflow_execution(
connection.terminate_workflow_execution(
namespace: namespace,
workflow_id: workflow_id,
run_id: run_id,
Expand All @@ -167,7 +167,7 @@ def terminate_workflow(workflow_id, namespace: nil, run_id: nil, reason: nil, de
end

def fetch_workflow_execution_info(namespace, workflow_id, run_id)
response = client.describe_workflow_execution(
response = connection.describe_workflow_execution(
namespace: namespace,
workflow_id: workflow_id,
run_id: run_id
Expand All @@ -179,7 +179,7 @@ def fetch_workflow_execution_info(namespace, workflow_id, run_id)
def complete_activity(async_token, result = nil)
details = Activity::AsyncToken.decode(async_token)

client.respond_activity_task_completed_by_id(
connection.respond_activity_task_completed_by_id(
namespace: details.namespace,
activity_id: details.activity_id,
workflow_id: details.workflow_id,
Expand All @@ -191,7 +191,7 @@ def complete_activity(async_token, result = nil)
def fail_activity(async_token, exception)
details = Activity::AsyncToken.decode(async_token)

client.respond_activity_task_failed_by_id(
connection.respond_activity_task_failed_by_id(
namespace: details.namespace,
activity_id: details.activity_id,
workflow_id: details.workflow_id,
Expand All @@ -201,19 +201,20 @@ def fail_activity(async_token, exception)
end

def configure(&block)
yield configuration
yield config
end

def configuration
@configuration ||= Configuration.new
warn '[DEPRECATION] This method is now deprecated without a substitution'
config
end

def logger
configuration.logger
config.logger
end

def metrics
@metrics ||= Metrics.new(configuration.metrics_adapter)
@metrics ||= Metrics.new(config.metrics_adapter)
end

class ResultConverter
Expand All @@ -222,13 +223,17 @@ class ResultConverter
private_constant :ResultConverter

private

def config
@config ||= Configuration.new
end

def client
@client ||= Temporal::Client.generate
def connection
@connection ||= Temporal::Connection.generate(config.for_connection)
end

def get_last_completed_workflow_task_id(namespace, workflow_id, run_id)
history_response = client.get_workflow_execution_history(
history_response = connection.get_workflow_execution_history(
namespace: namespace,
workflow_id: workflow_id,
run_id: run_id
Expand Down
8 changes: 4 additions & 4 deletions lib/temporal/activity/context.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
module Temporal
class Activity
class Context
def initialize(client, metadata)
@client = client
def initialize(connection, metadata)
@connection = connection
@metadata = metadata
@async = false
end
Expand All @@ -32,7 +32,7 @@ def async_token

def heartbeat(details = nil)
logger.debug("Activity heartbeat", metadata.to_h)
client.record_activity_task_heartbeat(task_token: task_token, details: details)
connection.record_activity_task_heartbeat(task_token: task_token, details: details)
end

def heartbeat_details
Expand All @@ -58,7 +58,7 @@ def headers

private

attr_reader :client, :metadata
attr_reader :connection, :metadata

def task_token
metadata.task_token
Expand Down
17 changes: 9 additions & 8 deletions lib/temporal/activity/poller.rb
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
require 'temporal/client'
require 'temporal/connection'
require 'temporal/thread_pool'
require 'temporal/middleware/chain'
require 'temporal/activity/task_processor'
Expand All @@ -11,10 +11,11 @@ class Poller
thread_pool_size: 20
}.freeze

def initialize(namespace, task_queue, activity_lookup, middleware = [], options = {})
def initialize(namespace, task_queue, activity_lookup, config, middleware = [], options = {})
@namespace = namespace
@task_queue = task_queue
@activity_lookup = activity_lookup
@config = config
@middleware = middleware
@shutting_down = false
@options = DEFAULT_OPTIONS.merge(options)
Expand All @@ -31,7 +32,7 @@ def stop_polling
end

def cancel_pending_requests
client.cancel_polling_request
connection.cancel_polling_request
end

def wait
Expand All @@ -41,10 +42,10 @@ def wait

private

attr_reader :namespace, :task_queue, :activity_lookup, :middleware, :options, :thread
attr_reader :namespace, :task_queue, :activity_lookup, :config, :middleware, :options, :thread

def client
@client ||= Temporal::Client.generate
def connection
@connection ||= Temporal::Connection.generate(config.for_connection)
end

def shutting_down?
Expand Down Expand Up @@ -73,7 +74,7 @@ def poll_loop
end

def poll_for_task
client.poll_activity_task_queue(namespace: namespace, task_queue: task_queue)
connection.poll_activity_task_queue(namespace: namespace, task_queue: task_queue)
rescue StandardError => error
Temporal.logger.error("Unable to poll activity task queue", { namespace: namespace, task_queue: task_queue, error: error.inspect })

Expand All @@ -85,7 +86,7 @@ def poll_for_task
def process(task)
middleware_chain = Middleware::Chain.new(middleware)

TaskProcessor.new(task, namespace, activity_lookup, client, middleware_chain).process
TaskProcessor.new(task, namespace, activity_lookup, middleware_chain, config).process
end

def thread_pool
Expand Down
24 changes: 15 additions & 9 deletions lib/temporal/activity/task_processor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,23 @@
require 'temporal/errors'
require 'temporal/activity/context'
require 'temporal/concerns/payloads'
require 'temporal/client/retryer'
require 'temporal/connection/retryer'
require 'temporal/connection'

module Temporal
class Activity
class TaskProcessor
include Concerns::Payloads

def initialize(task, namespace, activity_lookup, client, middleware_chain)
def initialize(task, namespace, activity_lookup, middleware_chain, config)
@task = task
@namespace = namespace
@metadata = Metadata.generate(Metadata::ACTIVITY_TYPE, task, namespace)
@task_token = task.task_token
@activity_name = task.activity_type.name
@activity_class = activity_lookup.find(activity_name)
@client = client
@middleware_chain = middleware_chain
@config = config
end

def process
Expand All @@ -27,7 +28,7 @@ def process
Temporal.logger.debug("Processing Activity task", metadata.to_h)
Temporal.metrics.timing('activity_task.queue_time', queue_time_ms, activity: activity_name)

context = Activity::Context.new(client, metadata)
context = Activity::Context.new(connection, metadata)

if !activity_class
raise ActivityNotRegistered, 'Activity is not registered with this worker'
Expand All @@ -51,7 +52,12 @@ def process

private

attr_reader :task, :namespace, :task_token, :activity_name, :activity_class, :client, :middleware_chain, :metadata
attr_reader :task, :namespace, :task_token, :activity_name, :activity_class,
:middleware_chain, :metadata, :config

def connection
@connection ||= Temporal::Connection.generate(config.for_connection)
end

def queue_time_ms
scheduled = task.current_attempt_scheduled_time.to_f
Expand All @@ -64,8 +70,8 @@ def respond_completed(result)
log_retry = proc do
Temporal.logger.debug("Failed to report activity task completion, retrying", metadata.to_h)
end
Temporal::Client::Retryer.with_retries(on_retry: log_retry) do
client.respond_activity_task_completed(task_token: task_token, result: result)
Temporal::Connection::Retryer.with_retries(on_retry: log_retry) do
connection.respond_activity_task_completed(task_token: task_token, result: result)
end
rescue StandardError => error
Temporal.logger.error("Unable to complete Activity", metadata.to_h.merge(error: error.inspect))
Expand All @@ -78,8 +84,8 @@ def respond_failed(error)
log_retry = proc do
Temporal.logger.debug("Failed to report activity task failure, retrying", metadata.to_h)
end
Temporal::Client::Retryer.with_retries(on_retry: log_retry) do
client.respond_activity_task_failed(task_token: task_token, exception: error)
Temporal::Connection::Retryer.with_retries(on_retry: log_retry) do
connection.respond_activity_task_failed(task_token: task_token, exception: error)
end
rescue StandardError => error
Temporal.logger.error("Unable to fail Activity task", metadata.to_h.merge(error: error.inspect))
Expand Down
21 changes: 0 additions & 21 deletions lib/temporal/client.rb

This file was deleted.

Loading